fedimint_load_test_tool/
main.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_precision_loss)]
4#![allow(clippy::missing_errors_doc)]
5#![allow(clippy::ref_option)]
6#![allow(clippy::too_many_lines)]
7#![allow(clippy::large_futures)]
8
9use std::collections::{BTreeMap, HashMap};
10use std::path::PathBuf;
11use std::str::FromStr;
12use std::time::Duration;
13use std::vec;
14
15use anyhow::{Context, bail};
16use clap::{Args, Parser, Subcommand, ValueEnum};
17use common::{
18    gateway_pay_invoice, get_note_summary, ldk_create_invoice, ldk_pay_invoice,
19    ldk_wait_invoice_payment, parse_gateway_id, reissue_notes,
20};
21use devimint::cmd;
22use devimint::util::GatewayLndCli;
23use fedimint_client::ClientHandleArc;
24use fedimint_core::Amount;
25use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
26use fedimint_core::invite_code::InviteCode;
27use fedimint_core::module::ApiRequestErased;
28use fedimint_core::runtime::spawn;
29use fedimint_core::util::{BoxFuture, SafeUrl};
30use fedimint_ln_client::{LightningClientModule, LnReceiveState};
31use fedimint_ln_common::LightningGateway;
32use fedimint_mint_client::OOBNotes;
33use futures::StreamExt;
34use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description};
35use serde::{Deserialize, Serialize};
36use tokio::fs::OpenOptions;
37use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
38use tokio::sync::mpsc;
39use tracing::{debug, info, warn};
40
41use crate::common::{
42    build_client, do_spend_notes, get_invite_code_cli, remint_denomination, try_get_notes_cli,
43};
44pub mod common;
45
46#[derive(Parser, Clone)]
47#[command(version)]
48struct Opts {
49    #[arg(
50        long,
51        default_value = "10",
52        help = "Number of users. Each user will work in parallel"
53    )]
54    users: u16,
55
56    #[arg(long, help = "Output with the metrics results in JSON format")]
57    metrics_json_output: Option<PathBuf>,
58
59    #[arg(
60        long,
61        help = "If given, will be used to store and retrieve past metrics for comparison purposes"
62    )]
63    archive_dir: Option<PathBuf>,
64
65    #[clap(subcommand)]
66    command: Command,
67}
68
69#[derive(Debug, Clone, Copy, ValueEnum)]
70enum LnInvoiceGeneration {
71    LdkLightningCli,
72}
73
74#[derive(Subcommand, Clone)]
75enum Command {
76    #[command(about = "Keep many websocket connections to a federation for a duration of time")]
77    TestConnect {
78        #[arg(long, help = "Federation invite code")]
79        invite_code: String,
80        #[arg(
81            long,
82            default_value = "60",
83            help = "How much time to keep the connections open, in seconds"
84        )]
85        duration_secs: u64,
86        #[arg(
87            long,
88            default_value = "120",
89            help = "Timeout for connection attempt and for each request, in secnods"
90        )]
91        timeout_secs: u64,
92        #[arg(
93            long,
94            help = "If given, will limit the number of endpoints (guardians) to connect to"
95        )]
96        limit_endpoints: Option<usize>,
97    },
98    #[command(about = "Try to download the client config many times.")]
99    TestDownload {
100        #[arg(long, help = "Federation invite code")]
101        invite_code: String,
102    },
103    #[command(
104        about = "Run a load test where many users in parallel will try to reissue notes and pay invoices through the gateway"
105    )]
106    LoadTest(LoadTestArgs),
107    /// Run a load test where many users in parallel will receive then send a
108    /// payment through lightning.
109    /// It's 'circular' because the funds always come back to the same user then
110    /// we can keep making the payments in a loop
111    #[command()]
112    LnCircularLoadTest(LnCircularLoadTestArgs),
113}
114
115#[derive(Args, Clone)]
116struct LoadTestArgs {
117    #[arg(
118        long,
119        help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
120    )]
121    invite_code: Option<InviteCode>,
122
123    #[arg(
124        long,
125        help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
126    )]
127    initial_notes: Option<OOBNotes>,
128
129    #[arg(
130        long,
131        help = "Gateway Id. If none, retrieve one according to --generate-invoice-with"
132    )]
133    gateway_id: Option<String>,
134
135    #[arg(
136        long,
137        help = "The method used to generate invoices to be paid through the gateway. If none and no --invoices-file provided, no gateway/LN tests will be run. Note that you can't generate an invoice using the same lightning node used by the gateway (i.e self payment is forbidden)"
138    )]
139    generate_invoice_with: Option<LnInvoiceGeneration>,
140
141    #[arg(
142        long,
143        default_value = "1",
144        help = "How many invoices will be created for each user. Only applicable if --generate-invoice-with is provided"
145    )]
146    invoices_per_user: u16,
147
148    #[arg(
149        long,
150        default_value = "0",
151        help = "How many seconds to sleep between LN payments"
152    )]
153    ln_payment_sleep_secs: u64,
154
155    #[arg(
156        long,
157        help = "A text file with one invoice per line. If --generate-invoice-with is provided, these will be additional invoices to be paid"
158    )]
159    invoices_file: Option<PathBuf>,
160
161    #[arg(
162        long,
163        help = "How many notes to distribute to each user",
164        default_value = "2"
165    )]
166    notes_per_user: u16,
167
168    #[arg(
169        long,
170        help = "Note denomination to use for the test",
171        default_value = "1024"
172    )]
173    note_denomination: Amount,
174
175    #[arg(
176        long,
177        help = "Invoice amount when generating one",
178        default_value = "1000"
179    )]
180    invoice_amount: Amount,
181}
182
183#[derive(Args, Clone)]
184struct LnCircularLoadTestArgs {
185    #[arg(
186        long,
187        help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
188    )]
189    invite_code: Option<InviteCode>,
190
191    #[arg(
192        long,
193        help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
194    )]
195    initial_notes: Option<OOBNotes>,
196
197    #[arg(
198        long,
199        default_value = "60",
200        help = "For how many seconds to run the test"
201    )]
202    test_duration_secs: u64,
203
204    #[arg(
205        long,
206        default_value = "0",
207        help = "How many seconds to sleep between LN payments"
208    )]
209    ln_payment_sleep_secs: u64,
210
211    #[arg(
212        long,
213        help = "How many notes to distribute to each user",
214        default_value = "1"
215    )]
216    notes_per_user: u16,
217
218    #[arg(
219        long,
220        help = "Note denomination to use for the test",
221        default_value = "1024"
222    )]
223    note_denomination: Amount,
224
225    #[arg(
226        long,
227        help = "Invoice amount when generating one",
228        default_value = "1000"
229    )]
230    invoice_amount: Amount,
231
232    #[arg(long)]
233    strategy: LnCircularStrategy,
234}
235
236#[derive(Debug, Clone, Copy, ValueEnum)]
237enum LnCircularStrategy {
238    /// The user will pay its own invoice
239    SelfPayment,
240    /// One gateway will pay/receive to/from the other, then they will swap
241    /// places
242    TwoGateways,
243    /// Two clients will pay to each other using the same gateway
244    PartnerPingPong,
245}
246
/// A single timing sample sent over the metrics channel.
#[derive(Clone, Debug)]
pub struct MetricEvent {
    /// Identifier of the measured operation (e.g. `gateway_create_invoice`).
    name: String,
    /// How long the operation took.
    duration: Duration,
}
252
253#[derive(Debug, Clone, Serialize, Deserialize)]
254struct EventMetricSummary {
255    name: String,
256    users: u64,
257    n: u64,
258    avg_ms: u128,
259    median_ms: u128,
260    max_ms: u128,
261    min_ms: u128,
262    timestamp_seconds: u64,
263}
264
265#[derive(Debug, Serialize, Deserialize)]
266struct EventMetricComparison {
267    avg_ms_gain: f64,
268    median_ms_gain: f64,
269    max_ms_gain: f64,
270    min_ms_gain: f64,
271    current: EventMetricSummary,
272    previous: EventMetricSummary,
273}
274
275impl std::fmt::Display for EventMetricComparison {
276    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277        fn to_percent(gain: f64) -> String {
278            if gain >= 1.0 {
279                format!("+{:.2}%", (gain - 1.0) * 100.0)
280            } else {
281                format!("-{:.2}%", (1.0 - gain) * 100.0)
282            }
283        }
284        f.write_str(&format!(
285            "avg: {}, median: {}, max: {}, min: {}",
286            to_percent(self.avg_ms_gain),
287            to_percent(self.median_ms_gain),
288            to_percent(self.max_ms_gain),
289            to_percent(self.min_ms_gain),
290        ))
291    }
292}
293
294#[tokio::main]
295async fn main() -> anyhow::Result<()> {
296    fedimint_logging::TracingSetup::default().init()?;
297    let opts = Opts::parse();
298    let (event_sender, event_receiver) = tokio::sync::mpsc::unbounded_channel();
299    let summary_handle = spawn("handle metrics summary", {
300        let opts = opts.clone();
301        async { handle_metrics_summary(opts, event_receiver).await }
302    });
303    let futures = match opts.command.clone() {
304        Command::TestConnect {
305            invite_code,
306            duration_secs,
307            timeout_secs,
308            limit_endpoints,
309        } => {
310            let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
311            test_connect_raw_client(
312                invite_code,
313                opts.users,
314                Duration::from_secs(duration_secs),
315                Duration::from_secs(timeout_secs),
316                limit_endpoints,
317                event_sender.clone(),
318            )
319            .await?
320        }
321        Command::TestDownload { invite_code } => {
322            let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
323            test_download_config(&invite_code, opts.users, &event_sender.clone())
324        }
325        Command::LoadTest(args) => {
326            let invite_code = invite_code_or_fallback(args.invite_code).await;
327
328            let gateway_id = if let Some(gateway_id) = args.gateway_id {
329                Some(gateway_id)
330            } else if let Some(generate_invoice_with) = args.generate_invoice_with {
331                Some(get_gateway_id(generate_invoice_with).await?)
332            } else {
333                None
334            };
335            let invoices = if let Some(invoices_file) = args.invoices_file {
336                let display_path = invoices_file.display();
337                let invoices_file = tokio::fs::File::open(&invoices_file)
338                    .await
339                    .with_context(|| format!("Failed to open {display_path}"))?;
340                let mut lines = tokio::io::BufReader::new(invoices_file).lines();
341                let mut invoices = vec![];
342                while let Some(line) = lines.next_line().await? {
343                    let invoice = Bolt11Invoice::from_str(&line)?;
344                    invoices.push(invoice);
345                }
346                invoices
347            } else {
348                vec![]
349            };
350            if args.generate_invoice_with.is_none() && invoices.is_empty() {
351                info!(
352                    "No --generate-invoice-with given no invoices on --invoices-file, not LN/gateway tests will be run"
353                );
354            }
355            run_load_test(
356                opts.archive_dir,
357                opts.users,
358                invite_code,
359                args.initial_notes,
360                args.generate_invoice_with,
361                args.invoices_per_user,
362                Duration::from_secs(args.ln_payment_sleep_secs),
363                invoices,
364                gateway_id,
365                args.notes_per_user,
366                args.note_denomination,
367                args.invoice_amount,
368                event_sender.clone(),
369            )
370            .await?
371        }
372        Command::LnCircularLoadTest(args) => {
373            let invite_code = invite_code_or_fallback(args.invite_code).await;
374            run_ln_circular_load_test(
375                opts.archive_dir,
376                opts.users,
377                invite_code,
378                args.initial_notes,
379                Duration::from_secs(args.test_duration_secs),
380                Duration::from_secs(args.ln_payment_sleep_secs),
381                args.notes_per_user,
382                args.note_denomination,
383                args.invoice_amount,
384                args.strategy,
385                event_sender.clone(),
386            )
387            .await?
388        }
389    };
390
391    let result = futures::future::join_all(futures).await;
392    drop(event_sender);
393    summary_handle.await??;
394    let len_failures = result.iter().filter(|r| r.is_err()).count();
395    eprintln!("{} results, {len_failures} failures", result.len());
396    for r in result {
397        if let Err(e) = r {
398            warn!("Task failed: {:?}", e);
399        }
400    }
401    if len_failures > 0 {
402        bail!("Finished with failures");
403    }
404    info!("Finished successfully");
405    Ok(())
406}
407
408async fn invite_code_or_fallback(invite_code: Option<InviteCode>) -> Option<InviteCode> {
409    if let Some(invite_code) = invite_code {
410        Some(invite_code)
411    } else {
412        // Try to get an invite code through cli in a best effort basis
413        match get_invite_code_cli(0.into()).await {
414            Ok(invite_code) => Some(invite_code),
415            Err(e) => {
416                info!(
417                    "No invite code provided and failed to get one with '{e}' error, will try to proceed without one..."
418                );
419                None
420            }
421        }
422    }
423}
424
425#[allow(clippy::too_many_arguments)]
426async fn run_load_test(
427    archive_dir: Option<PathBuf>,
428    users: u16,
429    invite_code: Option<InviteCode>,
430    initial_notes: Option<OOBNotes>,
431    generate_invoice_with: Option<LnInvoiceGeneration>,
432    generated_invoices_per_user: u16,
433    ln_payment_sleep: Duration,
434    invoices_from_file: Vec<Bolt11Invoice>,
435    gateway_id: Option<String>,
436    notes_per_user: u16,
437    note_denomination: Amount,
438    invoice_amount: Amount,
439    event_sender: mpsc::UnboundedSender<MetricEvent>,
440) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
441    let db_path = get_db_path(&archive_dir);
442    let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
443    let minimum_notes = notes_per_user * users;
444    let minimum_amount_required = note_denomination * u64::from(minimum_notes);
445
446    reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
447    get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
448    print_coordinator_notes(&coordinator).await?;
449    info!(
450        "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
451    );
452    remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
453    print_coordinator_notes(&coordinator).await?;
454
455    let users_clients = get_users_clients(users, db_path, invite_code).await?;
456
457    let mut users_notes =
458        get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
459    let mut users_invoices = HashMap::new();
460    let mut user = 0;
461    // Distribute invoices to users in a round robin fashion
462    for invoice in invoices_from_file {
463        users_invoices
464            .entry(user)
465            .or_insert_with(Vec::new)
466            .push(invoice);
467        user = (user + 1) % users;
468    }
469
470    info!("Starting user tasks");
471    let futures = users_clients
472        .into_iter()
473        .enumerate()
474        .map(|(u, client)| {
475            let u = u as u16;
476            let oob_notes = users_notes.remove(&u).unwrap();
477            let invoices = users_invoices.remove(&u).unwrap_or_default();
478            let event_sender = event_sender.clone();
479            let f: BoxFuture<_> = Box::pin(do_load_test_user_task(
480                format!("User {u}:"),
481                client,
482                oob_notes,
483                generated_invoices_per_user,
484                ln_payment_sleep,
485                invoice_amount,
486                invoices,
487                generate_invoice_with,
488                event_sender,
489                gateway_id.clone(),
490            ));
491            f
492        })
493        .collect::<Vec<_>>();
494
495    Ok(futures)
496}
497
498async fn get_notes_for_users(
499    users: u16,
500    notes_per_user: u16,
501    coordinator: ClientHandleArc,
502    note_denomination: Amount,
503) -> anyhow::Result<HashMap<u16, Vec<OOBNotes>>> {
504    let mut users_notes = HashMap::new();
505    for u in 0..users {
506        users_notes.insert(u, Vec::with_capacity(notes_per_user.into()));
507        for _ in 0..notes_per_user {
508            let (_, oob_notes) = do_spend_notes(&coordinator, note_denomination).await?;
509            let user_amount = oob_notes.total_amount();
510            info!("Giving {user_amount} to user {u}");
511            users_notes.get_mut(&u).unwrap().push(oob_notes);
512        }
513    }
514    Ok(users_notes)
515}
516
517async fn get_users_clients(
518    n: u16,
519    db_path: Option<PathBuf>,
520    invite_code: Option<InviteCode>,
521) -> anyhow::Result<Vec<ClientHandleArc>> {
522    let mut users_clients = Vec::with_capacity(n.into());
523    for u in 0..n {
524        let (client, _) = get_user_client(u, &db_path, &invite_code).await?;
525        users_clients.push(client);
526    }
527    Ok(users_clients)
528}
529
530async fn get_user_client(
531    user_index: u16,
532    db_path: &Option<PathBuf>,
533    invite_code: &Option<InviteCode>,
534) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
535    let user_db = db_path
536        .as_ref()
537        .map(|db_path| db_path.join(format!("user_{user_index}.db")));
538    let user_invite_code = if user_db.as_ref().is_some_and(|db| db.exists()) {
539        None
540    } else {
541        invite_code.clone()
542    };
543    let (client, invite_code) = build_client(user_invite_code, user_db.as_ref()).await?;
544    // if lightning module is present, update the gateway cache
545    if let Ok(ln_client) = client.get_first_module::<LightningClientModule>() {
546        let _ = ln_client.update_gateway_cache().await;
547    }
548    Ok((client, invite_code))
549}
550
551async fn print_coordinator_notes(coordinator: &ClientHandleArc) -> anyhow::Result<()> {
552    info!("Note summary:");
553    let summary = get_note_summary(coordinator).await?;
554    for (k, v) in summary.iter() {
555        info!("{k}: {v}");
556    }
557    Ok(())
558}
559
560async fn get_required_notes(
561    coordinator: &ClientHandleArc,
562    minimum_amount_required: Amount,
563    event_sender: &mpsc::UnboundedSender<MetricEvent>,
564) -> anyhow::Result<()> {
565    let current_balance = coordinator.get_balance().await;
566    if current_balance < minimum_amount_required {
567        let diff = minimum_amount_required.saturating_sub(current_balance);
568        info!(
569            "Current balance {current_balance} on coordinator not enough, trying to get {diff} more through fedimint-cli"
570        );
571        match try_get_notes_cli(&diff, 5).await {
572            Ok(notes) => {
573                info!("Got {} more notes, reissuing them", notes.total_amount());
574                reissue_notes(coordinator, notes, event_sender).await?;
575            }
576            Err(e) => {
577                info!("Unable to get more notes: '{e}', will try to proceed without them");
578            }
579        }
580    } else {
581        info!(
582            "Current balance of {current_balance} already covers the minimum required of {minimum_amount_required}"
583        );
584    }
585    Ok(())
586}
587
588async fn reissue_initial_notes(
589    initial_notes: Option<OOBNotes>,
590    coordinator: &ClientHandleArc,
591    event_sender: &mpsc::UnboundedSender<MetricEvent>,
592) -> anyhow::Result<()> {
593    if let Some(notes) = initial_notes {
594        let amount = notes.total_amount();
595        info!("Reissuing initial notes, got {amount}");
596        reissue_notes(coordinator, notes, event_sender).await?;
597    }
598    Ok(())
599}
600
601async fn get_coordinator_client(
602    db_path: &Option<PathBuf>,
603    invite_code: &Option<InviteCode>,
604) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
605    let (client, invite_code) = if let Some(db_path) = db_path {
606        let coordinator_db = db_path.join("coordinator.db");
607        if coordinator_db.exists() {
608            build_client(invite_code.clone(), Some(&coordinator_db)).await?
609        } else {
610            tokio::fs::create_dir_all(db_path).await?;
611            build_client(
612                Some(invite_code.clone().context(
613                    "Running on this archive dir for the first time, an invite code is required",
614                )?),
615                Some(&coordinator_db),
616            )
617            .await?
618        }
619    } else {
620        build_client(
621            Some(
622                invite_code
623                    .clone()
624                    .context("No archive dir given, an invite code is strictly required")?,
625            ),
626            None,
627        )
628        .await?
629    };
630    Ok((client, invite_code))
631}
632
/// Returns the `db` subdirectory of the archive directory, or `None` when no
/// archive directory was given.
fn get_db_path(archive_dir: &Option<PathBuf>) -> Option<PathBuf> {
    let base = archive_dir.as_ref()?;
    Some(base.join("db"))
}
636
637async fn get_lightning_gateway(
638    client: &ClientHandleArc,
639    gateway_id: Option<String>,
640) -> Option<LightningGateway> {
641    let gateway_id = parse_gateway_id(gateway_id.or(None)?.as_str()).expect("Invalid gateway id");
642    let ln_module = client
643        .get_first_module::<LightningClientModule>()
644        .expect("Must have ln client module");
645    ln_module.select_gateway(&gateway_id).await
646}
647
648#[allow(clippy::too_many_arguments)]
649async fn do_load_test_user_task(
650    prefix: String,
651    client: ClientHandleArc,
652    oob_notes: Vec<OOBNotes>,
653    generated_invoices_per_user: u16,
654    ln_payment_sleep: Duration,
655    invoice_amount: Amount,
656    additional_invoices: Vec<Bolt11Invoice>,
657    generate_invoice_with: Option<LnInvoiceGeneration>,
658    event_sender: mpsc::UnboundedSender<MetricEvent>,
659    gateway_id: Option<String>,
660) -> anyhow::Result<()> {
661    let ln_gateway = get_lightning_gateway(&client, gateway_id).await;
662    for oob_note in oob_notes {
663        let amount = oob_note.total_amount();
664        reissue_notes(&client, oob_note, &event_sender)
665            .await
666            .map_err(|e| anyhow::anyhow!("while reissuing initial {amount}: {e}"))?;
667    }
668    let mut generated_invoices_per_user_iterator = (0..generated_invoices_per_user).peekable();
669    while let Some(_) = generated_invoices_per_user_iterator.next() {
670        let total_amount = get_note_summary(&client).await?.total_amount();
671        if invoice_amount > total_amount {
672            warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
673        } else {
674            match generate_invoice_with {
675                Some(LnInvoiceGeneration::LdkLightningCli) => {
676                    let invoice = ldk_create_invoice(invoice_amount).await?;
677                    gateway_pay_invoice(
678                        &prefix,
679                        "LND",
680                        &client,
681                        invoice.clone(),
682                        &event_sender,
683                        ln_gateway.clone(),
684                    )
685                    .await?;
686                    ldk_wait_invoice_payment(&invoice).await?;
687                }
688                None if additional_invoices.is_empty() => {
689                    debug!(
690                        "No method given to generate an invoice and no invoices on file, will not test the gateway"
691                    );
692                    break;
693                }
694                None => {
695                    break;
696                }
697            }
698            if generated_invoices_per_user_iterator.peek().is_some() {
699                // Only sleep while there are more invoices to pay
700                fedimint_core::task::sleep(ln_payment_sleep).await;
701            }
702        }
703    }
704    let mut additional_invoices = additional_invoices.into_iter().peekable();
705    while let Some(invoice) = additional_invoices.next() {
706        let total_amount = get_note_summary(&client).await?.total_amount();
707        let invoice_amount =
708            Amount::from_msats(invoice.amount_milli_satoshis().unwrap_or_default());
709        if invoice_amount > total_amount {
710            warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
711        } else if invoice_amount == Amount::ZERO {
712            warn!("Can't pay invoice {invoice}, amount is zero");
713        } else {
714            gateway_pay_invoice(
715                &prefix,
716                "unknown",
717                &client,
718                invoice,
719                &event_sender,
720                ln_gateway.clone(),
721            )
722            .await?;
723            if additional_invoices.peek().is_some() {
724                // Only sleep while there are more invoices to pay
725                fedimint_core::task::sleep(ln_payment_sleep).await;
726            }
727        }
728    }
729    Ok(())
730}
731
732#[allow(clippy::too_many_arguments)]
733async fn run_ln_circular_load_test(
734    archive_dir: Option<PathBuf>,
735    users: u16,
736    invite_code: Option<InviteCode>,
737    initial_notes: Option<OOBNotes>,
738    test_duration: Duration,
739    ln_payment_sleep: Duration,
740    notes_per_user: u16,
741    note_denomination: Amount,
742    invoice_amount: Amount,
743    strategy: LnCircularStrategy,
744    event_sender: mpsc::UnboundedSender<MetricEvent>,
745) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
746    let db_path = get_db_path(&archive_dir);
747    let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
748    let minimum_notes = notes_per_user * users;
749    let minimum_amount_required = note_denomination * u64::from(minimum_notes);
750
751    reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
752    get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
753
754    info!(
755        "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
756    );
757    remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
758
759    print_coordinator_notes(&coordinator).await?;
760
761    let users_clients = get_users_clients(users, db_path, invite_code.clone()).await?;
762
763    let mut users_notes =
764        get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
765
766    info!("Starting user tasks");
767    let futures = users_clients
768        .into_iter()
769        .enumerate()
770        .map(|(u, client)| {
771            let u = u as u16;
772            let oob_notes = users_notes.remove(&u).unwrap();
773            let event_sender = event_sender.clone();
774            let f: BoxFuture<_> = Box::pin(do_ln_circular_test_user_task(
775                format!("User {u}:"),
776                client,
777                invite_code.clone(),
778                oob_notes,
779                test_duration,
780                ln_payment_sleep,
781                invoice_amount,
782                strategy,
783                event_sender,
784            ));
785            f
786        })
787        .collect::<Vec<_>>();
788
789    Ok(futures)
790}
791
792#[allow(clippy::too_many_arguments)]
793async fn do_ln_circular_test_user_task(
794    prefix: String,
795    client: ClientHandleArc,
796    invite_code: Option<InviteCode>,
797    oob_notes: Vec<OOBNotes>,
798    test_duration: Duration,
799    ln_payment_sleep: Duration,
800    invoice_amount: Amount,
801    strategy: LnCircularStrategy,
802    event_sender: mpsc::UnboundedSender<MetricEvent>,
803) -> anyhow::Result<()> {
804    for oob_note in oob_notes {
805        let amount = oob_note.total_amount();
806        reissue_notes(&client, oob_note, &event_sender)
807            .await
808            .map_err(|e| anyhow::anyhow!("while reissuing initial {amount}: {e}"))?;
809    }
810    let initial_time = fedimint_core::time::now();
811    let still_ontime = || async {
812        fedimint_core::time::now()
813            .duration_since(initial_time)
814            .expect("time to work")
815            <= test_duration
816    };
817    let sleep_a_bit = || async {
818        if still_ontime().await {
819            fedimint_core::task::sleep(ln_payment_sleep).await;
820        }
821    };
822    match strategy {
823        LnCircularStrategy::TwoGateways => {
824            let invoice_generation = LnInvoiceGeneration::LdkLightningCli;
825            while still_ontime().await {
826                let gateway_id = get_gateway_id(invoice_generation).await?;
827                let ln_gateway = get_lightning_gateway(&client, Some(gateway_id)).await;
828                run_two_gateways_strategy(
829                    &prefix,
830                    &invoice_generation,
831                    &invoice_amount,
832                    &event_sender,
833                    &client,
834                    ln_gateway,
835                )
836                .await?;
837                sleep_a_bit().await;
838            }
839        }
840        LnCircularStrategy::SelfPayment => {
841            while still_ontime().await {
842                do_self_payment(&prefix, &client, invoice_amount, &event_sender).await?;
843                sleep_a_bit().await;
844            }
845        }
846        LnCircularStrategy::PartnerPingPong => {
847            let (partner, _) = build_client(invite_code, None).await?;
848            while still_ontime().await {
849                do_partner_ping_pong(&prefix, &client, &partner, invoice_amount, &event_sender)
850                    .await?;
851                sleep_a_bit().await;
852            }
853        }
854    }
855    Ok(())
856}
857
858const GATEWAY_CREATE_INVOICE: &str = "gateway_create_invoice";
859
860async fn run_two_gateways_strategy(
861    prefix: &str,
862    invoice_generation: &LnInvoiceGeneration,
863    invoice_amount: &Amount,
864    event_sender: &mpsc::UnboundedSender<MetricEvent>,
865    client: &ClientHandleArc,
866    ln_gateway: Option<LightningGateway>,
867) -> Result<(), anyhow::Error> {
868    let create_invoice_time = fedimint_core::time::now();
869    match *invoice_generation {
870        LnInvoiceGeneration::LdkLightningCli => {
871            let invoice = ldk_create_invoice(*invoice_amount).await?;
872            let elapsed = create_invoice_time.elapsed()?;
873            info!("Created invoice using LDK in {elapsed:?}");
874            event_sender.send(MetricEvent {
875                name: GATEWAY_CREATE_INVOICE.into(),
876                duration: elapsed,
877            })?;
878            gateway_pay_invoice(
879                prefix,
880                "LND",
881                client,
882                invoice.clone(),
883                event_sender,
884                ln_gateway.clone(),
885            )
886            .await?;
887            ldk_wait_invoice_payment(&invoice).await?;
888            let (operation_id, invoice) =
889                client_create_invoice(client, *invoice_amount, event_sender, ln_gateway).await?;
890            let pay_invoice_time = fedimint_core::time::now();
891            ldk_pay_invoice(invoice).await?;
892            wait_invoice_payment(
893                prefix,
894                "LND",
895                client,
896                operation_id,
897                event_sender,
898                pay_invoice_time,
899            )
900            .await?;
901        }
902    }
903    Ok(())
904}
905
906async fn do_self_payment(
907    prefix: &str,
908    client: &ClientHandleArc,
909    invoice_amount: Amount,
910    event_sender: &mpsc::UnboundedSender<MetricEvent>,
911) -> anyhow::Result<()> {
912    let (operation_id, invoice) =
913        client_create_invoice(client, invoice_amount, event_sender, None).await?;
914    let pay_invoice_time = fedimint_core::time::now();
915    let lightning_module = client.get_first_module::<LightningClientModule>()?;
916    //let gateway = lightning_module.select_active_gateway_opt().await;
917    lightning_module
918        .pay_bolt11_invoice(None, invoice, ())
919        .await?;
920    wait_invoice_payment(
921        prefix,
922        "gateway",
923        client,
924        operation_id,
925        event_sender,
926        pay_invoice_time,
927    )
928    .await?;
929    Ok(())
930}
931
932async fn do_partner_ping_pong(
933    prefix: &str,
934    client: &ClientHandleArc,
935    partner: &ClientHandleArc,
936    invoice_amount: Amount,
937    event_sender: &mpsc::UnboundedSender<MetricEvent>,
938) -> anyhow::Result<()> {
939    // Ping (partner creates invoice, client pays)
940    let (operation_id, invoice) =
941        client_create_invoice(partner, invoice_amount, event_sender, None).await?;
942    let pay_invoice_time = fedimint_core::time::now();
943    let lightning_module = client.get_first_module::<LightningClientModule>()?;
944    // TODO: Select random gateway?
945    //let gateway = lightning_module.select_active_gateway_opt().await;
946    lightning_module
947        .pay_bolt11_invoice(None, invoice, ())
948        .await?;
949    wait_invoice_payment(
950        prefix,
951        "gateway",
952        partner,
953        operation_id,
954        event_sender,
955        pay_invoice_time,
956    )
957    .await?;
958    // Pong (client creates invoice, partner pays)
959    let (operation_id, invoice) =
960        client_create_invoice(client, invoice_amount, event_sender, None).await?;
961    let pay_invoice_time = fedimint_core::time::now();
962    let partner_lightning_module = partner.get_first_module::<LightningClientModule>()?;
963    //let gateway = partner_lightning_module.select_active_gateway_opt().await;
964    // TODO: Select random gateway?
965    partner_lightning_module
966        .pay_bolt11_invoice(None, invoice, ())
967        .await?;
968    wait_invoice_payment(
969        prefix,
970        "gateway",
971        client,
972        operation_id,
973        event_sender,
974        pay_invoice_time,
975    )
976    .await?;
977    Ok(())
978}
979
980async fn wait_invoice_payment(
981    prefix: &str,
982    gateway_name: &str,
983    client: &ClientHandleArc,
984    operation_id: fedimint_core::core::OperationId,
985    event_sender: &mpsc::UnboundedSender<MetricEvent>,
986    pay_invoice_time: std::time::SystemTime,
987) -> anyhow::Result<()> {
988    let elapsed = pay_invoice_time.elapsed()?;
989    info!("{prefix} Invoice payment receive started using {gateway_name} in {elapsed:?}");
990    event_sender.send(MetricEvent {
991        name: format!("gateway_{gateway_name}_payment_received_started"),
992        duration: elapsed,
993    })?;
994    let lightning_module = client.get_first_module::<LightningClientModule>()?;
995    let mut updates = lightning_module
996        .subscribe_ln_receive(operation_id)
997        .await?
998        .into_stream();
999    while let Some(update) = updates.next().await {
1000        debug!(%prefix, ?update, "Invoice payment update");
1001        match update {
1002            LnReceiveState::Claimed => {
1003                let elapsed: Duration = pay_invoice_time.elapsed()?;
1004                info!("{prefix} Invoice payment received on {gateway_name} in {elapsed:?}");
1005                event_sender.send(MetricEvent {
1006                    name: "gateway_payment_received_success".into(),
1007                    duration: elapsed,
1008                })?;
1009                event_sender.send(MetricEvent {
1010                    name: format!("gateway_{gateway_name}_payment_received_success"),
1011                    duration: elapsed,
1012                })?;
1013                break;
1014            }
1015            LnReceiveState::Canceled { reason } => {
1016                let elapsed: Duration = pay_invoice_time.elapsed()?;
1017                info!(
1018                    "{prefix} Invoice payment receive was canceled on {gateway_name}: {reason} in {elapsed:?}"
1019                );
1020                event_sender.send(MetricEvent {
1021                    name: "gateway_payment_received_canceled".into(),
1022                    duration: elapsed,
1023                })?;
1024                break;
1025            }
1026            _ => {}
1027        }
1028    }
1029    Ok(())
1030}
1031
1032async fn client_create_invoice(
1033    client: &ClientHandleArc,
1034    invoice_amount: Amount,
1035    event_sender: &mpsc::UnboundedSender<MetricEvent>,
1036    ln_gateway: Option<LightningGateway>,
1037) -> anyhow::Result<(fedimint_core::core::OperationId, Bolt11Invoice)> {
1038    let create_invoice_time = fedimint_core::time::now();
1039    let lightning_module = client.get_first_module::<LightningClientModule>()?;
1040    let desc = Description::new("test".to_string())?;
1041    let (operation_id, invoice, _) = lightning_module
1042        .create_bolt11_invoice(
1043            invoice_amount,
1044            Bolt11InvoiceDescription::Direct(desc),
1045            None,
1046            (),
1047            ln_gateway,
1048        )
1049        .await?;
1050    let elapsed = create_invoice_time.elapsed()?;
1051    info!("Created invoice using gateway in {elapsed:?}");
1052    event_sender.send(MetricEvent {
1053        name: GATEWAY_CREATE_INVOICE.into(),
1054        duration: elapsed,
1055    })?;
1056    Ok((operation_id, invoice))
1057}
1058
1059fn test_download_config(
1060    invite_code: &InviteCode,
1061    users: u16,
1062    event_sender: &mpsc::UnboundedSender<MetricEvent>,
1063) -> Vec<BoxFuture<'static, anyhow::Result<()>>> {
1064    (0..users)
1065        .map(|_| {
1066            let invite_code = invite_code.clone();
1067            let event_sender = event_sender.clone();
1068            let f: BoxFuture<_> = Box::pin(async move {
1069                let m = fedimint_core::time::now();
1070                let _ = fedimint_api_client::api::net::Connector::default()
1071                    .download_from_invite_code(&invite_code)
1072                    .await?;
1073                event_sender.send(MetricEvent {
1074                    name: "download_client_config".into(),
1075                    duration: m.elapsed()?,
1076                })?;
1077                Ok(())
1078            });
1079            f
1080        })
1081        .collect()
1082}
1083
1084async fn test_connect_raw_client(
1085    invite_code: InviteCode,
1086    users: u16,
1087    duration: Duration,
1088    timeout: Duration,
1089    limit_endpoints: Option<usize>,
1090    event_sender: mpsc::UnboundedSender<MetricEvent>,
1091) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
1092    use jsonrpsee_core::client::ClientT;
1093    use jsonrpsee_ws_client::WsClientBuilder;
1094
1095    let mut cfg = fedimint_api_client::api::net::Connector::default()
1096        .download_from_invite_code(&invite_code)
1097        .await?;
1098
1099    if let Some(limit_endpoints) = limit_endpoints {
1100        cfg.global.api_endpoints = cfg
1101            .global
1102            .api_endpoints
1103            .into_iter()
1104            .take(limit_endpoints)
1105            .collect();
1106        info!("Limiting endpoints to {:?}", cfg.global.api_endpoints);
1107    }
1108
1109    info!("Connecting to {users} clients");
1110    let clients = (0..users)
1111        .flat_map(|_| {
1112            cfg.global.api_endpoints.values().map(|url| async {
1113                let ws_client = WsClientBuilder::default()
1114                    .request_timeout(timeout)
1115                    .connection_timeout(timeout)
1116                    .build(url_to_string_with_default_port(&url.url))
1117                    .await?;
1118                Ok::<_, anyhow::Error>(ws_client)
1119            })
1120        })
1121        .collect::<Vec<_>>();
1122    let clients = futures::future::try_join_all(clients).await?;
1123    info!("Keeping {users} clients connected for {duration:?}");
1124    Ok(clients
1125        .into_iter()
1126        .map(|client| {
1127            let event_sender = event_sender.clone();
1128            let f: BoxFuture<_> = Box::pin(async move {
1129                let initial_time = fedimint_core::time::now();
1130                while initial_time.elapsed()? < duration {
1131                    let m = fedimint_core::time::now();
1132                    let _epoch: u64 = client
1133                        .request::<_, _>(SESSION_COUNT_ENDPOINT, vec![ApiRequestErased::default()])
1134                        .await?;
1135                    event_sender.send(MetricEvent {
1136                        name: SESSION_COUNT_ENDPOINT.into(),
1137                        duration: m.elapsed()?,
1138                    })?;
1139                    fedimint_core::task::sleep(Duration::from_secs(1)).await;
1140                }
1141                Ok(())
1142            });
1143            f
1144        })
1145        .collect())
1146}
1147
1148fn url_to_string_with_default_port(url: &SafeUrl) -> String {
1149    format!(
1150        "{}://{}:{}{}",
1151        url.scheme(),
1152        url.host().expect("Asserted on construction"),
1153        url.port_or_known_default()
1154            .expect("Asserted on construction"),
1155        url.path()
1156    )
1157}
1158
1159async fn handle_metrics_summary(
1160    opts: Opts,
1161    mut event_receiver: mpsc::UnboundedReceiver<MetricEvent>,
1162) -> anyhow::Result<()> {
1163    let timestamp_seconds = fedimint_core::time::duration_since_epoch().as_secs();
1164    let mut metrics_json_output_files = vec![];
1165    let mut previous_metrics = vec![];
1166    let mut comparison_output = None;
1167    if let Some(archive_dir) = opts.archive_dir {
1168        let mut archive_metrics = archive_dir.join("metrics");
1169        archive_metrics.push(opts.users.to_string());
1170        tokio::fs::create_dir_all(&archive_metrics).await?;
1171        let mut archive_comparisons = archive_dir.join("comparisons");
1172        archive_comparisons.push(opts.users.to_string());
1173        tokio::fs::create_dir_all(&archive_comparisons).await?;
1174
1175        let latest_metrics_file = std::fs::read_dir(&archive_metrics)?
1176            .map(|entry| {
1177                let entry = entry.unwrap();
1178                let metadata = entry.metadata().unwrap();
1179                let created = metadata
1180                    .created()
1181                    .unwrap_or_else(|_| metadata.modified().unwrap());
1182                (entry, created)
1183            })
1184            .max_by_key(|(_entry, created)| created.to_owned())
1185            .map(|(entry, _)| entry.path());
1186        if let Some(latest_metrics_file) = latest_metrics_file {
1187            let display_path = latest_metrics_file.display();
1188            let latest_metrics_file = tokio::fs::File::open(&latest_metrics_file)
1189                .await
1190                .with_context(|| format!("Failed to open {display_path}"))?;
1191            let mut lines = tokio::io::BufReader::new(latest_metrics_file).lines();
1192            while let Some(line) = lines.next_line().await? {
1193                match serde_json::from_str::<EventMetricSummary>(&line) {
1194                    Ok(metric) => {
1195                        previous_metrics.push(metric);
1196                    }
1197                    Err(e) => {
1198                        warn!("Failed to parse previous metric: {e:?}");
1199                    }
1200                }
1201            }
1202        }
1203        let new_metric_output = archive_metrics.join(format!("{timestamp_seconds}.json",));
1204        let new_metric_output = BufWriter::new(
1205            OpenOptions::new()
1206                .write(true)
1207                .create(true)
1208                .truncate(true)
1209                .open(new_metric_output)
1210                .await?,
1211        );
1212        metrics_json_output_files.push(new_metric_output);
1213        if !previous_metrics.is_empty() {
1214            let new_comparison_output =
1215                archive_comparisons.join(format!("{timestamp_seconds}.json",));
1216            comparison_output = Some(BufWriter::new(
1217                OpenOptions::new()
1218                    .write(true)
1219                    .create(true)
1220                    .truncate(true)
1221                    .open(new_comparison_output)
1222                    .await?,
1223            ));
1224        }
1225    }
1226    if let Some(metrics_json_output) = opts.metrics_json_output {
1227        metrics_json_output_files.push(BufWriter::new(
1228            tokio::fs::OpenOptions::new()
1229                .write(true)
1230                .create(true)
1231                .truncate(true)
1232                .open(metrics_json_output)
1233                .await?,
1234        ));
1235    }
1236    let mut results = BTreeMap::new();
1237    while let Some(event) = event_receiver.recv().await {
1238        let entry = results.entry(event.name).or_insert_with(Vec::new);
1239        entry.push(event.duration);
1240    }
1241    let mut previous_metrics = previous_metrics
1242        .into_iter()
1243        .map(|metric| (metric.name.clone(), metric))
1244        .collect::<HashMap<_, _>>();
1245    for (k, mut v) in results {
1246        v.sort();
1247        let n = v.len();
1248        let max = v.iter().last().unwrap();
1249        let min = v.first().unwrap();
1250        let median = v[n / 2];
1251        let sum: Duration = v.iter().sum();
1252        let avg = sum / n as u32;
1253        let metric_summary = EventMetricSummary {
1254            name: k.clone(),
1255            users: u64::from(opts.users),
1256            n: n as u64,
1257            avg_ms: avg.as_millis(),
1258            median_ms: median.as_millis(),
1259            max_ms: max.as_millis(),
1260            min_ms: min.as_millis(),
1261            timestamp_seconds,
1262        };
1263        let comparison = if let Some(previous_metric) = previous_metrics.remove(&k) {
1264            if previous_metric.n == metric_summary.n {
1265                fn calculate_gain(current: u128, previous: u128) -> f64 {
1266                    current as f64 / previous as f64
1267                }
1268                let comparison = EventMetricComparison {
1269                    avg_ms_gain: calculate_gain(metric_summary.avg_ms, previous_metric.avg_ms),
1270                    median_ms_gain: calculate_gain(
1271                        metric_summary.median_ms,
1272                        previous_metric.median_ms,
1273                    ),
1274                    max_ms_gain: calculate_gain(metric_summary.max_ms, previous_metric.max_ms),
1275                    min_ms_gain: calculate_gain(metric_summary.min_ms, previous_metric.min_ms),
1276                    current: metric_summary.clone(),
1277                    previous: previous_metric,
1278                };
1279                if let Some(comparison_output) = &mut comparison_output {
1280                    let comparison_json =
1281                        serde_json::to_string(&comparison).expect("to be serializable");
1282                    comparison_output
1283                        .write_all(format!("{comparison_json}\n").as_bytes())
1284                        .await
1285                        .expect("to write on file");
1286                }
1287                Some(comparison)
1288            } else {
1289                info!(
1290                    "Skipping comparison for {k} because previous metric has different n ({} vs {})",
1291                    previous_metric.n, metric_summary.n
1292                );
1293                None
1294            }
1295        } else {
1296            None
1297        };
1298        if let Some(comparison) = comparison {
1299            println!(
1300                "{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?} (compared to previous: {comparison})"
1301            );
1302        } else {
1303            println!("{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?}");
1304        }
1305        let metric_summary_json =
1306            serde_json::to_string(&metric_summary).expect("to be serializable");
1307        for metrics_json_output_file in &mut metrics_json_output_files {
1308            metrics_json_output_file
1309                .write_all(format!("{metric_summary_json}\n").as_bytes())
1310                .await
1311                .expect("to write on file");
1312        }
1313    }
1314    for mut output in metrics_json_output_files {
1315        output.flush().await?;
1316    }
1317    if let Some(mut output) = comparison_output {
1318        output.flush().await?;
1319    }
1320    Ok(())
1321}
1322
1323async fn get_gateway_id(generate_invoice_with: LnInvoiceGeneration) -> anyhow::Result<String> {
1324    let gateway_json = match generate_invoice_with {
1325        LnInvoiceGeneration::LdkLightningCli => {
1326            // If we are paying a lnd invoice, we use the LDK node
1327            cmd!(GatewayLndCli, "info").out_json().await
1328        }
1329    }?;
1330    let gateway_id = gateway_json["gateway_id"]
1331        .as_str()
1332        .context("Missing gateway_id field")?;
1333
1334    Ok(gateway_id.into())
1335}