fedimint_load_test_tool/
main.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_precision_loss)]
4#![allow(clippy::missing_errors_doc)]
5#![allow(clippy::ref_option)]
6#![allow(clippy::too_many_lines)]
7#![allow(clippy::large_futures)]
8
9use std::collections::{BTreeMap, HashMap};
10use std::path::PathBuf;
11use std::str::FromStr;
12use std::time::Duration;
13use std::vec;
14
15use anyhow::{Context, anyhow, bail};
16use clap::{Args, Parser, Subcommand, ValueEnum};
17use common::{
18    gateway_pay_invoice, get_note_summary, ldk_create_invoice, ldk_pay_invoice,
19    ldk_wait_invoice_payment, parse_gateway_id, reissue_notes,
20};
21use devimint::cmd;
22use devimint::util::GatewayLndCli;
23use devimint::version_constants::VERSION_0_10_0_ALPHA;
24use fedimint_client::ClientHandleArc;
25use fedimint_connectors::ConnectorRegistry;
26use fedimint_core::Amount;
27use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
28use fedimint_core::invite_code::InviteCode;
29use fedimint_core::module::ApiRequestErased;
30use fedimint_core::runtime::spawn;
31use fedimint_core::util::{BoxFuture, SafeUrl};
32use fedimint_ln_client::{LightningClientModule, LnReceiveState};
33use fedimint_ln_common::LightningGateway;
34use fedimint_mint_client::OOBNotes;
35use futures::StreamExt;
36use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description};
37use serde::{Deserialize, Serialize};
38use tokio::fs::OpenOptions;
39use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
40use tokio::sync::mpsc;
41use tracing::{debug, info, warn};
42
43use crate::common::{
44    build_client, do_spend_notes, get_invite_code_cli, remint_denomination, try_get_notes_cli,
45};
46pub mod common;
47
48#[derive(Parser, Clone)]
49#[command(version)]
50struct Opts {
51    #[arg(
52        long,
53        default_value = "10",
54        help = "Number of users. Each user will work in parallel"
55    )]
56    users: u16,
57
58    #[arg(long, help = "Output with the metrics results in JSON format")]
59    metrics_json_output: Option<PathBuf>,
60
61    #[arg(
62        long,
63        help = "If given, will be used to store and retrieve past metrics for comparison purposes"
64    )]
65    archive_dir: Option<PathBuf>,
66
67    #[clap(subcommand)]
68    command: Command,
69}
70
71#[derive(Debug, Clone, Copy, ValueEnum)]
72enum LnInvoiceGeneration {
73    LdkLightningCli,
74}
75
76#[derive(Subcommand, Clone)]
77enum Command {
78    #[command(about = "Keep many websocket connections to a federation for a duration of time")]
79    TestConnect {
80        #[arg(long, help = "Federation invite code")]
81        invite_code: String,
82        #[arg(
83            long,
84            default_value = "60",
85            help = "How much time to keep the connections open, in seconds"
86        )]
87        duration_secs: u64,
88        #[arg(
89            long,
90            default_value = "120",
91            help = "Timeout for connection attempt and for each request, in secnods"
92        )]
93        timeout_secs: u64,
94        #[arg(
95            long,
96            help = "If given, will limit the number of endpoints (guardians) to connect to"
97        )]
98        limit_endpoints: Option<usize>,
99    },
100    #[command(about = "Try to download the client config many times.")]
101    TestDownload {
102        #[arg(long, help = "Federation invite code")]
103        invite_code: String,
104    },
105    #[command(
106        about = "Run a load test where many users in parallel will try to reissue notes and pay invoices through the gateway"
107    )]
108    LoadTest(LoadTestArgs),
109    /// Run a load test where many users in parallel will receive then send a
110    /// payment through lightning.
111    /// It's 'circular' because the funds always come back to the same user then
112    /// we can keep making the payments in a loop
113    #[command()]
114    LnCircularLoadTest(LnCircularLoadTestArgs),
115}
116
117#[derive(Args, Clone)]
118struct LoadTestArgs {
119    #[arg(
120        long,
121        help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
122    )]
123    invite_code: Option<InviteCode>,
124
125    #[arg(
126        long,
127        help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
128    )]
129    initial_notes: Option<OOBNotes>,
130
131    #[arg(
132        long,
133        help = "Gateway Id. If none, retrieve one according to --generate-invoice-with"
134    )]
135    gateway_id: Option<String>,
136
137    #[arg(
138        long,
139        help = "The method used to generate invoices to be paid through the gateway. If none and no --invoices-file provided, no gateway/LN tests will be run. Note that you can't generate an invoice using the same lightning node used by the gateway (i.e self payment is forbidden)"
140    )]
141    generate_invoice_with: Option<LnInvoiceGeneration>,
142
143    #[arg(
144        long,
145        default_value = "1",
146        help = "How many invoices will be created for each user. Only applicable if --generate-invoice-with is provided"
147    )]
148    invoices_per_user: u16,
149
150    #[arg(
151        long,
152        default_value = "0",
153        help = "How many seconds to sleep between LN payments"
154    )]
155    ln_payment_sleep_secs: u64,
156
157    #[arg(
158        long,
159        help = "A text file with one invoice per line. If --generate-invoice-with is provided, these will be additional invoices to be paid"
160    )]
161    invoices_file: Option<PathBuf>,
162
163    #[arg(
164        long,
165        help = "How many notes to distribute to each user",
166        default_value = "2"
167    )]
168    notes_per_user: u16,
169
170    #[arg(
171        long,
172        help = "Note denomination to use for the test",
173        default_value = "1024"
174    )]
175    note_denomination: Amount,
176
177    #[arg(
178        long,
179        help = "Invoice amount when generating one",
180        default_value = "1000"
181    )]
182    invoice_amount: Amount,
183}
184
185#[derive(Args, Clone)]
186struct LnCircularLoadTestArgs {
187    #[arg(
188        long,
189        help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
190    )]
191    invite_code: Option<InviteCode>,
192
193    #[arg(
194        long,
195        help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
196    )]
197    initial_notes: Option<OOBNotes>,
198
199    #[arg(
200        long,
201        default_value = "60",
202        help = "For how many seconds to run the test"
203    )]
204    test_duration_secs: u64,
205
206    #[arg(
207        long,
208        default_value = "0",
209        help = "How many seconds to sleep between LN payments"
210    )]
211    ln_payment_sleep_secs: u64,
212
213    #[arg(
214        long,
215        help = "How many notes to distribute to each user",
216        default_value = "1"
217    )]
218    notes_per_user: u16,
219
220    #[arg(
221        long,
222        help = "Note denomination to use for the test",
223        default_value = "1024"
224    )]
225    note_denomination: Amount,
226
227    #[arg(
228        long,
229        help = "Invoice amount when generating one",
230        default_value = "1000"
231    )]
232    invoice_amount: Amount,
233
234    #[arg(long)]
235    strategy: LnCircularStrategy,
236}
237
238#[derive(Debug, Clone, Copy, ValueEnum)]
239enum LnCircularStrategy {
240    /// The user will pay its own invoice
241    SelfPayment,
242    /// One gateway will pay/receive to/from the other, then they will swap
243    /// places
244    TwoGateways,
245    /// Two clients will pay to each other using the same gateway
246    PartnerPingPong,
247}
248
249#[derive(Debug, Clone)]
250pub struct MetricEvent {
251    name: String,
252    duration: Duration,
253}
254
255#[derive(Debug, Clone, Serialize, Deserialize)]
256struct EventMetricSummary {
257    name: String,
258    users: u64,
259    n: u64,
260    avg_ms: u128,
261    median_ms: u128,
262    max_ms: u128,
263    min_ms: u128,
264    timestamp_seconds: u64,
265}
266
267#[derive(Debug, Serialize, Deserialize)]
268struct EventMetricComparison {
269    avg_ms_gain: f64,
270    median_ms_gain: f64,
271    max_ms_gain: f64,
272    min_ms_gain: f64,
273    current: EventMetricSummary,
274    previous: EventMetricSummary,
275}
276
277impl std::fmt::Display for EventMetricComparison {
278    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279        fn to_percent(gain: f64) -> String {
280            if gain >= 1.0 {
281                format!("+{:.2}%", (gain - 1.0) * 100.0)
282            } else {
283                format!("-{:.2}%", (1.0 - gain) * 100.0)
284            }
285        }
286        f.write_str(&format!(
287            "avg: {}, median: {}, max: {}, min: {}",
288            to_percent(self.avg_ms_gain),
289            to_percent(self.median_ms_gain),
290            to_percent(self.max_ms_gain),
291            to_percent(self.min_ms_gain),
292        ))
293    }
294}
295
296#[tokio::main]
297async fn main() -> anyhow::Result<()> {
298    fedimint_logging::TracingSetup::default().init()?;
299    let opts = Opts::parse();
300    let (event_sender, event_receiver) = tokio::sync::mpsc::unbounded_channel();
301    let summary_handle = spawn("handle metrics summary", {
302        let opts = opts.clone();
303        async { handle_metrics_summary(opts, event_receiver).await }
304    });
305    let futures = match opts.command.clone() {
306        Command::TestConnect {
307            invite_code,
308            duration_secs,
309            timeout_secs,
310            limit_endpoints,
311        } => {
312            let connectors = ConnectorRegistry::build_from_client_env()?.bind().await?;
313            let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
314            test_connect_raw_client(
315                &connectors,
316                invite_code,
317                opts.users,
318                Duration::from_secs(duration_secs),
319                Duration::from_secs(timeout_secs),
320                limit_endpoints,
321                event_sender.clone(),
322            )
323            .await?
324        }
325        Command::TestDownload { invite_code } => {
326            let connectors = ConnectorRegistry::build_from_client_env()?.bind().await?;
327            let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
328            test_download_config(&connectors, &invite_code, opts.users, &event_sender.clone())
329        }
330        Command::LoadTest(args) => {
331            let invite_code = invite_code_or_fallback(args.invite_code).await;
332
333            let gateway_id = if let Some(gateway_id) = args.gateway_id {
334                Some(gateway_id)
335            } else if let Some(generate_invoice_with) = args.generate_invoice_with {
336                Some(get_gateway_id(generate_invoice_with).await?)
337            } else {
338                None
339            };
340            let invoices = if let Some(invoices_file) = args.invoices_file {
341                let display_path = invoices_file.display();
342                let invoices_file = tokio::fs::File::open(&invoices_file)
343                    .await
344                    .with_context(|| format!("Failed to open {display_path}"))?;
345                let mut lines = tokio::io::BufReader::new(invoices_file).lines();
346                let mut invoices = vec![];
347                while let Some(line) = lines.next_line().await? {
348                    let invoice = Bolt11Invoice::from_str(&line)?;
349                    invoices.push(invoice);
350                }
351                invoices
352            } else {
353                vec![]
354            };
355            if args.generate_invoice_with.is_none() && invoices.is_empty() {
356                info!(
357                    "No --generate-invoice-with given no invoices on --invoices-file, not LN/gateway tests will be run"
358                );
359            }
360            run_load_test(
361                opts.archive_dir,
362                opts.users,
363                invite_code,
364                args.initial_notes,
365                args.generate_invoice_with,
366                args.invoices_per_user,
367                Duration::from_secs(args.ln_payment_sleep_secs),
368                invoices,
369                gateway_id,
370                args.notes_per_user,
371                args.note_denomination,
372                args.invoice_amount,
373                event_sender.clone(),
374            )
375            .await?
376        }
377        Command::LnCircularLoadTest(args) => {
378            let invite_code = invite_code_or_fallback(args.invite_code).await;
379            run_ln_circular_load_test(
380                opts.archive_dir,
381                opts.users,
382                invite_code,
383                args.initial_notes,
384                Duration::from_secs(args.test_duration_secs),
385                Duration::from_secs(args.ln_payment_sleep_secs),
386                args.notes_per_user,
387                args.note_denomination,
388                args.invoice_amount,
389                args.strategy,
390                event_sender.clone(),
391            )
392            .await?
393        }
394    };
395
396    let result = futures::future::join_all(futures).await;
397    drop(event_sender);
398    summary_handle.await??;
399    let len_failures = result.iter().filter(|r| r.is_err()).count();
400    eprintln!("{} results, {len_failures} failures", result.len());
401    for r in result {
402        if let Err(e) = r {
403            warn!("Task failed: {:?}", e);
404        }
405    }
406    if len_failures > 0 {
407        bail!("Finished with failures");
408    }
409    info!("Finished successfully");
410    Ok(())
411}
412
413async fn invite_code_or_fallback(invite_code: Option<InviteCode>) -> Option<InviteCode> {
414    if let Some(invite_code) = invite_code {
415        Some(invite_code)
416    } else {
417        // Try to get an invite code through cli in a best effort basis
418        match get_invite_code_cli(0.into()).await {
419            Ok(invite_code) => Some(invite_code),
420            Err(e) => {
421                info!(
422                    "No invite code provided and failed to get one with '{e}' error, will try to proceed without one..."
423                );
424                None
425            }
426        }
427    }
428}
429
430#[allow(clippy::too_many_arguments)]
431async fn run_load_test(
432    archive_dir: Option<PathBuf>,
433    users: u16,
434    invite_code: Option<InviteCode>,
435    initial_notes: Option<OOBNotes>,
436    generate_invoice_with: Option<LnInvoiceGeneration>,
437    generated_invoices_per_user: u16,
438    ln_payment_sleep: Duration,
439    invoices_from_file: Vec<Bolt11Invoice>,
440    gateway_id: Option<String>,
441    notes_per_user: u16,
442    note_denomination: Amount,
443    invoice_amount: Amount,
444    event_sender: mpsc::UnboundedSender<MetricEvent>,
445) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
446    let db_path = get_db_path(&archive_dir);
447    let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
448    let minimum_notes = notes_per_user * users;
449    let minimum_amount_required = note_denomination * u64::from(minimum_notes);
450
451    reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
452    get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
453    print_coordinator_notes(&coordinator).await?;
454    info!(
455        "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
456    );
457    remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
458    print_coordinator_notes(&coordinator).await?;
459
460    let users_clients = get_users_clients(users, db_path, invite_code).await?;
461
462    let mut users_notes =
463        get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
464    let mut users_invoices = HashMap::new();
465    let mut user = 0;
466    // Distribute invoices to users in a round robin fashion
467    for invoice in invoices_from_file {
468        users_invoices
469            .entry(user)
470            .or_insert_with(Vec::new)
471            .push(invoice);
472        user = (user + 1) % users;
473    }
474
475    info!("Starting user tasks");
476    let futures = users_clients
477        .into_iter()
478        .enumerate()
479        .map(|(u, client)| {
480            let u = u as u16;
481            let oob_notes = users_notes.remove(&u).unwrap();
482            let invoices = users_invoices.remove(&u).unwrap_or_default();
483            let event_sender = event_sender.clone();
484            let f: BoxFuture<_> = Box::pin(do_load_test_user_task(
485                format!("User {u}:"),
486                client,
487                oob_notes,
488                generated_invoices_per_user,
489                ln_payment_sleep,
490                invoice_amount,
491                invoices,
492                generate_invoice_with,
493                event_sender,
494                gateway_id.clone(),
495            ));
496            f
497        })
498        .collect::<Vec<_>>();
499
500    Ok(futures)
501}
502
503async fn get_notes_for_users(
504    users: u16,
505    notes_per_user: u16,
506    coordinator: ClientHandleArc,
507    note_denomination: Amount,
508) -> anyhow::Result<HashMap<u16, Vec<OOBNotes>>> {
509    let mut users_notes = HashMap::new();
510    for u in 0..users {
511        users_notes.insert(u, Vec::with_capacity(notes_per_user.into()));
512        for _ in 0..notes_per_user {
513            let (_, oob_notes) = do_spend_notes(&coordinator, note_denomination).await?;
514            let user_amount = oob_notes.total_amount();
515            info!("Giving {user_amount} to user {u}");
516            users_notes.get_mut(&u).unwrap().push(oob_notes);
517        }
518    }
519    Ok(users_notes)
520}
521
522async fn get_users_clients(
523    n: u16,
524    db_path: Option<PathBuf>,
525    invite_code: Option<InviteCode>,
526) -> anyhow::Result<Vec<ClientHandleArc>> {
527    let mut users_clients = Vec::with_capacity(n.into());
528    for u in 0..n {
529        let (client, _) = get_user_client(u, &db_path, &invite_code).await?;
530        users_clients.push(client);
531    }
532    Ok(users_clients)
533}
534
535async fn get_user_client(
536    user_index: u16,
537    db_path: &Option<PathBuf>,
538    invite_code: &Option<InviteCode>,
539) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
540    let user_db = db_path
541        .as_ref()
542        .map(|db_path| db_path.join(format!("user_{user_index}.db")));
543    let user_invite_code = if user_db.as_ref().is_some_and(|db| db.exists()) {
544        None
545    } else {
546        invite_code.clone()
547    };
548    let (client, invite_code) = build_client(user_invite_code, user_db.as_ref()).await?;
549    // if lightning module is present, update the gateway cache
550    if let Ok(ln_client) = client.get_first_module::<LightningClientModule>() {
551        let _ = ln_client.update_gateway_cache().await;
552    }
553    Ok((client, invite_code))
554}
555
556async fn print_coordinator_notes(coordinator: &ClientHandleArc) -> anyhow::Result<()> {
557    info!("Note summary:");
558    let summary = get_note_summary(coordinator).await?;
559    for (k, v) in summary.iter() {
560        info!("{k}: {v}");
561    }
562    Ok(())
563}
564
565async fn get_required_notes(
566    coordinator: &ClientHandleArc,
567    minimum_amount_required: Amount,
568    event_sender: &mpsc::UnboundedSender<MetricEvent>,
569) -> anyhow::Result<()> {
570    let current_balance = coordinator.get_balance_for_btc().await?;
571
572    if current_balance < minimum_amount_required {
573        let diff = minimum_amount_required.saturating_sub(current_balance);
574        info!(
575            "Current balance {current_balance} on coordinator not enough, trying to get {diff} more through fedimint-cli"
576        );
577        match try_get_notes_cli(&diff, 5).await {
578            Ok(notes) => {
579                info!("Got {} more notes, reissuing them", notes.total_amount());
580                reissue_notes(coordinator, notes, event_sender).await?;
581            }
582            Err(e) => {
583                info!("Unable to get more notes: '{e}', will try to proceed without them");
584            }
585        }
586    } else {
587        info!(
588            "Current balance of {current_balance} already covers the minimum required of {minimum_amount_required}"
589        );
590    }
591    Ok(())
592}
593
594async fn reissue_initial_notes(
595    initial_notes: Option<OOBNotes>,
596    coordinator: &ClientHandleArc,
597    event_sender: &mpsc::UnboundedSender<MetricEvent>,
598) -> anyhow::Result<()> {
599    if let Some(notes) = initial_notes {
600        let amount = notes.total_amount();
601        info!("Reissuing initial notes, got {amount}");
602        reissue_notes(coordinator, notes, event_sender).await?;
603    }
604    Ok(())
605}
606
607async fn get_coordinator_client(
608    db_path: &Option<PathBuf>,
609    invite_code: &Option<InviteCode>,
610) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
611    let (client, invite_code) = if let Some(db_path) = db_path {
612        let coordinator_db = db_path.join("coordinator.db");
613        if coordinator_db.exists() {
614            build_client(invite_code.clone(), Some(&coordinator_db)).await?
615        } else {
616            tokio::fs::create_dir_all(db_path).await?;
617            build_client(
618                Some(invite_code.clone().context(
619                    "Running on this archive dir for the first time, an invite code is required",
620                )?),
621                Some(&coordinator_db),
622            )
623            .await?
624        }
625    } else {
626        build_client(
627            Some(
628                invite_code
629                    .clone()
630                    .context("No archive dir given, an invite code is strictly required")?,
631            ),
632            None,
633        )
634        .await?
635    };
636    Ok((client, invite_code))
637}
638
639fn get_db_path(archive_dir: &Option<PathBuf>) -> Option<PathBuf> {
640    archive_dir.as_ref().map(|p| p.join("db"))
641}
642
643async fn get_lightning_gateway(
644    client: &ClientHandleArc,
645    gateway_id: Option<String>,
646) -> Option<LightningGateway> {
647    let gateway_id = parse_gateway_id(gateway_id.or(None)?.as_str()).expect("Invalid gateway id");
648    let ln_module = client
649        .get_first_module::<LightningClientModule>()
650        .expect("Must have ln client module");
651    ln_module.select_gateway(&gateway_id).await
652}
653
654#[allow(clippy::too_many_arguments)]
655async fn do_load_test_user_task(
656    prefix: String,
657    client: ClientHandleArc,
658    oob_notes: Vec<OOBNotes>,
659    generated_invoices_per_user: u16,
660    ln_payment_sleep: Duration,
661    invoice_amount: Amount,
662    additional_invoices: Vec<Bolt11Invoice>,
663    generate_invoice_with: Option<LnInvoiceGeneration>,
664    event_sender: mpsc::UnboundedSender<MetricEvent>,
665    gateway_id: Option<String>,
666) -> anyhow::Result<()> {
667    let ln_gateway = get_lightning_gateway(&client, gateway_id).await;
668    for oob_note in oob_notes {
669        let amount = oob_note.total_amount();
670        reissue_notes(&client, oob_note, &event_sender)
671            .await
672            .map_err(|e| anyhow!("while reissuing initial {amount}: {e}"))?;
673    }
674    let mut generated_invoices_per_user_iterator = (0..generated_invoices_per_user).peekable();
675    while let Some(_) = generated_invoices_per_user_iterator.next() {
676        let total_amount = get_note_summary(&client).await?.total_amount();
677        if invoice_amount > total_amount {
678            warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
679        } else {
680            match generate_invoice_with {
681                Some(LnInvoiceGeneration::LdkLightningCli) => {
682                    let invoice = ldk_create_invoice(invoice_amount).await?;
683                    gateway_pay_invoice(
684                        &prefix,
685                        "LND",
686                        &client,
687                        invoice.clone(),
688                        &event_sender,
689                        ln_gateway.clone(),
690                    )
691                    .await?;
692                    ldk_wait_invoice_payment(&invoice).await?;
693                }
694                None if additional_invoices.is_empty() => {
695                    debug!(
696                        "No method given to generate an invoice and no invoices on file, will not test the gateway"
697                    );
698                    break;
699                }
700                None => {
701                    break;
702                }
703            }
704            if generated_invoices_per_user_iterator.peek().is_some() {
705                // Only sleep while there are more invoices to pay
706                fedimint_core::task::sleep(ln_payment_sleep).await;
707            }
708        }
709    }
710    let mut additional_invoices = additional_invoices.into_iter().peekable();
711    while let Some(invoice) = additional_invoices.next() {
712        let total_amount = get_note_summary(&client).await?.total_amount();
713        let invoice_amount =
714            Amount::from_msats(invoice.amount_milli_satoshis().unwrap_or_default());
715        if invoice_amount > total_amount {
716            warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
717        } else if invoice_amount == Amount::ZERO {
718            warn!("Can't pay invoice {invoice}, amount is zero");
719        } else {
720            gateway_pay_invoice(
721                &prefix,
722                "unknown",
723                &client,
724                invoice,
725                &event_sender,
726                ln_gateway.clone(),
727            )
728            .await?;
729            if additional_invoices.peek().is_some() {
730                // Only sleep while there are more invoices to pay
731                fedimint_core::task::sleep(ln_payment_sleep).await;
732            }
733        }
734    }
735    Ok(())
736}
737
738#[allow(clippy::too_many_arguments)]
739async fn run_ln_circular_load_test(
740    archive_dir: Option<PathBuf>,
741    users: u16,
742    invite_code: Option<InviteCode>,
743    initial_notes: Option<OOBNotes>,
744    test_duration: Duration,
745    ln_payment_sleep: Duration,
746    notes_per_user: u16,
747    note_denomination: Amount,
748    invoice_amount: Amount,
749    strategy: LnCircularStrategy,
750    event_sender: mpsc::UnboundedSender<MetricEvent>,
751) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
752    let db_path = get_db_path(&archive_dir);
753    let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
754    let minimum_notes = notes_per_user * users;
755    let minimum_amount_required = note_denomination * u64::from(minimum_notes);
756
757    reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
758    get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
759
760    info!(
761        "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
762    );
763    remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
764
765    print_coordinator_notes(&coordinator).await?;
766
767    let users_clients = get_users_clients(users, db_path, invite_code.clone()).await?;
768
769    let mut users_notes =
770        get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
771
772    info!("Starting user tasks");
773    let futures = users_clients
774        .into_iter()
775        .enumerate()
776        .map(|(u, client)| {
777            let u = u as u16;
778            let oob_notes = users_notes.remove(&u).unwrap();
779            let event_sender = event_sender.clone();
780            let f: BoxFuture<_> = Box::pin(do_ln_circular_test_user_task(
781                format!("User {u}:"),
782                client,
783                invite_code.clone(),
784                oob_notes,
785                test_duration,
786                ln_payment_sleep,
787                invoice_amount,
788                strategy,
789                event_sender,
790            ));
791            f
792        })
793        .collect::<Vec<_>>();
794
795    Ok(futures)
796}
797
798#[allow(clippy::too_many_arguments)]
799async fn do_ln_circular_test_user_task(
800    prefix: String,
801    client: ClientHandleArc,
802    invite_code: Option<InviteCode>,
803    oob_notes: Vec<OOBNotes>,
804    test_duration: Duration,
805    ln_payment_sleep: Duration,
806    invoice_amount: Amount,
807    strategy: LnCircularStrategy,
808    event_sender: mpsc::UnboundedSender<MetricEvent>,
809) -> anyhow::Result<()> {
810    for oob_note in oob_notes {
811        let amount = oob_note.total_amount();
812        reissue_notes(&client, oob_note, &event_sender)
813            .await
814            .map_err(|e| anyhow!("while reissuing initial {amount}: {e}"))?;
815    }
816    let initial_time = fedimint_core::time::now();
817    let still_ontime = || async {
818        fedimint_core::time::now()
819            .duration_since(initial_time)
820            .expect("time to work")
821            <= test_duration
822    };
823    let sleep_a_bit = || async {
824        if still_ontime().await {
825            fedimint_core::task::sleep(ln_payment_sleep).await;
826        }
827    };
828    match strategy {
829        LnCircularStrategy::TwoGateways => {
830            let invoice_generation = LnInvoiceGeneration::LdkLightningCli;
831            while still_ontime().await {
832                let gateway_id = get_gateway_id(invoice_generation).await?;
833                let ln_gateway = get_lightning_gateway(&client, Some(gateway_id)).await;
834                run_two_gateways_strategy(
835                    &prefix,
836                    &invoice_generation,
837                    &invoice_amount,
838                    &event_sender,
839                    &client,
840                    ln_gateway,
841                )
842                .await?;
843                sleep_a_bit().await;
844            }
845        }
846        LnCircularStrategy::SelfPayment => {
847            while still_ontime().await {
848                do_self_payment(&prefix, &client, invoice_amount, &event_sender).await?;
849                sleep_a_bit().await;
850            }
851        }
852        LnCircularStrategy::PartnerPingPong => {
853            let (partner, _) = build_client(invite_code, None).await?;
854            while still_ontime().await {
855                do_partner_ping_pong(&prefix, &client, &partner, invoice_amount, &event_sender)
856                    .await?;
857                sleep_a_bit().await;
858            }
859        }
860    }
861    Ok(())
862}
863
864const GATEWAY_CREATE_INVOICE: &str = "gateway_create_invoice";
865
866async fn run_two_gateways_strategy(
867    prefix: &str,
868    invoice_generation: &LnInvoiceGeneration,
869    invoice_amount: &Amount,
870    event_sender: &mpsc::UnboundedSender<MetricEvent>,
871    client: &ClientHandleArc,
872    ln_gateway: Option<LightningGateway>,
873) -> Result<(), anyhow::Error> {
874    let create_invoice_time = fedimint_core::time::now();
875    match *invoice_generation {
876        LnInvoiceGeneration::LdkLightningCli => {
877            let invoice = ldk_create_invoice(*invoice_amount).await?;
878            let elapsed = create_invoice_time.elapsed()?;
879            info!("Created invoice using LDK in {elapsed:?}");
880            event_sender.send(MetricEvent {
881                name: GATEWAY_CREATE_INVOICE.into(),
882                duration: elapsed,
883            })?;
884            gateway_pay_invoice(
885                prefix,
886                "LND",
887                client,
888                invoice.clone(),
889                event_sender,
890                ln_gateway.clone(),
891            )
892            .await?;
893            ldk_wait_invoice_payment(&invoice).await?;
894            let (operation_id, invoice) =
895                client_create_invoice(client, *invoice_amount, event_sender, ln_gateway).await?;
896            let pay_invoice_time = fedimint_core::time::now();
897            ldk_pay_invoice(invoice).await?;
898            wait_invoice_payment(
899                prefix,
900                "LND",
901                client,
902                operation_id,
903                event_sender,
904                pay_invoice_time,
905            )
906            .await?;
907        }
908    }
909    Ok(())
910}
911
912async fn do_self_payment(
913    prefix: &str,
914    client: &ClientHandleArc,
915    invoice_amount: Amount,
916    event_sender: &mpsc::UnboundedSender<MetricEvent>,
917) -> anyhow::Result<()> {
918    let (operation_id, invoice) =
919        client_create_invoice(client, invoice_amount, event_sender, None).await?;
920    let pay_invoice_time = fedimint_core::time::now();
921    let lightning_module = client.get_first_module::<LightningClientModule>()?;
922    //let gateway = lightning_module.select_active_gateway_opt().await;
923    lightning_module
924        .pay_bolt11_invoice(None, invoice, ())
925        .await?;
926    wait_invoice_payment(
927        prefix,
928        "gateway",
929        client,
930        operation_id,
931        event_sender,
932        pay_invoice_time,
933    )
934    .await?;
935    Ok(())
936}
937
938async fn do_partner_ping_pong(
939    prefix: &str,
940    client: &ClientHandleArc,
941    partner: &ClientHandleArc,
942    invoice_amount: Amount,
943    event_sender: &mpsc::UnboundedSender<MetricEvent>,
944) -> anyhow::Result<()> {
945    // Ping (partner creates invoice, client pays)
946    let (operation_id, invoice) =
947        client_create_invoice(partner, invoice_amount, event_sender, None).await?;
948    let pay_invoice_time = fedimint_core::time::now();
949    let lightning_module = client.get_first_module::<LightningClientModule>()?;
950    // TODO: Select random gateway?
951    //let gateway = lightning_module.select_active_gateway_opt().await;
952    lightning_module
953        .pay_bolt11_invoice(None, invoice, ())
954        .await?;
955    wait_invoice_payment(
956        prefix,
957        "gateway",
958        partner,
959        operation_id,
960        event_sender,
961        pay_invoice_time,
962    )
963    .await?;
964    // Pong (client creates invoice, partner pays)
965    let (operation_id, invoice) =
966        client_create_invoice(client, invoice_amount, event_sender, None).await?;
967    let pay_invoice_time = fedimint_core::time::now();
968    let partner_lightning_module = partner.get_first_module::<LightningClientModule>()?;
969    //let gateway = partner_lightning_module.select_active_gateway_opt().await;
970    // TODO: Select random gateway?
971    partner_lightning_module
972        .pay_bolt11_invoice(None, invoice, ())
973        .await?;
974    wait_invoice_payment(
975        prefix,
976        "gateway",
977        client,
978        operation_id,
979        event_sender,
980        pay_invoice_time,
981    )
982    .await?;
983    Ok(())
984}
985
986async fn wait_invoice_payment(
987    prefix: &str,
988    gateway_name: &str,
989    client: &ClientHandleArc,
990    operation_id: fedimint_core::core::OperationId,
991    event_sender: &mpsc::UnboundedSender<MetricEvent>,
992    pay_invoice_time: std::time::SystemTime,
993) -> anyhow::Result<()> {
994    let elapsed = pay_invoice_time.elapsed()?;
995    info!("{prefix} Invoice payment receive started using {gateway_name} in {elapsed:?}");
996    event_sender.send(MetricEvent {
997        name: format!("gateway_{gateway_name}_payment_received_started"),
998        duration: elapsed,
999    })?;
1000    let lightning_module = client.get_first_module::<LightningClientModule>()?;
1001    let mut updates = lightning_module
1002        .subscribe_ln_receive(operation_id)
1003        .await?
1004        .into_stream();
1005    while let Some(update) = updates.next().await {
1006        debug!(%prefix, ?update, "Invoice payment update");
1007        match update {
1008            LnReceiveState::Claimed => {
1009                let elapsed: Duration = pay_invoice_time.elapsed()?;
1010                info!("{prefix} Invoice payment received on {gateway_name} in {elapsed:?}");
1011                event_sender.send(MetricEvent {
1012                    name: "gateway_payment_received_success".into(),
1013                    duration: elapsed,
1014                })?;
1015                event_sender.send(MetricEvent {
1016                    name: format!("gateway_{gateway_name}_payment_received_success"),
1017                    duration: elapsed,
1018                })?;
1019                break;
1020            }
1021            LnReceiveState::Canceled { reason } => {
1022                let elapsed: Duration = pay_invoice_time.elapsed()?;
1023                info!(
1024                    "{prefix} Invoice payment receive was canceled on {gateway_name}: {reason} in {elapsed:?}"
1025                );
1026                event_sender.send(MetricEvent {
1027                    name: "gateway_payment_received_canceled".into(),
1028                    duration: elapsed,
1029                })?;
1030                break;
1031            }
1032            _ => {}
1033        }
1034    }
1035    Ok(())
1036}
1037
1038async fn client_create_invoice(
1039    client: &ClientHandleArc,
1040    invoice_amount: Amount,
1041    event_sender: &mpsc::UnboundedSender<MetricEvent>,
1042    ln_gateway: Option<LightningGateway>,
1043) -> anyhow::Result<(fedimint_core::core::OperationId, Bolt11Invoice)> {
1044    let create_invoice_time = fedimint_core::time::now();
1045    let lightning_module = client.get_first_module::<LightningClientModule>()?;
1046    let desc = Description::new("test".to_string())?;
1047    let (operation_id, invoice, _) = lightning_module
1048        .create_bolt11_invoice(
1049            invoice_amount,
1050            Bolt11InvoiceDescription::Direct(desc),
1051            None,
1052            (),
1053            ln_gateway,
1054        )
1055        .await?;
1056    let elapsed = create_invoice_time.elapsed()?;
1057    info!("Created invoice using gateway in {elapsed:?}");
1058    event_sender.send(MetricEvent {
1059        name: GATEWAY_CREATE_INVOICE.into(),
1060        duration: elapsed,
1061    })?;
1062    Ok((operation_id, invoice))
1063}
1064
1065fn test_download_config(
1066    endpoints: &ConnectorRegistry,
1067    invite_code: &InviteCode,
1068    users: u16,
1069    event_sender: &mpsc::UnboundedSender<MetricEvent>,
1070) -> Vec<BoxFuture<'static, anyhow::Result<()>>> {
1071    (0..users)
1072        .map(move |_| {
1073            let invite_code = invite_code.clone();
1074            let event_sender = event_sender.clone();
1075            let endpoints = endpoints.clone();
1076            let f: BoxFuture<_> = Box::pin(async move {
1077                let m = fedimint_core::time::now();
1078                let _ = fedimint_api_client::api::net::ConnectorType::default()
1079                    .download_from_invite_code(&endpoints, &invite_code)
1080                    .await?;
1081                event_sender.send(MetricEvent {
1082                    name: "download_client_config".into(),
1083                    duration: m.elapsed()?,
1084                })?;
1085                Ok(())
1086            });
1087            f
1088        })
1089        .collect()
1090}
1091
1092async fn test_connect_raw_client(
1093    endpoints: &ConnectorRegistry,
1094    invite_code: InviteCode,
1095    users: u16,
1096    duration: Duration,
1097    timeout: Duration,
1098    limit_endpoints: Option<usize>,
1099    event_sender: mpsc::UnboundedSender<MetricEvent>,
1100) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
1101    use jsonrpsee_core::client::ClientT;
1102    use jsonrpsee_ws_client::WsClientBuilder;
1103
1104    let (mut cfg, _) = fedimint_api_client::api::net::ConnectorType::default()
1105        .download_from_invite_code(endpoints, &invite_code)
1106        .await?;
1107
1108    if let Some(limit_endpoints) = limit_endpoints {
1109        cfg.global.api_endpoints = cfg
1110            .global
1111            .api_endpoints
1112            .into_iter()
1113            .take(limit_endpoints)
1114            .collect();
1115        info!("Limiting endpoints to {:?}", cfg.global.api_endpoints);
1116    }
1117
1118    info!("Connecting to {users} clients");
1119    let clients = (0..users)
1120        .flat_map(|_| {
1121            cfg.global.api_endpoints.values().map(|url| async {
1122                let ws_client = WsClientBuilder::default()
1123                    .request_timeout(timeout)
1124                    .connection_timeout(timeout)
1125                    .build(url_to_string_with_default_port(&url.url))
1126                    .await?;
1127                Ok::<_, anyhow::Error>(ws_client)
1128            })
1129        })
1130        .collect::<Vec<_>>();
1131    let clients = futures::future::try_join_all(clients).await?;
1132    info!("Keeping {users} clients connected for {duration:?}");
1133    Ok(clients
1134        .into_iter()
1135        .map(|client| {
1136            let event_sender = event_sender.clone();
1137            let f: BoxFuture<_> = Box::pin(async move {
1138                let initial_time = fedimint_core::time::now();
1139                while initial_time.elapsed()? < duration {
1140                    let m = fedimint_core::time::now();
1141                    let _epoch: u64 = client
1142                        .request::<_, _>(SESSION_COUNT_ENDPOINT, vec![ApiRequestErased::default()])
1143                        .await?;
1144                    event_sender.send(MetricEvent {
1145                        name: SESSION_COUNT_ENDPOINT.into(),
1146                        duration: m.elapsed()?,
1147                    })?;
1148                    fedimint_core::task::sleep(Duration::from_secs(1)).await;
1149                }
1150                Ok(())
1151            });
1152            f
1153        })
1154        .collect())
1155}
1156
1157fn url_to_string_with_default_port(url: &SafeUrl) -> String {
1158    format!(
1159        "{}://{}:{}{}",
1160        url.scheme(),
1161        url.host().expect("Asserted on construction"),
1162        url.port_or_known_default()
1163            .expect("Asserted on construction"),
1164        url.path()
1165    )
1166}
1167
1168async fn handle_metrics_summary(
1169    opts: Opts,
1170    mut event_receiver: mpsc::UnboundedReceiver<MetricEvent>,
1171) -> anyhow::Result<()> {
1172    let timestamp_seconds = fedimint_core::time::duration_since_epoch().as_secs();
1173    let mut metrics_json_output_files = vec![];
1174    let mut previous_metrics = vec![];
1175    let mut comparison_output = None;
1176    if let Some(archive_dir) = opts.archive_dir {
1177        let mut archive_metrics = archive_dir.join("metrics");
1178        archive_metrics.push(opts.users.to_string());
1179        tokio::fs::create_dir_all(&archive_metrics).await?;
1180        let mut archive_comparisons = archive_dir.join("comparisons");
1181        archive_comparisons.push(opts.users.to_string());
1182        tokio::fs::create_dir_all(&archive_comparisons).await?;
1183
1184        let latest_metrics_file = std::fs::read_dir(&archive_metrics)?
1185            .map(|entry| {
1186                let entry = entry.unwrap();
1187                let metadata = entry.metadata().unwrap();
1188                let created = metadata
1189                    .created()
1190                    .unwrap_or_else(|_| metadata.modified().unwrap());
1191                (entry, created)
1192            })
1193            .max_by_key(|(_entry, created)| created.to_owned())
1194            .map(|(entry, _)| entry.path());
1195        if let Some(latest_metrics_file) = latest_metrics_file {
1196            let display_path = latest_metrics_file.display();
1197            let latest_metrics_file = tokio::fs::File::open(&latest_metrics_file)
1198                .await
1199                .with_context(|| format!("Failed to open {display_path}"))?;
1200            let mut lines = tokio::io::BufReader::new(latest_metrics_file).lines();
1201            while let Some(line) = lines.next_line().await? {
1202                match serde_json::from_str::<EventMetricSummary>(&line) {
1203                    Ok(metric) => {
1204                        previous_metrics.push(metric);
1205                    }
1206                    Err(e) => {
1207                        warn!("Failed to parse previous metric: {e:?}");
1208                    }
1209                }
1210            }
1211        }
1212        let new_metric_output = archive_metrics.join(format!("{timestamp_seconds}.json",));
1213        let new_metric_output = BufWriter::new(
1214            OpenOptions::new()
1215                .write(true)
1216                .create(true)
1217                .truncate(true)
1218                .open(new_metric_output)
1219                .await?,
1220        );
1221        metrics_json_output_files.push(new_metric_output);
1222        if !previous_metrics.is_empty() {
1223            let new_comparison_output =
1224                archive_comparisons.join(format!("{timestamp_seconds}.json",));
1225            comparison_output = Some(BufWriter::new(
1226                OpenOptions::new()
1227                    .write(true)
1228                    .create(true)
1229                    .truncate(true)
1230                    .open(new_comparison_output)
1231                    .await?,
1232            ));
1233        }
1234    }
1235    if let Some(metrics_json_output) = opts.metrics_json_output {
1236        metrics_json_output_files.push(BufWriter::new(
1237            tokio::fs::OpenOptions::new()
1238                .write(true)
1239                .create(true)
1240                .truncate(true)
1241                .open(metrics_json_output)
1242                .await?,
1243        ));
1244    }
1245    let mut results = BTreeMap::new();
1246    while let Some(event) = event_receiver.recv().await {
1247        let entry = results.entry(event.name).or_insert_with(Vec::new);
1248        entry.push(event.duration);
1249    }
1250    let mut previous_metrics = previous_metrics
1251        .into_iter()
1252        .map(|metric| (metric.name.clone(), metric))
1253        .collect::<HashMap<_, _>>();
1254    for (k, mut v) in results {
1255        v.sort();
1256        let n = v.len();
1257        let max = v.iter().last().unwrap();
1258        let min = v.first().unwrap();
1259        let median = v[n / 2];
1260        let sum: Duration = v.iter().sum();
1261        let avg = sum / n as u32;
1262        let metric_summary = EventMetricSummary {
1263            name: k.clone(),
1264            users: u64::from(opts.users),
1265            n: n as u64,
1266            avg_ms: avg.as_millis(),
1267            median_ms: median.as_millis(),
1268            max_ms: max.as_millis(),
1269            min_ms: min.as_millis(),
1270            timestamp_seconds,
1271        };
1272        let comparison = if let Some(previous_metric) = previous_metrics.remove(&k) {
1273            if previous_metric.n == metric_summary.n {
1274                fn calculate_gain(current: u128, previous: u128) -> f64 {
1275                    current as f64 / previous as f64
1276                }
1277                let comparison = EventMetricComparison {
1278                    avg_ms_gain: calculate_gain(metric_summary.avg_ms, previous_metric.avg_ms),
1279                    median_ms_gain: calculate_gain(
1280                        metric_summary.median_ms,
1281                        previous_metric.median_ms,
1282                    ),
1283                    max_ms_gain: calculate_gain(metric_summary.max_ms, previous_metric.max_ms),
1284                    min_ms_gain: calculate_gain(metric_summary.min_ms, previous_metric.min_ms),
1285                    current: metric_summary.clone(),
1286                    previous: previous_metric,
1287                };
1288                if let Some(comparison_output) = &mut comparison_output {
1289                    let comparison_json =
1290                        serde_json::to_string(&comparison).expect("to be serializable");
1291                    comparison_output
1292                        .write_all(format!("{comparison_json}\n").as_bytes())
1293                        .await
1294                        .expect("to write on file");
1295                }
1296                Some(comparison)
1297            } else {
1298                info!(
1299                    "Skipping comparison for {k} because previous metric has different n ({} vs {})",
1300                    previous_metric.n, metric_summary.n
1301                );
1302                None
1303            }
1304        } else {
1305            None
1306        };
1307        if let Some(comparison) = comparison {
1308            println!(
1309                "{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?} (compared to previous: {comparison})"
1310            );
1311        } else {
1312            println!("{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?}");
1313        }
1314        let metric_summary_json =
1315            serde_json::to_string(&metric_summary).expect("to be serializable");
1316        for metrics_json_output_file in &mut metrics_json_output_files {
1317            metrics_json_output_file
1318                .write_all(format!("{metric_summary_json}\n").as_bytes())
1319                .await
1320                .expect("to write on file");
1321        }
1322    }
1323    for mut output in metrics_json_output_files {
1324        output.flush().await?;
1325    }
1326    if let Some(mut output) = comparison_output {
1327        output.flush().await?;
1328    }
1329    Ok(())
1330}
1331
1332async fn get_gateway_id(generate_invoice_with: LnInvoiceGeneration) -> anyhow::Result<String> {
1333    let gateway_json = match generate_invoice_with {
1334        LnInvoiceGeneration::LdkLightningCli => {
1335            // If we are paying a lnd invoice, we use the LDK node
1336            cmd!(GatewayLndCli, "info").out_json().await
1337        }
1338    }?;
1339
1340    let gatewayd_version = devimint::util::Gatewayd::version_or_default().await;
1341    let gateway_id = if gatewayd_version < *VERSION_0_10_0_ALPHA {
1342        gateway_json["gateway_id"]
1343            .as_str()
1344            .context("gateway_id must be a string")?
1345            .to_owned()
1346    } else {
1347        gateway_json["registrations"]["http"][1]
1348            .as_str()
1349            .context("gateway id must be a string")?
1350            .to_owned()
1351    };
1352
1353    Ok(gateway_id)
1354}