fedimint_load_test_tool/
main.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_precision_loss)]
4#![allow(clippy::missing_errors_doc)]
5#![allow(clippy::ref_option)]
6#![allow(clippy::too_many_lines)]
7#![allow(clippy::large_futures)]
8
9use std::collections::{BTreeMap, HashMap};
10use std::path::PathBuf;
11use std::str::FromStr;
12use std::time::Duration;
13use std::vec;
14
15use anyhow::{Context, anyhow, bail};
16use clap::{Args, Parser, Subcommand, ValueEnum};
17use common::{
18    gateway_pay_invoice, get_note_summary, ldk_create_invoice, ldk_pay_invoice,
19    ldk_wait_invoice_payment, parse_gateway_id, reissue_notes,
20};
21use devimint::cmd;
22use devimint::util::GatewayLndCli;
23use fedimint_client::ClientHandleArc;
24use fedimint_connectors::ConnectorRegistry;
25use fedimint_core::Amount;
26use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
27use fedimint_core::invite_code::InviteCode;
28use fedimint_core::module::ApiRequestErased;
29use fedimint_core::runtime::spawn;
30use fedimint_core::util::{BoxFuture, SafeUrl};
31use fedimint_ln_client::{LightningClientModule, LnReceiveState};
32use fedimint_ln_common::LightningGateway;
33use fedimint_mint_client::OOBNotes;
34use futures::StreamExt;
35use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description};
36use serde::{Deserialize, Serialize};
37use tokio::fs::OpenOptions;
38use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
39use tokio::sync::mpsc;
40use tracing::{debug, info, warn};
41
42use crate::common::{
43    build_client, do_spend_notes, get_invite_code_cli, remint_denomination, try_get_notes_cli,
44};
45pub mod common;
46
47#[derive(Parser, Clone)]
48#[command(version)]
49struct Opts {
50    #[arg(
51        long,
52        default_value = "10",
53        help = "Number of users. Each user will work in parallel"
54    )]
55    users: u16,
56
57    #[arg(long, help = "Output with the metrics results in JSON format")]
58    metrics_json_output: Option<PathBuf>,
59
60    #[arg(
61        long,
62        help = "If given, will be used to store and retrieve past metrics for comparison purposes"
63    )]
64    archive_dir: Option<PathBuf>,
65
66    #[clap(subcommand)]
67    command: Command,
68}
69
70#[derive(Debug, Clone, Copy, ValueEnum)]
71enum LnInvoiceGeneration {
72    LdkLightningCli,
73}
74
75#[derive(Subcommand, Clone)]
76enum Command {
77    #[command(about = "Keep many websocket connections to a federation for a duration of time")]
78    TestConnect {
79        #[arg(long, help = "Federation invite code")]
80        invite_code: String,
81        #[arg(
82            long,
83            default_value = "60",
84            help = "How much time to keep the connections open, in seconds"
85        )]
86        duration_secs: u64,
87        #[arg(
88            long,
89            default_value = "120",
90            help = "Timeout for connection attempt and for each request, in secnods"
91        )]
92        timeout_secs: u64,
93        #[arg(
94            long,
95            help = "If given, will limit the number of endpoints (guardians) to connect to"
96        )]
97        limit_endpoints: Option<usize>,
98    },
99    #[command(about = "Try to download the client config many times.")]
100    TestDownload {
101        #[arg(long, help = "Federation invite code")]
102        invite_code: String,
103    },
104    #[command(
105        about = "Run a load test where many users in parallel will try to reissue notes and pay invoices through the gateway"
106    )]
107    LoadTest(LoadTestArgs),
108    /// Run a load test where many users in parallel will receive then send a
109    /// payment through lightning.
110    /// It's 'circular' because the funds always come back to the same user then
111    /// we can keep making the payments in a loop
112    #[command()]
113    LnCircularLoadTest(LnCircularLoadTestArgs),
114}
115
116#[derive(Args, Clone)]
117struct LoadTestArgs {
118    #[arg(
119        long,
120        help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
121    )]
122    invite_code: Option<InviteCode>,
123
124    #[arg(
125        long,
126        help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
127    )]
128    initial_notes: Option<OOBNotes>,
129
130    #[arg(
131        long,
132        help = "Gateway Id. If none, retrieve one according to --generate-invoice-with"
133    )]
134    gateway_id: Option<String>,
135
136    #[arg(
137        long,
138        help = "The method used to generate invoices to be paid through the gateway. If none and no --invoices-file provided, no gateway/LN tests will be run. Note that you can't generate an invoice using the same lightning node used by the gateway (i.e self payment is forbidden)"
139    )]
140    generate_invoice_with: Option<LnInvoiceGeneration>,
141
142    #[arg(
143        long,
144        default_value = "1",
145        help = "How many invoices will be created for each user. Only applicable if --generate-invoice-with is provided"
146    )]
147    invoices_per_user: u16,
148
149    #[arg(
150        long,
151        default_value = "0",
152        help = "How many seconds to sleep between LN payments"
153    )]
154    ln_payment_sleep_secs: u64,
155
156    #[arg(
157        long,
158        help = "A text file with one invoice per line. If --generate-invoice-with is provided, these will be additional invoices to be paid"
159    )]
160    invoices_file: Option<PathBuf>,
161
162    #[arg(
163        long,
164        help = "How many notes to distribute to each user",
165        default_value = "2"
166    )]
167    notes_per_user: u16,
168
169    #[arg(
170        long,
171        help = "Note denomination to use for the test",
172        default_value = "1024"
173    )]
174    note_denomination: Amount,
175
176    #[arg(
177        long,
178        help = "Invoice amount when generating one",
179        default_value = "1000"
180    )]
181    invoice_amount: Amount,
182}
183
184#[derive(Args, Clone)]
185struct LnCircularLoadTestArgs {
186    #[arg(
187        long,
188        help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
189    )]
190    invite_code: Option<InviteCode>,
191
192    #[arg(
193        long,
194        help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
195    )]
196    initial_notes: Option<OOBNotes>,
197
198    #[arg(
199        long,
200        default_value = "60",
201        help = "For how many seconds to run the test"
202    )]
203    test_duration_secs: u64,
204
205    #[arg(
206        long,
207        default_value = "0",
208        help = "How many seconds to sleep between LN payments"
209    )]
210    ln_payment_sleep_secs: u64,
211
212    #[arg(
213        long,
214        help = "How many notes to distribute to each user",
215        default_value = "1"
216    )]
217    notes_per_user: u16,
218
219    #[arg(
220        long,
221        help = "Note denomination to use for the test",
222        default_value = "1024"
223    )]
224    note_denomination: Amount,
225
226    #[arg(
227        long,
228        help = "Invoice amount when generating one",
229        default_value = "1000"
230    )]
231    invoice_amount: Amount,
232
233    #[arg(long)]
234    strategy: LnCircularStrategy,
235}
236
237#[derive(Debug, Clone, Copy, ValueEnum)]
238enum LnCircularStrategy {
239    /// The user will pay its own invoice
240    SelfPayment,
241    /// One gateway will pay/receive to/from the other, then they will swap
242    /// places
243    TwoGateways,
244    /// Two clients will pay to each other using the same gateway
245    PartnerPingPong,
246}
247
248#[derive(Debug, Clone)]
249pub struct MetricEvent {
250    name: String,
251    duration: Duration,
252}
253
254#[derive(Debug, Clone, Serialize, Deserialize)]
255struct EventMetricSummary {
256    name: String,
257    users: u64,
258    n: u64,
259    avg_ms: u128,
260    median_ms: u128,
261    max_ms: u128,
262    min_ms: u128,
263    timestamp_seconds: u64,
264}
265
266#[derive(Debug, Serialize, Deserialize)]
267struct EventMetricComparison {
268    avg_ms_gain: f64,
269    median_ms_gain: f64,
270    max_ms_gain: f64,
271    min_ms_gain: f64,
272    current: EventMetricSummary,
273    previous: EventMetricSummary,
274}
275
276impl std::fmt::Display for EventMetricComparison {
277    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278        fn to_percent(gain: f64) -> String {
279            if gain >= 1.0 {
280                format!("+{:.2}%", (gain - 1.0) * 100.0)
281            } else {
282                format!("-{:.2}%", (1.0 - gain) * 100.0)
283            }
284        }
285        f.write_str(&format!(
286            "avg: {}, median: {}, max: {}, min: {}",
287            to_percent(self.avg_ms_gain),
288            to_percent(self.median_ms_gain),
289            to_percent(self.max_ms_gain),
290            to_percent(self.min_ms_gain),
291        ))
292    }
293}
294
295#[tokio::main]
296async fn main() -> anyhow::Result<()> {
297    fedimint_logging::TracingSetup::default().init()?;
298    let opts = Opts::parse();
299    let (event_sender, event_receiver) = tokio::sync::mpsc::unbounded_channel();
300    let summary_handle = spawn("handle metrics summary", {
301        let opts = opts.clone();
302        async { handle_metrics_summary(opts, event_receiver).await }
303    });
304    let futures = match opts.command.clone() {
305        Command::TestConnect {
306            invite_code,
307            duration_secs,
308            timeout_secs,
309            limit_endpoints,
310        } => {
311            let connectors = ConnectorRegistry::build_from_client_env()?.bind().await?;
312            let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
313            test_connect_raw_client(
314                &connectors,
315                invite_code,
316                opts.users,
317                Duration::from_secs(duration_secs),
318                Duration::from_secs(timeout_secs),
319                limit_endpoints,
320                event_sender.clone(),
321            )
322            .await?
323        }
324        Command::TestDownload { invite_code } => {
325            let connectors = ConnectorRegistry::build_from_client_env()?.bind().await?;
326            let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
327            test_download_config(&connectors, &invite_code, opts.users, &event_sender.clone())
328        }
329        Command::LoadTest(args) => {
330            let invite_code = invite_code_or_fallback(args.invite_code).await;
331
332            let gateway_id = if let Some(gateway_id) = args.gateway_id {
333                Some(gateway_id)
334            } else if let Some(generate_invoice_with) = args.generate_invoice_with {
335                Some(get_gateway_id(generate_invoice_with).await?)
336            } else {
337                None
338            };
339            let invoices = if let Some(invoices_file) = args.invoices_file {
340                let display_path = invoices_file.display();
341                let invoices_file = tokio::fs::File::open(&invoices_file)
342                    .await
343                    .with_context(|| format!("Failed to open {display_path}"))?;
344                let mut lines = tokio::io::BufReader::new(invoices_file).lines();
345                let mut invoices = vec![];
346                while let Some(line) = lines.next_line().await? {
347                    let invoice = Bolt11Invoice::from_str(&line)?;
348                    invoices.push(invoice);
349                }
350                invoices
351            } else {
352                vec![]
353            };
354            if args.generate_invoice_with.is_none() && invoices.is_empty() {
355                info!(
356                    "No --generate-invoice-with given no invoices on --invoices-file, not LN/gateway tests will be run"
357                );
358            }
359            run_load_test(
360                opts.archive_dir,
361                opts.users,
362                invite_code,
363                args.initial_notes,
364                args.generate_invoice_with,
365                args.invoices_per_user,
366                Duration::from_secs(args.ln_payment_sleep_secs),
367                invoices,
368                gateway_id,
369                args.notes_per_user,
370                args.note_denomination,
371                args.invoice_amount,
372                event_sender.clone(),
373            )
374            .await?
375        }
376        Command::LnCircularLoadTest(args) => {
377            let invite_code = invite_code_or_fallback(args.invite_code).await;
378            run_ln_circular_load_test(
379                opts.archive_dir,
380                opts.users,
381                invite_code,
382                args.initial_notes,
383                Duration::from_secs(args.test_duration_secs),
384                Duration::from_secs(args.ln_payment_sleep_secs),
385                args.notes_per_user,
386                args.note_denomination,
387                args.invoice_amount,
388                args.strategy,
389                event_sender.clone(),
390            )
391            .await?
392        }
393    };
394
395    let result = futures::future::join_all(futures).await;
396    drop(event_sender);
397    summary_handle.await??;
398    let len_failures = result.iter().filter(|r| r.is_err()).count();
399    eprintln!("{} results, {len_failures} failures", result.len());
400    for r in result {
401        if let Err(e) = r {
402            warn!("Task failed: {:?}", e);
403        }
404    }
405    if len_failures > 0 {
406        bail!("Finished with failures");
407    }
408    info!("Finished successfully");
409    Ok(())
410}
411
412async fn invite_code_or_fallback(invite_code: Option<InviteCode>) -> Option<InviteCode> {
413    if let Some(invite_code) = invite_code {
414        Some(invite_code)
415    } else {
416        // Try to get an invite code through cli in a best effort basis
417        match get_invite_code_cli(0.into()).await {
418            Ok(invite_code) => Some(invite_code),
419            Err(e) => {
420                info!(
421                    "No invite code provided and failed to get one with '{e}' error, will try to proceed without one..."
422                );
423                None
424            }
425        }
426    }
427}
428
429#[allow(clippy::too_many_arguments)]
430async fn run_load_test(
431    archive_dir: Option<PathBuf>,
432    users: u16,
433    invite_code: Option<InviteCode>,
434    initial_notes: Option<OOBNotes>,
435    generate_invoice_with: Option<LnInvoiceGeneration>,
436    generated_invoices_per_user: u16,
437    ln_payment_sleep: Duration,
438    invoices_from_file: Vec<Bolt11Invoice>,
439    gateway_id: Option<String>,
440    notes_per_user: u16,
441    note_denomination: Amount,
442    invoice_amount: Amount,
443    event_sender: mpsc::UnboundedSender<MetricEvent>,
444) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
445    let db_path = get_db_path(&archive_dir);
446    let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
447    let minimum_notes = notes_per_user * users;
448    let minimum_amount_required = note_denomination * u64::from(minimum_notes);
449
450    reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
451    get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
452    print_coordinator_notes(&coordinator).await?;
453    info!(
454        "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
455    );
456    remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
457    print_coordinator_notes(&coordinator).await?;
458
459    let users_clients = get_users_clients(users, db_path, invite_code).await?;
460
461    let mut users_notes =
462        get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
463    let mut users_invoices = HashMap::new();
464    let mut user = 0;
465    // Distribute invoices to users in a round robin fashion
466    for invoice in invoices_from_file {
467        users_invoices
468            .entry(user)
469            .or_insert_with(Vec::new)
470            .push(invoice);
471        user = (user + 1) % users;
472    }
473
474    info!("Starting user tasks");
475    let futures = users_clients
476        .into_iter()
477        .enumerate()
478        .map(|(u, client)| {
479            let u = u as u16;
480            let oob_notes = users_notes.remove(&u).unwrap();
481            let invoices = users_invoices.remove(&u).unwrap_or_default();
482            let event_sender = event_sender.clone();
483            let f: BoxFuture<_> = Box::pin(do_load_test_user_task(
484                format!("User {u}:"),
485                client,
486                oob_notes,
487                generated_invoices_per_user,
488                ln_payment_sleep,
489                invoice_amount,
490                invoices,
491                generate_invoice_with,
492                event_sender,
493                gateway_id.clone(),
494            ));
495            f
496        })
497        .collect::<Vec<_>>();
498
499    Ok(futures)
500}
501
502async fn get_notes_for_users(
503    users: u16,
504    notes_per_user: u16,
505    coordinator: ClientHandleArc,
506    note_denomination: Amount,
507) -> anyhow::Result<HashMap<u16, Vec<OOBNotes>>> {
508    let mut users_notes = HashMap::new();
509    for u in 0..users {
510        users_notes.insert(u, Vec::with_capacity(notes_per_user.into()));
511        for _ in 0..notes_per_user {
512            let (_, oob_notes) = do_spend_notes(&coordinator, note_denomination).await?;
513            let user_amount = oob_notes.total_amount();
514            info!("Giving {user_amount} to user {u}");
515            users_notes.get_mut(&u).unwrap().push(oob_notes);
516        }
517    }
518    Ok(users_notes)
519}
520
521async fn get_users_clients(
522    n: u16,
523    db_path: Option<PathBuf>,
524    invite_code: Option<InviteCode>,
525) -> anyhow::Result<Vec<ClientHandleArc>> {
526    let mut users_clients = Vec::with_capacity(n.into());
527    for u in 0..n {
528        let (client, _) = get_user_client(u, &db_path, &invite_code).await?;
529        users_clients.push(client);
530    }
531    Ok(users_clients)
532}
533
534async fn get_user_client(
535    user_index: u16,
536    db_path: &Option<PathBuf>,
537    invite_code: &Option<InviteCode>,
538) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
539    let user_db = db_path
540        .as_ref()
541        .map(|db_path| db_path.join(format!("user_{user_index}.db")));
542    let user_invite_code = if user_db.as_ref().is_some_and(|db| db.exists()) {
543        None
544    } else {
545        invite_code.clone()
546    };
547    let (client, invite_code) = build_client(user_invite_code, user_db.as_ref()).await?;
548    // if lightning module is present, update the gateway cache
549    if let Ok(ln_client) = client.get_first_module::<LightningClientModule>() {
550        let _ = ln_client.update_gateway_cache().await;
551    }
552    Ok((client, invite_code))
553}
554
555async fn print_coordinator_notes(coordinator: &ClientHandleArc) -> anyhow::Result<()> {
556    info!("Note summary:");
557    let summary = get_note_summary(coordinator).await?;
558    for (k, v) in summary.iter() {
559        info!("{k}: {v}");
560    }
561    Ok(())
562}
563
564async fn get_required_notes(
565    coordinator: &ClientHandleArc,
566    minimum_amount_required: Amount,
567    event_sender: &mpsc::UnboundedSender<MetricEvent>,
568) -> anyhow::Result<()> {
569    let current_balance = coordinator.get_balance_for_btc().await?;
570
571    if current_balance < minimum_amount_required {
572        let diff = minimum_amount_required.saturating_sub(current_balance);
573        info!(
574            "Current balance {current_balance} on coordinator not enough, trying to get {diff} more through fedimint-cli"
575        );
576        match try_get_notes_cli(&diff, 5).await {
577            Ok(notes) => {
578                info!("Got {} more notes, reissuing them", notes.total_amount());
579                reissue_notes(coordinator, notes, event_sender).await?;
580            }
581            Err(e) => {
582                info!("Unable to get more notes: '{e}', will try to proceed without them");
583            }
584        }
585    } else {
586        info!(
587            "Current balance of {current_balance} already covers the minimum required of {minimum_amount_required}"
588        );
589    }
590    Ok(())
591}
592
593async fn reissue_initial_notes(
594    initial_notes: Option<OOBNotes>,
595    coordinator: &ClientHandleArc,
596    event_sender: &mpsc::UnboundedSender<MetricEvent>,
597) -> anyhow::Result<()> {
598    if let Some(notes) = initial_notes {
599        let amount = notes.total_amount();
600        info!("Reissuing initial notes, got {amount}");
601        reissue_notes(coordinator, notes, event_sender).await?;
602    }
603    Ok(())
604}
605
606async fn get_coordinator_client(
607    db_path: &Option<PathBuf>,
608    invite_code: &Option<InviteCode>,
609) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
610    let (client, invite_code) = if let Some(db_path) = db_path {
611        let coordinator_db = db_path.join("coordinator.db");
612        if coordinator_db.exists() {
613            build_client(invite_code.clone(), Some(&coordinator_db)).await?
614        } else {
615            tokio::fs::create_dir_all(db_path).await?;
616            build_client(
617                Some(invite_code.clone().context(
618                    "Running on this archive dir for the first time, an invite code is required",
619                )?),
620                Some(&coordinator_db),
621            )
622            .await?
623        }
624    } else {
625        build_client(
626            Some(
627                invite_code
628                    .clone()
629                    .context("No archive dir given, an invite code is strictly required")?,
630            ),
631            None,
632        )
633        .await?
634    };
635    Ok((client, invite_code))
636}
637
638fn get_db_path(archive_dir: &Option<PathBuf>) -> Option<PathBuf> {
639    archive_dir.as_ref().map(|p| p.join("db"))
640}
641
642async fn get_lightning_gateway(
643    client: &ClientHandleArc,
644    gateway_id: Option<String>,
645) -> Option<LightningGateway> {
646    let gateway_id = parse_gateway_id(gateway_id.or(None)?.as_str()).expect("Invalid gateway id");
647    let ln_module = client
648        .get_first_module::<LightningClientModule>()
649        .expect("Must have ln client module");
650    ln_module.select_gateway(&gateway_id).await
651}
652
653#[allow(clippy::too_many_arguments)]
654async fn do_load_test_user_task(
655    prefix: String,
656    client: ClientHandleArc,
657    oob_notes: Vec<OOBNotes>,
658    generated_invoices_per_user: u16,
659    ln_payment_sleep: Duration,
660    invoice_amount: Amount,
661    additional_invoices: Vec<Bolt11Invoice>,
662    generate_invoice_with: Option<LnInvoiceGeneration>,
663    event_sender: mpsc::UnboundedSender<MetricEvent>,
664    gateway_id: Option<String>,
665) -> anyhow::Result<()> {
666    let ln_gateway = get_lightning_gateway(&client, gateway_id).await;
667    for oob_note in oob_notes {
668        let amount = oob_note.total_amount();
669        reissue_notes(&client, oob_note, &event_sender)
670            .await
671            .map_err(|e| anyhow!("while reissuing initial {amount}: {e}"))?;
672    }
673    let mut generated_invoices_per_user_iterator = (0..generated_invoices_per_user).peekable();
674    while let Some(_) = generated_invoices_per_user_iterator.next() {
675        let total_amount = get_note_summary(&client).await?.total_amount();
676        if invoice_amount > total_amount {
677            warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
678        } else {
679            match generate_invoice_with {
680                Some(LnInvoiceGeneration::LdkLightningCli) => {
681                    let invoice = ldk_create_invoice(invoice_amount).await?;
682                    gateway_pay_invoice(
683                        &prefix,
684                        "LND",
685                        &client,
686                        invoice.clone(),
687                        &event_sender,
688                        ln_gateway.clone(),
689                    )
690                    .await?;
691                    ldk_wait_invoice_payment(&invoice).await?;
692                }
693                None if additional_invoices.is_empty() => {
694                    debug!(
695                        "No method given to generate an invoice and no invoices on file, will not test the gateway"
696                    );
697                    break;
698                }
699                None => {
700                    break;
701                }
702            }
703            if generated_invoices_per_user_iterator.peek().is_some() {
704                // Only sleep while there are more invoices to pay
705                fedimint_core::task::sleep(ln_payment_sleep).await;
706            }
707        }
708    }
709    let mut additional_invoices = additional_invoices.into_iter().peekable();
710    while let Some(invoice) = additional_invoices.next() {
711        let total_amount = get_note_summary(&client).await?.total_amount();
712        let invoice_amount =
713            Amount::from_msats(invoice.amount_milli_satoshis().unwrap_or_default());
714        if invoice_amount > total_amount {
715            warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
716        } else if invoice_amount == Amount::ZERO {
717            warn!("Can't pay invoice {invoice}, amount is zero");
718        } else {
719            gateway_pay_invoice(
720                &prefix,
721                "unknown",
722                &client,
723                invoice,
724                &event_sender,
725                ln_gateway.clone(),
726            )
727            .await?;
728            if additional_invoices.peek().is_some() {
729                // Only sleep while there are more invoices to pay
730                fedimint_core::task::sleep(ln_payment_sleep).await;
731            }
732        }
733    }
734    Ok(())
735}
736
737#[allow(clippy::too_many_arguments)]
738async fn run_ln_circular_load_test(
739    archive_dir: Option<PathBuf>,
740    users: u16,
741    invite_code: Option<InviteCode>,
742    initial_notes: Option<OOBNotes>,
743    test_duration: Duration,
744    ln_payment_sleep: Duration,
745    notes_per_user: u16,
746    note_denomination: Amount,
747    invoice_amount: Amount,
748    strategy: LnCircularStrategy,
749    event_sender: mpsc::UnboundedSender<MetricEvent>,
750) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
751    let db_path = get_db_path(&archive_dir);
752    let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
753    let minimum_notes = notes_per_user * users;
754    let minimum_amount_required = note_denomination * u64::from(minimum_notes);
755
756    reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
757    get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
758
759    info!(
760        "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
761    );
762    remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
763
764    print_coordinator_notes(&coordinator).await?;
765
766    let users_clients = get_users_clients(users, db_path, invite_code.clone()).await?;
767
768    let mut users_notes =
769        get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
770
771    info!("Starting user tasks");
772    let futures = users_clients
773        .into_iter()
774        .enumerate()
775        .map(|(u, client)| {
776            let u = u as u16;
777            let oob_notes = users_notes.remove(&u).unwrap();
778            let event_sender = event_sender.clone();
779            let f: BoxFuture<_> = Box::pin(do_ln_circular_test_user_task(
780                format!("User {u}:"),
781                client,
782                invite_code.clone(),
783                oob_notes,
784                test_duration,
785                ln_payment_sleep,
786                invoice_amount,
787                strategy,
788                event_sender,
789            ));
790            f
791        })
792        .collect::<Vec<_>>();
793
794    Ok(futures)
795}
796
797#[allow(clippy::too_many_arguments)]
798async fn do_ln_circular_test_user_task(
799    prefix: String,
800    client: ClientHandleArc,
801    invite_code: Option<InviteCode>,
802    oob_notes: Vec<OOBNotes>,
803    test_duration: Duration,
804    ln_payment_sleep: Duration,
805    invoice_amount: Amount,
806    strategy: LnCircularStrategy,
807    event_sender: mpsc::UnboundedSender<MetricEvent>,
808) -> anyhow::Result<()> {
809    for oob_note in oob_notes {
810        let amount = oob_note.total_amount();
811        reissue_notes(&client, oob_note, &event_sender)
812            .await
813            .map_err(|e| anyhow!("while reissuing initial {amount}: {e}"))?;
814    }
815    let initial_time = fedimint_core::time::now();
816    let still_ontime = || async {
817        fedimint_core::time::now()
818            .duration_since(initial_time)
819            .expect("time to work")
820            <= test_duration
821    };
822    let sleep_a_bit = || async {
823        if still_ontime().await {
824            fedimint_core::task::sleep(ln_payment_sleep).await;
825        }
826    };
827    match strategy {
828        LnCircularStrategy::TwoGateways => {
829            let invoice_generation = LnInvoiceGeneration::LdkLightningCli;
830            while still_ontime().await {
831                let gateway_id = get_gateway_id(invoice_generation).await?;
832                let ln_gateway = get_lightning_gateway(&client, Some(gateway_id)).await;
833                run_two_gateways_strategy(
834                    &prefix,
835                    &invoice_generation,
836                    &invoice_amount,
837                    &event_sender,
838                    &client,
839                    ln_gateway,
840                )
841                .await?;
842                sleep_a_bit().await;
843            }
844        }
845        LnCircularStrategy::SelfPayment => {
846            while still_ontime().await {
847                do_self_payment(&prefix, &client, invoice_amount, &event_sender).await?;
848                sleep_a_bit().await;
849            }
850        }
851        LnCircularStrategy::PartnerPingPong => {
852            let (partner, _) = build_client(invite_code, None).await?;
853            while still_ontime().await {
854                do_partner_ping_pong(&prefix, &client, &partner, invoice_amount, &event_sender)
855                    .await?;
856                sleep_a_bit().await;
857            }
858        }
859    }
860    Ok(())
861}
862
863const GATEWAY_CREATE_INVOICE: &str = "gateway_create_invoice";
864
865async fn run_two_gateways_strategy(
866    prefix: &str,
867    invoice_generation: &LnInvoiceGeneration,
868    invoice_amount: &Amount,
869    event_sender: &mpsc::UnboundedSender<MetricEvent>,
870    client: &ClientHandleArc,
871    ln_gateway: Option<LightningGateway>,
872) -> Result<(), anyhow::Error> {
873    let create_invoice_time = fedimint_core::time::now();
874    match *invoice_generation {
875        LnInvoiceGeneration::LdkLightningCli => {
876            let invoice = ldk_create_invoice(*invoice_amount).await?;
877            let elapsed = create_invoice_time.elapsed()?;
878            info!("Created invoice using LDK in {elapsed:?}");
879            event_sender.send(MetricEvent {
880                name: GATEWAY_CREATE_INVOICE.into(),
881                duration: elapsed,
882            })?;
883            gateway_pay_invoice(
884                prefix,
885                "LND",
886                client,
887                invoice.clone(),
888                event_sender,
889                ln_gateway.clone(),
890            )
891            .await?;
892            ldk_wait_invoice_payment(&invoice).await?;
893            let (operation_id, invoice) =
894                client_create_invoice(client, *invoice_amount, event_sender, ln_gateway).await?;
895            let pay_invoice_time = fedimint_core::time::now();
896            ldk_pay_invoice(invoice).await?;
897            wait_invoice_payment(
898                prefix,
899                "LND",
900                client,
901                operation_id,
902                event_sender,
903                pay_invoice_time,
904            )
905            .await?;
906        }
907    }
908    Ok(())
909}
910
911async fn do_self_payment(
912    prefix: &str,
913    client: &ClientHandleArc,
914    invoice_amount: Amount,
915    event_sender: &mpsc::UnboundedSender<MetricEvent>,
916) -> anyhow::Result<()> {
917    let (operation_id, invoice) =
918        client_create_invoice(client, invoice_amount, event_sender, None).await?;
919    let pay_invoice_time = fedimint_core::time::now();
920    let lightning_module = client.get_first_module::<LightningClientModule>()?;
921    //let gateway = lightning_module.select_active_gateway_opt().await;
922    lightning_module
923        .pay_bolt11_invoice(None, invoice, ())
924        .await?;
925    wait_invoice_payment(
926        prefix,
927        "gateway",
928        client,
929        operation_id,
930        event_sender,
931        pay_invoice_time,
932    )
933    .await?;
934    Ok(())
935}
936
937async fn do_partner_ping_pong(
938    prefix: &str,
939    client: &ClientHandleArc,
940    partner: &ClientHandleArc,
941    invoice_amount: Amount,
942    event_sender: &mpsc::UnboundedSender<MetricEvent>,
943) -> anyhow::Result<()> {
944    // Ping (partner creates invoice, client pays)
945    let (operation_id, invoice) =
946        client_create_invoice(partner, invoice_amount, event_sender, None).await?;
947    let pay_invoice_time = fedimint_core::time::now();
948    let lightning_module = client.get_first_module::<LightningClientModule>()?;
949    // TODO: Select random gateway?
950    //let gateway = lightning_module.select_active_gateway_opt().await;
951    lightning_module
952        .pay_bolt11_invoice(None, invoice, ())
953        .await?;
954    wait_invoice_payment(
955        prefix,
956        "gateway",
957        partner,
958        operation_id,
959        event_sender,
960        pay_invoice_time,
961    )
962    .await?;
963    // Pong (client creates invoice, partner pays)
964    let (operation_id, invoice) =
965        client_create_invoice(client, invoice_amount, event_sender, None).await?;
966    let pay_invoice_time = fedimint_core::time::now();
967    let partner_lightning_module = partner.get_first_module::<LightningClientModule>()?;
968    //let gateway = partner_lightning_module.select_active_gateway_opt().await;
969    // TODO: Select random gateway?
970    partner_lightning_module
971        .pay_bolt11_invoice(None, invoice, ())
972        .await?;
973    wait_invoice_payment(
974        prefix,
975        "gateway",
976        client,
977        operation_id,
978        event_sender,
979        pay_invoice_time,
980    )
981    .await?;
982    Ok(())
983}
984
985async fn wait_invoice_payment(
986    prefix: &str,
987    gateway_name: &str,
988    client: &ClientHandleArc,
989    operation_id: fedimint_core::core::OperationId,
990    event_sender: &mpsc::UnboundedSender<MetricEvent>,
991    pay_invoice_time: std::time::SystemTime,
992) -> anyhow::Result<()> {
993    let elapsed = pay_invoice_time.elapsed()?;
994    info!("{prefix} Invoice payment receive started using {gateway_name} in {elapsed:?}");
995    event_sender.send(MetricEvent {
996        name: format!("gateway_{gateway_name}_payment_received_started"),
997        duration: elapsed,
998    })?;
999    let lightning_module = client.get_first_module::<LightningClientModule>()?;
1000    let mut updates = lightning_module
1001        .subscribe_ln_receive(operation_id)
1002        .await?
1003        .into_stream();
1004    while let Some(update) = updates.next().await {
1005        debug!(%prefix, ?update, "Invoice payment update");
1006        match update {
1007            LnReceiveState::Claimed => {
1008                let elapsed: Duration = pay_invoice_time.elapsed()?;
1009                info!("{prefix} Invoice payment received on {gateway_name} in {elapsed:?}");
1010                event_sender.send(MetricEvent {
1011                    name: "gateway_payment_received_success".into(),
1012                    duration: elapsed,
1013                })?;
1014                event_sender.send(MetricEvent {
1015                    name: format!("gateway_{gateway_name}_payment_received_success"),
1016                    duration: elapsed,
1017                })?;
1018                break;
1019            }
1020            LnReceiveState::Canceled { reason } => {
1021                let elapsed: Duration = pay_invoice_time.elapsed()?;
1022                info!(
1023                    "{prefix} Invoice payment receive was canceled on {gateway_name}: {reason} in {elapsed:?}"
1024                );
1025                event_sender.send(MetricEvent {
1026                    name: "gateway_payment_received_canceled".into(),
1027                    duration: elapsed,
1028                })?;
1029                break;
1030            }
1031            _ => {}
1032        }
1033    }
1034    Ok(())
1035}
1036
1037async fn client_create_invoice(
1038    client: &ClientHandleArc,
1039    invoice_amount: Amount,
1040    event_sender: &mpsc::UnboundedSender<MetricEvent>,
1041    ln_gateway: Option<LightningGateway>,
1042) -> anyhow::Result<(fedimint_core::core::OperationId, Bolt11Invoice)> {
1043    let create_invoice_time = fedimint_core::time::now();
1044    let lightning_module = client.get_first_module::<LightningClientModule>()?;
1045    let desc = Description::new("test".to_string())?;
1046    let (operation_id, invoice, _) = lightning_module
1047        .create_bolt11_invoice(
1048            invoice_amount,
1049            Bolt11InvoiceDescription::Direct(desc),
1050            None,
1051            (),
1052            ln_gateway,
1053        )
1054        .await?;
1055    let elapsed = create_invoice_time.elapsed()?;
1056    info!("Created invoice using gateway in {elapsed:?}");
1057    event_sender.send(MetricEvent {
1058        name: GATEWAY_CREATE_INVOICE.into(),
1059        duration: elapsed,
1060    })?;
1061    Ok((operation_id, invoice))
1062}
1063
1064fn test_download_config(
1065    endpoints: &ConnectorRegistry,
1066    invite_code: &InviteCode,
1067    users: u16,
1068    event_sender: &mpsc::UnboundedSender<MetricEvent>,
1069) -> Vec<BoxFuture<'static, anyhow::Result<()>>> {
1070    (0..users)
1071        .map(move |_| {
1072            let invite_code = invite_code.clone();
1073            let event_sender = event_sender.clone();
1074            let endpoints = endpoints.clone();
1075            let f: BoxFuture<_> = Box::pin(async move {
1076                let m = fedimint_core::time::now();
1077                let _ = fedimint_api_client::api::net::ConnectorType::default()
1078                    .download_from_invite_code(&endpoints, &invite_code)
1079                    .await?;
1080                event_sender.send(MetricEvent {
1081                    name: "download_client_config".into(),
1082                    duration: m.elapsed()?,
1083                })?;
1084                Ok(())
1085            });
1086            f
1087        })
1088        .collect()
1089}
1090
1091async fn test_connect_raw_client(
1092    endpoints: &ConnectorRegistry,
1093    invite_code: InviteCode,
1094    users: u16,
1095    duration: Duration,
1096    timeout: Duration,
1097    limit_endpoints: Option<usize>,
1098    event_sender: mpsc::UnboundedSender<MetricEvent>,
1099) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
1100    use jsonrpsee_core::client::ClientT;
1101    use jsonrpsee_ws_client::WsClientBuilder;
1102
1103    let (mut cfg, _) = fedimint_api_client::api::net::ConnectorType::default()
1104        .download_from_invite_code(endpoints, &invite_code)
1105        .await?;
1106
1107    if let Some(limit_endpoints) = limit_endpoints {
1108        cfg.global.api_endpoints = cfg
1109            .global
1110            .api_endpoints
1111            .into_iter()
1112            .take(limit_endpoints)
1113            .collect();
1114        info!("Limiting endpoints to {:?}", cfg.global.api_endpoints);
1115    }
1116
1117    info!("Connecting to {users} clients");
1118    let clients = (0..users)
1119        .flat_map(|_| {
1120            cfg.global.api_endpoints.values().map(|url| async {
1121                let ws_client = WsClientBuilder::default()
1122                    .request_timeout(timeout)
1123                    .connection_timeout(timeout)
1124                    .build(url_to_string_with_default_port(&url.url))
1125                    .await?;
1126                Ok::<_, anyhow::Error>(ws_client)
1127            })
1128        })
1129        .collect::<Vec<_>>();
1130    let clients = futures::future::try_join_all(clients).await?;
1131    info!("Keeping {users} clients connected for {duration:?}");
1132    Ok(clients
1133        .into_iter()
1134        .map(|client| {
1135            let event_sender = event_sender.clone();
1136            let f: BoxFuture<_> = Box::pin(async move {
1137                let initial_time = fedimint_core::time::now();
1138                while initial_time.elapsed()? < duration {
1139                    let m = fedimint_core::time::now();
1140                    let _epoch: u64 = client
1141                        .request::<_, _>(SESSION_COUNT_ENDPOINT, vec![ApiRequestErased::default()])
1142                        .await?;
1143                    event_sender.send(MetricEvent {
1144                        name: SESSION_COUNT_ENDPOINT.into(),
1145                        duration: m.elapsed()?,
1146                    })?;
1147                    fedimint_core::task::sleep(Duration::from_secs(1)).await;
1148                }
1149                Ok(())
1150            });
1151            f
1152        })
1153        .collect())
1154}
1155
1156fn url_to_string_with_default_port(url: &SafeUrl) -> String {
1157    format!(
1158        "{}://{}:{}{}",
1159        url.scheme(),
1160        url.host().expect("Asserted on construction"),
1161        url.port_or_known_default()
1162            .expect("Asserted on construction"),
1163        url.path()
1164    )
1165}
1166
1167async fn handle_metrics_summary(
1168    opts: Opts,
1169    mut event_receiver: mpsc::UnboundedReceiver<MetricEvent>,
1170) -> anyhow::Result<()> {
1171    let timestamp_seconds = fedimint_core::time::duration_since_epoch().as_secs();
1172    let mut metrics_json_output_files = vec![];
1173    let mut previous_metrics = vec![];
1174    let mut comparison_output = None;
1175    if let Some(archive_dir) = opts.archive_dir {
1176        let mut archive_metrics = archive_dir.join("metrics");
1177        archive_metrics.push(opts.users.to_string());
1178        tokio::fs::create_dir_all(&archive_metrics).await?;
1179        let mut archive_comparisons = archive_dir.join("comparisons");
1180        archive_comparisons.push(opts.users.to_string());
1181        tokio::fs::create_dir_all(&archive_comparisons).await?;
1182
1183        let latest_metrics_file = std::fs::read_dir(&archive_metrics)?
1184            .map(|entry| {
1185                let entry = entry.unwrap();
1186                let metadata = entry.metadata().unwrap();
1187                let created = metadata
1188                    .created()
1189                    .unwrap_or_else(|_| metadata.modified().unwrap());
1190                (entry, created)
1191            })
1192            .max_by_key(|(_entry, created)| created.to_owned())
1193            .map(|(entry, _)| entry.path());
1194        if let Some(latest_metrics_file) = latest_metrics_file {
1195            let display_path = latest_metrics_file.display();
1196            let latest_metrics_file = tokio::fs::File::open(&latest_metrics_file)
1197                .await
1198                .with_context(|| format!("Failed to open {display_path}"))?;
1199            let mut lines = tokio::io::BufReader::new(latest_metrics_file).lines();
1200            while let Some(line) = lines.next_line().await? {
1201                match serde_json::from_str::<EventMetricSummary>(&line) {
1202                    Ok(metric) => {
1203                        previous_metrics.push(metric);
1204                    }
1205                    Err(e) => {
1206                        warn!("Failed to parse previous metric: {e:?}");
1207                    }
1208                }
1209            }
1210        }
1211        let new_metric_output = archive_metrics.join(format!("{timestamp_seconds}.json",));
1212        let new_metric_output = BufWriter::new(
1213            OpenOptions::new()
1214                .write(true)
1215                .create(true)
1216                .truncate(true)
1217                .open(new_metric_output)
1218                .await?,
1219        );
1220        metrics_json_output_files.push(new_metric_output);
1221        if !previous_metrics.is_empty() {
1222            let new_comparison_output =
1223                archive_comparisons.join(format!("{timestamp_seconds}.json",));
1224            comparison_output = Some(BufWriter::new(
1225                OpenOptions::new()
1226                    .write(true)
1227                    .create(true)
1228                    .truncate(true)
1229                    .open(new_comparison_output)
1230                    .await?,
1231            ));
1232        }
1233    }
1234    if let Some(metrics_json_output) = opts.metrics_json_output {
1235        metrics_json_output_files.push(BufWriter::new(
1236            tokio::fs::OpenOptions::new()
1237                .write(true)
1238                .create(true)
1239                .truncate(true)
1240                .open(metrics_json_output)
1241                .await?,
1242        ));
1243    }
1244    let mut results = BTreeMap::new();
1245    while let Some(event) = event_receiver.recv().await {
1246        let entry = results.entry(event.name).or_insert_with(Vec::new);
1247        entry.push(event.duration);
1248    }
1249    let mut previous_metrics = previous_metrics
1250        .into_iter()
1251        .map(|metric| (metric.name.clone(), metric))
1252        .collect::<HashMap<_, _>>();
1253    for (k, mut v) in results {
1254        v.sort();
1255        let n = v.len();
1256        let max = v.iter().last().unwrap();
1257        let min = v.first().unwrap();
1258        let median = v[n / 2];
1259        let sum: Duration = v.iter().sum();
1260        let avg = sum / n as u32;
1261        let metric_summary = EventMetricSummary {
1262            name: k.clone(),
1263            users: u64::from(opts.users),
1264            n: n as u64,
1265            avg_ms: avg.as_millis(),
1266            median_ms: median.as_millis(),
1267            max_ms: max.as_millis(),
1268            min_ms: min.as_millis(),
1269            timestamp_seconds,
1270        };
1271        let comparison = if let Some(previous_metric) = previous_metrics.remove(&k) {
1272            if previous_metric.n == metric_summary.n {
1273                fn calculate_gain(current: u128, previous: u128) -> f64 {
1274                    current as f64 / previous as f64
1275                }
1276                let comparison = EventMetricComparison {
1277                    avg_ms_gain: calculate_gain(metric_summary.avg_ms, previous_metric.avg_ms),
1278                    median_ms_gain: calculate_gain(
1279                        metric_summary.median_ms,
1280                        previous_metric.median_ms,
1281                    ),
1282                    max_ms_gain: calculate_gain(metric_summary.max_ms, previous_metric.max_ms),
1283                    min_ms_gain: calculate_gain(metric_summary.min_ms, previous_metric.min_ms),
1284                    current: metric_summary.clone(),
1285                    previous: previous_metric,
1286                };
1287                if let Some(comparison_output) = &mut comparison_output {
1288                    let comparison_json =
1289                        serde_json::to_string(&comparison).expect("to be serializable");
1290                    comparison_output
1291                        .write_all(format!("{comparison_json}\n").as_bytes())
1292                        .await
1293                        .expect("to write on file");
1294                }
1295                Some(comparison)
1296            } else {
1297                info!(
1298                    "Skipping comparison for {k} because previous metric has different n ({} vs {})",
1299                    previous_metric.n, metric_summary.n
1300                );
1301                None
1302            }
1303        } else {
1304            None
1305        };
1306        if let Some(comparison) = comparison {
1307            println!(
1308                "{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?} (compared to previous: {comparison})"
1309            );
1310        } else {
1311            println!("{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?}");
1312        }
1313        let metric_summary_json =
1314            serde_json::to_string(&metric_summary).expect("to be serializable");
1315        for metrics_json_output_file in &mut metrics_json_output_files {
1316            metrics_json_output_file
1317                .write_all(format!("{metric_summary_json}\n").as_bytes())
1318                .await
1319                .expect("to write on file");
1320        }
1321    }
1322    for mut output in metrics_json_output_files {
1323        output.flush().await?;
1324    }
1325    if let Some(mut output) = comparison_output {
1326        output.flush().await?;
1327    }
1328    Ok(())
1329}
1330
1331async fn get_gateway_id(generate_invoice_with: LnInvoiceGeneration) -> anyhow::Result<String> {
1332    let gateway_json = match generate_invoice_with {
1333        LnInvoiceGeneration::LdkLightningCli => {
1334            // If we are paying a lnd invoice, we use the LDK node
1335            cmd!(GatewayLndCli, "info").out_json().await
1336        }
1337    }?;
1338    let gateway_id = gateway_json["gateway_id"]
1339        .as_str()
1340        .context("Missing gateway_id field")?;
1341
1342    Ok(gateway_id.into())
1343}