fedimint_load_test_tool/
main.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_precision_loss)]
4#![allow(clippy::missing_errors_doc)]
5#![allow(clippy::ref_option)]
6#![allow(clippy::too_many_lines)]
7#![allow(clippy::large_futures)]
8
9use std::collections::{BTreeMap, HashMap};
10use std::path::PathBuf;
11use std::str::FromStr;
12use std::time::Duration;
13use std::vec;
14
15use anyhow::{Context, bail};
16use clap::{Args, Parser, Subcommand, ValueEnum};
17use common::{
18    gateway_pay_invoice, get_note_summary, ldk_create_invoice, ldk_pay_invoice,
19    ldk_wait_invoice_payment, parse_gateway_id, reissue_notes,
20};
21use devimint::cmd;
22use devimint::util::GatewayLndCli;
23use fedimint_client::ClientHandleArc;
24use fedimint_core::Amount;
25use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
26use fedimint_core::invite_code::InviteCode;
27use fedimint_core::module::ApiRequestErased;
28use fedimint_core::runtime::spawn;
29use fedimint_core::util::{BoxFuture, SafeUrl};
30use fedimint_ln_client::{LightningClientModule, LnReceiveState};
31use fedimint_ln_common::LightningGateway;
32use fedimint_mint_client::OOBNotes;
33use futures::StreamExt;
34use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description};
35use serde::{Deserialize, Serialize};
36use tokio::fs::OpenOptions;
37use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
38use tokio::sync::mpsc;
39use tracing::{debug, info, warn};
40
41use crate::common::{
42    build_client, do_spend_notes, get_invite_code_cli, remint_denomination, try_get_notes_cli,
43};
44pub mod common;
45
46#[derive(Parser, Clone)]
47#[command(version)]
48struct Opts {
49    #[arg(
50        long,
51        default_value = "10",
52        help = "Number of users. Each user will work in parallel"
53    )]
54    users: u16,
55
56    #[arg(long, help = "Output with the metrics results in JSON format")]
57    metrics_json_output: Option<PathBuf>,
58
59    #[arg(
60        long,
61        help = "If given, will be used to store and retrieve past metrics for comparison purposes"
62    )]
63    archive_dir: Option<PathBuf>,
64
65    #[clap(subcommand)]
66    command: Command,
67}
68
69#[derive(Debug, Clone, Copy, ValueEnum)]
70enum LnInvoiceGeneration {
71    LdkLightningCli,
72}
73
74#[derive(Subcommand, Clone)]
75enum Command {
76    #[command(about = "Keep many websocket connections to a federation for a duration of time")]
77    TestConnect {
78        #[arg(long, help = "Federation invite code")]
79        invite_code: String,
80        #[arg(
81            long,
82            default_value = "60",
83            help = "How much time to keep the connections open, in seconds"
84        )]
85        duration_secs: u64,
86        #[arg(
87            long,
88            default_value = "120",
89            help = "Timeout for connection attempt and for each request, in secnods"
90        )]
91        timeout_secs: u64,
92        #[arg(
93            long,
94            help = "If given, will limit the number of endpoints (guardians) to connect to"
95        )]
96        limit_endpoints: Option<usize>,
97    },
98    #[command(about = "Try to download the client config many times.")]
99    TestDownload {
100        #[arg(long, help = "Federation invite code")]
101        invite_code: String,
102    },
103    #[command(
104        about = "Run a load test where many users in parallel will try to reissue notes and pay invoices through the gateway"
105    )]
106    LoadTest(LoadTestArgs),
107    /// Run a load test where many users in parallel will receive then send a
108    /// payment through lightning.
109    /// It's 'circular' because the funds always come back to the same user then
110    /// we can keep making the payments in a loop
111    #[command()]
112    LnCircularLoadTest(LnCircularLoadTestArgs),
113}
114
115#[derive(Args, Clone)]
116struct LoadTestArgs {
117    #[arg(
118        long,
119        help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
120    )]
121    invite_code: Option<InviteCode>,
122
123    #[arg(
124        long,
125        help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
126    )]
127    initial_notes: Option<OOBNotes>,
128
129    #[arg(
130        long,
131        help = "Gateway Id. If none, retrieve one according to --generate-invoice-with"
132    )]
133    gateway_id: Option<String>,
134
135    #[arg(
136        long,
137        help = "The method used to generate invoices to be paid through the gateway. If none and no --invoices-file provided, no gateway/LN tests will be run. Note that you can't generate an invoice using the same lightning node used by the gateway (i.e self payment is forbidden)"
138    )]
139    generate_invoice_with: Option<LnInvoiceGeneration>,
140
141    #[arg(
142        long,
143        default_value = "1",
144        help = "How many invoices will be created for each user. Only applicable if --generate-invoice-with is provided"
145    )]
146    invoices_per_user: u16,
147
148    #[arg(
149        long,
150        default_value = "0",
151        help = "How many seconds to sleep between LN payments"
152    )]
153    ln_payment_sleep_secs: u64,
154
155    #[arg(
156        long,
157        help = "A text file with one invoice per line. If --generate-invoice-with is provided, these will be additional invoices to be paid"
158    )]
159    invoices_file: Option<PathBuf>,
160
161    #[arg(
162        long,
163        help = "How many notes to distribute to each user",
164        default_value = "2"
165    )]
166    notes_per_user: u16,
167
168    #[arg(
169        long,
170        help = "Note denomination to use for the test",
171        default_value = "1024"
172    )]
173    note_denomination: Amount,
174
175    #[arg(
176        long,
177        help = "Invoice amount when generating one",
178        default_value = "1000"
179    )]
180    invoice_amount: Amount,
181}
182
183#[derive(Args, Clone)]
184struct LnCircularLoadTestArgs {
185    #[arg(
186        long,
187        help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
188    )]
189    invite_code: Option<InviteCode>,
190
191    #[arg(
192        long,
193        help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
194    )]
195    initial_notes: Option<OOBNotes>,
196
197    #[arg(
198        long,
199        default_value = "60",
200        help = "For how many seconds to run the test"
201    )]
202    test_duration_secs: u64,
203
204    #[arg(
205        long,
206        default_value = "0",
207        help = "How many seconds to sleep between LN payments"
208    )]
209    ln_payment_sleep_secs: u64,
210
211    #[arg(
212        long,
213        help = "How many notes to distribute to each user",
214        default_value = "1"
215    )]
216    notes_per_user: u16,
217
218    #[arg(
219        long,
220        help = "Note denomination to use for the test",
221        default_value = "1024"
222    )]
223    note_denomination: Amount,
224
225    #[arg(
226        long,
227        help = "Invoice amount when generating one",
228        default_value = "1000"
229    )]
230    invoice_amount: Amount,
231
232    #[arg(long)]
233    strategy: LnCircularStrategy,
234}
235
236#[derive(Debug, Clone, Copy, ValueEnum)]
237enum LnCircularStrategy {
238    /// The user will pay its own invoice
239    SelfPayment,
240    /// One gateway will pay/receive to/from the other, then they will swap
241    /// places
242    TwoGateways,
243    /// Two clients will pay to each other using the same gateway
244    PartnerPingPong,
245}
246
/// A single timing measurement, sent over the metrics channel by the test
/// tasks.
#[derive(Clone, Debug)]
pub struct MetricEvent {
    /// Name of the measured operation.
    name: String,
    /// How long the operation took.
    duration: Duration,
}
252
253#[derive(Debug, Clone, Serialize, Deserialize)]
254struct EventMetricSummary {
255    name: String,
256    users: u64,
257    n: u64,
258    avg_ms: u128,
259    median_ms: u128,
260    max_ms: u128,
261    min_ms: u128,
262    timestamp_seconds: u64,
263}
264
265#[derive(Debug, Serialize, Deserialize)]
266struct EventMetricComparison {
267    avg_ms_gain: f64,
268    median_ms_gain: f64,
269    max_ms_gain: f64,
270    min_ms_gain: f64,
271    current: EventMetricSummary,
272    previous: EventMetricSummary,
273}
274
275impl std::fmt::Display for EventMetricComparison {
276    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277        fn to_percent(gain: f64) -> String {
278            if gain >= 1.0 {
279                format!("+{:.2}%", (gain - 1.0) * 100.0)
280            } else {
281                format!("-{:.2}%", (1.0 - gain) * 100.0)
282            }
283        }
284        f.write_str(&format!(
285            "avg: {}, median: {}, max: {}, min: {}",
286            to_percent(self.avg_ms_gain),
287            to_percent(self.median_ms_gain),
288            to_percent(self.max_ms_gain),
289            to_percent(self.min_ms_gain),
290        ))
291    }
292}
293
294#[tokio::main]
295async fn main() -> anyhow::Result<()> {
296    fedimint_logging::TracingSetup::default().init()?;
297    let opts = Opts::parse();
298    let (event_sender, event_receiver) = tokio::sync::mpsc::unbounded_channel();
299    let summary_handle = spawn("handle metrics summary", {
300        let opts = opts.clone();
301        async { handle_metrics_summary(opts, event_receiver).await }
302    });
303    let futures = match opts.command.clone() {
304        Command::TestConnect {
305            invite_code,
306            duration_secs,
307            timeout_secs,
308            limit_endpoints,
309        } => {
310            let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
311            test_connect_raw_client(
312                invite_code,
313                opts.users,
314                Duration::from_secs(duration_secs),
315                Duration::from_secs(timeout_secs),
316                limit_endpoints,
317                event_sender.clone(),
318            )
319            .await?
320        }
321        Command::TestDownload { invite_code } => {
322            let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
323            test_download_config(&invite_code, opts.users, &event_sender.clone())
324        }
325        Command::LoadTest(args) => {
326            let invite_code = invite_code_or_fallback(args.invite_code).await;
327
328            let gateway_id = if let Some(gateway_id) = args.gateway_id {
329                Some(gateway_id)
330            } else if let Some(generate_invoice_with) = args.generate_invoice_with {
331                Some(get_gateway_id(generate_invoice_with).await?)
332            } else {
333                None
334            };
335            let invoices = if let Some(invoices_file) = args.invoices_file {
336                let invoices_file = tokio::fs::File::open(&invoices_file)
337                    .await
338                    .with_context(|| format!("Failed to open {invoices_file:?}"))?;
339                let mut lines = tokio::io::BufReader::new(invoices_file).lines();
340                let mut invoices = vec![];
341                while let Some(line) = lines.next_line().await? {
342                    let invoice = Bolt11Invoice::from_str(&line)?;
343                    invoices.push(invoice);
344                }
345                invoices
346            } else {
347                vec![]
348            };
349            if args.generate_invoice_with.is_none() && invoices.is_empty() {
350                info!(
351                    "No --generate-invoice-with given no invoices on --invoices-file, not LN/gateway tests will be run"
352                );
353            }
354            run_load_test(
355                opts.archive_dir,
356                opts.users,
357                invite_code,
358                args.initial_notes,
359                args.generate_invoice_with,
360                args.invoices_per_user,
361                Duration::from_secs(args.ln_payment_sleep_secs),
362                invoices,
363                gateway_id,
364                args.notes_per_user,
365                args.note_denomination,
366                args.invoice_amount,
367                event_sender.clone(),
368            )
369            .await?
370        }
371        Command::LnCircularLoadTest(args) => {
372            let invite_code = invite_code_or_fallback(args.invite_code).await;
373            run_ln_circular_load_test(
374                opts.archive_dir,
375                opts.users,
376                invite_code,
377                args.initial_notes,
378                Duration::from_secs(args.test_duration_secs),
379                Duration::from_secs(args.ln_payment_sleep_secs),
380                args.notes_per_user,
381                args.note_denomination,
382                args.invoice_amount,
383                args.strategy,
384                event_sender.clone(),
385            )
386            .await?
387        }
388    };
389
390    let result = futures::future::join_all(futures).await;
391    drop(event_sender);
392    summary_handle.await??;
393    let len_failures = result.iter().filter(|r| r.is_err()).count();
394    eprintln!("{} results, {len_failures} failures", result.len());
395    for r in result {
396        if let Err(e) = r {
397            warn!("Task failed: {:?}", e);
398        }
399    }
400    if len_failures > 0 {
401        bail!("Finished with failures");
402    }
403    info!("Finished successfully");
404    Ok(())
405}
406
407async fn invite_code_or_fallback(invite_code: Option<InviteCode>) -> Option<InviteCode> {
408    if let Some(invite_code) = invite_code {
409        Some(invite_code)
410    } else {
411        // Try to get an invite code through cli in a best effort basis
412        match get_invite_code_cli(0.into()).await {
413            Ok(invite_code) => Some(invite_code),
414            Err(e) => {
415                info!(
416                    "No invite code provided and failed to get one with '{e}' error, will try to proceed without one..."
417                );
418                None
419            }
420        }
421    }
422}
423
424#[allow(clippy::too_many_arguments)]
425async fn run_load_test(
426    archive_dir: Option<PathBuf>,
427    users: u16,
428    invite_code: Option<InviteCode>,
429    initial_notes: Option<OOBNotes>,
430    generate_invoice_with: Option<LnInvoiceGeneration>,
431    generated_invoices_per_user: u16,
432    ln_payment_sleep: Duration,
433    invoices_from_file: Vec<Bolt11Invoice>,
434    gateway_id: Option<String>,
435    notes_per_user: u16,
436    note_denomination: Amount,
437    invoice_amount: Amount,
438    event_sender: mpsc::UnboundedSender<MetricEvent>,
439) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
440    let db_path = get_db_path(&archive_dir);
441    let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
442    let minimum_notes = notes_per_user * users;
443    let minimum_amount_required = note_denomination * u64::from(minimum_notes);
444
445    reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
446    get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
447    print_coordinator_notes(&coordinator).await?;
448    info!(
449        "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
450    );
451    remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
452    print_coordinator_notes(&coordinator).await?;
453
454    let users_clients = get_users_clients(users, db_path, invite_code).await?;
455
456    let mut users_notes =
457        get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
458    let mut users_invoices = HashMap::new();
459    let mut user = 0;
460    // Distribute invoices to users in a round robin fashion
461    for invoice in invoices_from_file {
462        users_invoices
463            .entry(user)
464            .or_insert_with(Vec::new)
465            .push(invoice);
466        user = (user + 1) % users;
467    }
468
469    info!("Starting user tasks");
470    let futures = users_clients
471        .into_iter()
472        .enumerate()
473        .map(|(u, client)| {
474            let u = u as u16;
475            let oob_notes = users_notes.remove(&u).unwrap();
476            let invoices = users_invoices.remove(&u).unwrap_or_default();
477            let event_sender = event_sender.clone();
478            let f: BoxFuture<_> = Box::pin(do_load_test_user_task(
479                format!("User {u}:"),
480                client,
481                oob_notes,
482                generated_invoices_per_user,
483                ln_payment_sleep,
484                invoice_amount,
485                invoices,
486                generate_invoice_with,
487                event_sender,
488                gateway_id.clone(),
489            ));
490            f
491        })
492        .collect::<Vec<_>>();
493
494    Ok(futures)
495}
496
497async fn get_notes_for_users(
498    users: u16,
499    notes_per_user: u16,
500    coordinator: ClientHandleArc,
501    note_denomination: Amount,
502) -> anyhow::Result<HashMap<u16, Vec<OOBNotes>>> {
503    let mut users_notes = HashMap::new();
504    for u in 0..users {
505        users_notes.insert(u, Vec::with_capacity(notes_per_user.into()));
506        for _ in 0..notes_per_user {
507            let (_, oob_notes) = do_spend_notes(&coordinator, note_denomination).await?;
508            let user_amount = oob_notes.total_amount();
509            info!("Giving {user_amount} to user {u}");
510            users_notes.get_mut(&u).unwrap().push(oob_notes);
511        }
512    }
513    Ok(users_notes)
514}
515
516async fn get_users_clients(
517    n: u16,
518    db_path: Option<PathBuf>,
519    invite_code: Option<InviteCode>,
520) -> anyhow::Result<Vec<ClientHandleArc>> {
521    let mut users_clients = Vec::with_capacity(n.into());
522    for u in 0..n {
523        let (client, _) = get_user_client(u, &db_path, &invite_code).await?;
524        users_clients.push(client);
525    }
526    Ok(users_clients)
527}
528
529async fn get_user_client(
530    user_index: u16,
531    db_path: &Option<PathBuf>,
532    invite_code: &Option<InviteCode>,
533) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
534    let user_db = db_path
535        .as_ref()
536        .map(|db_path| db_path.join(format!("user_{user_index}.db")));
537    let user_invite_code = if user_db.as_ref().is_some_and(|db| db.exists()) {
538        None
539    } else {
540        invite_code.clone()
541    };
542    let (client, invite_code) = build_client(user_invite_code, user_db.as_ref()).await?;
543    // if lightning module is present, update the gateway cache
544    if let Ok(ln_client) = client.get_first_module::<LightningClientModule>() {
545        let _ = ln_client.update_gateway_cache().await;
546    }
547    Ok((client, invite_code))
548}
549
550async fn print_coordinator_notes(coordinator: &ClientHandleArc) -> anyhow::Result<()> {
551    info!("Note summary:");
552    let summary = get_note_summary(coordinator).await?;
553    for (k, v) in summary.iter() {
554        info!("{k}: {v}");
555    }
556    Ok(())
557}
558
559async fn get_required_notes(
560    coordinator: &ClientHandleArc,
561    minimum_amount_required: Amount,
562    event_sender: &mpsc::UnboundedSender<MetricEvent>,
563) -> anyhow::Result<()> {
564    let current_balance = coordinator.get_balance().await;
565    if current_balance < minimum_amount_required {
566        let diff = minimum_amount_required.saturating_sub(current_balance);
567        info!(
568            "Current balance {current_balance} on coordinator not enough, trying to get {diff} more through fedimint-cli"
569        );
570        match try_get_notes_cli(&diff, 5).await {
571            Ok(notes) => {
572                info!("Got {} more notes, reissuing them", notes.total_amount());
573                reissue_notes(coordinator, notes, event_sender).await?;
574            }
575            Err(e) => {
576                info!("Unable to get more notes: '{e}', will try to proceed without them");
577            }
578        };
579    } else {
580        info!(
581            "Current balance of {current_balance} already covers the minimum required of {minimum_amount_required}"
582        );
583    }
584    Ok(())
585}
586
587async fn reissue_initial_notes(
588    initial_notes: Option<OOBNotes>,
589    coordinator: &ClientHandleArc,
590    event_sender: &mpsc::UnboundedSender<MetricEvent>,
591) -> anyhow::Result<()> {
592    if let Some(notes) = initial_notes {
593        let amount = notes.total_amount();
594        info!("Reissuing initial notes, got {amount}");
595        reissue_notes(coordinator, notes, event_sender).await?;
596    }
597    Ok(())
598}
599
600async fn get_coordinator_client(
601    db_path: &Option<PathBuf>,
602    invite_code: &Option<InviteCode>,
603) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
604    let (client, invite_code) = if let Some(db_path) = db_path {
605        let coordinator_db = db_path.join("coordinator.db");
606        if coordinator_db.exists() {
607            build_client(invite_code.clone(), Some(&coordinator_db)).await?
608        } else {
609            tokio::fs::create_dir_all(db_path).await?;
610            build_client(
611                Some(invite_code.clone().context(
612                    "Running on this archive dir for the first time, an invite code is required",
613                )?),
614                Some(&coordinator_db),
615            )
616            .await?
617        }
618    } else {
619        build_client(
620            Some(
621                invite_code
622                    .clone()
623                    .context("No archive dir given, an invite code is strictly required")?,
624            ),
625            None,
626        )
627        .await?
628    };
629    Ok((client, invite_code))
630}
631
/// Returns the `db` subdirectory of the archive directory, or `None` when no
/// archive directory is configured.
fn get_db_path(archive_dir: &Option<PathBuf>) -> Option<PathBuf> {
    let archive_dir = archive_dir.as_ref()?;
    Some(archive_dir.join("db"))
}
635
636async fn get_lightning_gateway(
637    client: &ClientHandleArc,
638    gateway_id: Option<String>,
639) -> Option<LightningGateway> {
640    let gateway_id = parse_gateway_id(gateway_id.or(None)?.as_str()).expect("Invalid gateway id");
641    let ln_module = client
642        .get_first_module::<LightningClientModule>()
643        .expect("Must have ln client module");
644    ln_module.select_gateway(&gateway_id).await
645}
646
647#[allow(clippy::too_many_arguments)]
648async fn do_load_test_user_task(
649    prefix: String,
650    client: ClientHandleArc,
651    oob_notes: Vec<OOBNotes>,
652    generated_invoices_per_user: u16,
653    ln_payment_sleep: Duration,
654    invoice_amount: Amount,
655    additional_invoices: Vec<Bolt11Invoice>,
656    generate_invoice_with: Option<LnInvoiceGeneration>,
657    event_sender: mpsc::UnboundedSender<MetricEvent>,
658    gateway_id: Option<String>,
659) -> anyhow::Result<()> {
660    let ln_gateway = get_lightning_gateway(&client, gateway_id).await;
661    for oob_note in oob_notes {
662        let amount = oob_note.total_amount();
663        reissue_notes(&client, oob_note, &event_sender)
664            .await
665            .map_err(|e| anyhow::anyhow!("while reissuing initial {amount}: {e}"))?;
666    }
667    let mut generated_invoices_per_user_iterator = (0..generated_invoices_per_user).peekable();
668    while let Some(_) = generated_invoices_per_user_iterator.next() {
669        let total_amount = get_note_summary(&client).await?.total_amount();
670        if invoice_amount > total_amount {
671            warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
672        } else {
673            match generate_invoice_with {
674                Some(LnInvoiceGeneration::LdkLightningCli) => {
675                    let invoice = ldk_create_invoice(invoice_amount).await?;
676                    gateway_pay_invoice(
677                        &prefix,
678                        "LND",
679                        &client,
680                        invoice.clone(),
681                        &event_sender,
682                        ln_gateway.clone(),
683                    )
684                    .await?;
685                    ldk_wait_invoice_payment(&invoice).await?;
686                }
687                None if additional_invoices.is_empty() => {
688                    debug!(
689                        "No method given to generate an invoice and no invoices on file, will not test the gateway"
690                    );
691                    break;
692                }
693                None => {
694                    break;
695                }
696            };
697            if generated_invoices_per_user_iterator.peek().is_some() {
698                // Only sleep while there are more invoices to pay
699                fedimint_core::task::sleep(ln_payment_sleep).await;
700            }
701        }
702    }
703    let mut additional_invoices = additional_invoices.into_iter().peekable();
704    while let Some(invoice) = additional_invoices.next() {
705        let total_amount = get_note_summary(&client).await?.total_amount();
706        let invoice_amount =
707            Amount::from_msats(invoice.amount_milli_satoshis().unwrap_or_default());
708        if invoice_amount > total_amount {
709            warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
710        } else if invoice_amount == Amount::ZERO {
711            warn!("Can't pay invoice {invoice}, amount is zero");
712        } else {
713            gateway_pay_invoice(
714                &prefix,
715                "unknown",
716                &client,
717                invoice,
718                &event_sender,
719                ln_gateway.clone(),
720            )
721            .await?;
722            if additional_invoices.peek().is_some() {
723                // Only sleep while there are more invoices to pay
724                fedimint_core::task::sleep(ln_payment_sleep).await;
725            }
726        }
727    }
728    Ok(())
729}
730
731#[allow(clippy::too_many_arguments)]
732async fn run_ln_circular_load_test(
733    archive_dir: Option<PathBuf>,
734    users: u16,
735    invite_code: Option<InviteCode>,
736    initial_notes: Option<OOBNotes>,
737    test_duration: Duration,
738    ln_payment_sleep: Duration,
739    notes_per_user: u16,
740    note_denomination: Amount,
741    invoice_amount: Amount,
742    strategy: LnCircularStrategy,
743    event_sender: mpsc::UnboundedSender<MetricEvent>,
744) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
745    let db_path = get_db_path(&archive_dir);
746    let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
747    let minimum_notes = notes_per_user * users;
748    let minimum_amount_required = note_denomination * u64::from(minimum_notes);
749
750    reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
751    get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
752
753    info!(
754        "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
755    );
756    remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
757
758    print_coordinator_notes(&coordinator).await?;
759
760    let users_clients = get_users_clients(users, db_path, invite_code.clone()).await?;
761
762    let mut users_notes =
763        get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
764
765    info!("Starting user tasks");
766    let futures = users_clients
767        .into_iter()
768        .enumerate()
769        .map(|(u, client)| {
770            let u = u as u16;
771            let oob_notes = users_notes.remove(&u).unwrap();
772            let event_sender = event_sender.clone();
773            let f: BoxFuture<_> = Box::pin(do_ln_circular_test_user_task(
774                format!("User {u}:"),
775                client,
776                invite_code.clone(),
777                oob_notes,
778                test_duration,
779                ln_payment_sleep,
780                invoice_amount,
781                strategy,
782                event_sender,
783            ));
784            f
785        })
786        .collect::<Vec<_>>();
787
788    Ok(futures)
789}
790
791#[allow(clippy::too_many_arguments)]
792async fn do_ln_circular_test_user_task(
793    prefix: String,
794    client: ClientHandleArc,
795    invite_code: Option<InviteCode>,
796    oob_notes: Vec<OOBNotes>,
797    test_duration: Duration,
798    ln_payment_sleep: Duration,
799    invoice_amount: Amount,
800    strategy: LnCircularStrategy,
801    event_sender: mpsc::UnboundedSender<MetricEvent>,
802) -> anyhow::Result<()> {
803    for oob_note in oob_notes {
804        let amount = oob_note.total_amount();
805        reissue_notes(&client, oob_note, &event_sender)
806            .await
807            .map_err(|e| anyhow::anyhow!("while reissuing initial {amount}: {e}"))?;
808    }
809    let initial_time = fedimint_core::time::now();
810    let still_ontime = || async {
811        fedimint_core::time::now()
812            .duration_since(initial_time)
813            .expect("time to work")
814            <= test_duration
815    };
816    let sleep_a_bit = || async {
817        if still_ontime().await {
818            fedimint_core::task::sleep(ln_payment_sleep).await;
819        }
820    };
821    match strategy {
822        LnCircularStrategy::TwoGateways => {
823            let invoice_generation = LnInvoiceGeneration::LdkLightningCli;
824            while still_ontime().await {
825                let gateway_id = get_gateway_id(invoice_generation).await?;
826                let ln_gateway = get_lightning_gateway(&client, Some(gateway_id)).await;
827                run_two_gateways_strategy(
828                    &prefix,
829                    &invoice_generation,
830                    &invoice_amount,
831                    &event_sender,
832                    &client,
833                    ln_gateway,
834                )
835                .await?;
836                sleep_a_bit().await;
837            }
838        }
839        LnCircularStrategy::SelfPayment => {
840            while still_ontime().await {
841                do_self_payment(&prefix, &client, invoice_amount, &event_sender).await?;
842                sleep_a_bit().await;
843            }
844        }
845        LnCircularStrategy::PartnerPingPong => {
846            let (partner, _) = build_client(invite_code, None).await?;
847            while still_ontime().await {
848                do_partner_ping_pong(&prefix, &client, &partner, invoice_amount, &event_sender)
849                    .await?;
850                sleep_a_bit().await;
851            }
852        }
853    }
854    Ok(())
855}
856
857const GATEWAY_CREATE_INVOICE: &str = "gateway_create_invoice";
858
859async fn run_two_gateways_strategy(
860    prefix: &str,
861    invoice_generation: &LnInvoiceGeneration,
862    invoice_amount: &Amount,
863    event_sender: &mpsc::UnboundedSender<MetricEvent>,
864    client: &ClientHandleArc,
865    ln_gateway: Option<LightningGateway>,
866) -> Result<(), anyhow::Error> {
867    let create_invoice_time = fedimint_core::time::now();
868    match *invoice_generation {
869        LnInvoiceGeneration::LdkLightningCli => {
870            let invoice = ldk_create_invoice(*invoice_amount).await?;
871            let elapsed = create_invoice_time.elapsed()?;
872            info!("Created invoice using CLN in {elapsed:?}");
873            event_sender.send(MetricEvent {
874                name: GATEWAY_CREATE_INVOICE.into(),
875                duration: elapsed,
876            })?;
877            gateway_pay_invoice(
878                prefix,
879                "LND",
880                client,
881                invoice.clone(),
882                event_sender,
883                ln_gateway.clone(),
884            )
885            .await?;
886            ldk_wait_invoice_payment(&invoice).await?;
887            let (operation_id, invoice) =
888                client_create_invoice(client, *invoice_amount, event_sender, ln_gateway).await?;
889            let pay_invoice_time = fedimint_core::time::now();
890            ldk_pay_invoice(invoice).await?;
891            wait_invoice_payment(
892                prefix,
893                "LND",
894                client,
895                operation_id,
896                event_sender,
897                pay_invoice_time,
898            )
899            .await?;
900        }
901    };
902    Ok(())
903}
904
905async fn do_self_payment(
906    prefix: &str,
907    client: &ClientHandleArc,
908    invoice_amount: Amount,
909    event_sender: &mpsc::UnboundedSender<MetricEvent>,
910) -> anyhow::Result<()> {
911    let (operation_id, invoice) =
912        client_create_invoice(client, invoice_amount, event_sender, None).await?;
913    let pay_invoice_time = fedimint_core::time::now();
914    let lightning_module = client.get_first_module::<LightningClientModule>()?;
915    //let gateway = lightning_module.select_active_gateway_opt().await;
916    lightning_module
917        .pay_bolt11_invoice(None, invoice, ())
918        .await?;
919    wait_invoice_payment(
920        prefix,
921        "gateway",
922        client,
923        operation_id,
924        event_sender,
925        pay_invoice_time,
926    )
927    .await?;
928    Ok(())
929}
930
931async fn do_partner_ping_pong(
932    prefix: &str,
933    client: &ClientHandleArc,
934    partner: &ClientHandleArc,
935    invoice_amount: Amount,
936    event_sender: &mpsc::UnboundedSender<MetricEvent>,
937) -> anyhow::Result<()> {
938    // Ping (partner creates invoice, client pays)
939    let (operation_id, invoice) =
940        client_create_invoice(partner, invoice_amount, event_sender, None).await?;
941    let pay_invoice_time = fedimint_core::time::now();
942    let lightning_module = client.get_first_module::<LightningClientModule>()?;
943    // TODO: Select random gateway?
944    //let gateway = lightning_module.select_active_gateway_opt().await;
945    lightning_module
946        .pay_bolt11_invoice(None, invoice, ())
947        .await?;
948    wait_invoice_payment(
949        prefix,
950        "gateway",
951        partner,
952        operation_id,
953        event_sender,
954        pay_invoice_time,
955    )
956    .await?;
957    // Pong (client creates invoice, partner pays)
958    let (operation_id, invoice) =
959        client_create_invoice(client, invoice_amount, event_sender, None).await?;
960    let pay_invoice_time = fedimint_core::time::now();
961    let partner_lightning_module = partner.get_first_module::<LightningClientModule>()?;
962    //let gateway = partner_lightning_module.select_active_gateway_opt().await;
963    // TODO: Select random gateway?
964    partner_lightning_module
965        .pay_bolt11_invoice(None, invoice, ())
966        .await?;
967    wait_invoice_payment(
968        prefix,
969        "gateway",
970        client,
971        operation_id,
972        event_sender,
973        pay_invoice_time,
974    )
975    .await?;
976    Ok(())
977}
978
979async fn wait_invoice_payment(
980    prefix: &str,
981    gateway_name: &str,
982    client: &ClientHandleArc,
983    operation_id: fedimint_core::core::OperationId,
984    event_sender: &mpsc::UnboundedSender<MetricEvent>,
985    pay_invoice_time: std::time::SystemTime,
986) -> anyhow::Result<()> {
987    let elapsed = pay_invoice_time.elapsed()?;
988    info!("{prefix} Invoice payment receive started using {gateway_name} in {elapsed:?}");
989    event_sender.send(MetricEvent {
990        name: format!("gateway_{gateway_name}_payment_received_started"),
991        duration: elapsed,
992    })?;
993    let lightning_module = client.get_first_module::<LightningClientModule>()?;
994    let mut updates = lightning_module
995        .subscribe_ln_receive(operation_id)
996        .await?
997        .into_stream();
998    while let Some(update) = updates.next().await {
999        debug!(%prefix, ?update, "Invoice payment update");
1000        match update {
1001            LnReceiveState::Claimed => {
1002                let elapsed: Duration = pay_invoice_time.elapsed()?;
1003                info!("{prefix} Invoice payment received on {gateway_name} in {elapsed:?}");
1004                event_sender.send(MetricEvent {
1005                    name: "gateway_payment_received_success".into(),
1006                    duration: elapsed,
1007                })?;
1008                event_sender.send(MetricEvent {
1009                    name: format!("gateway_{gateway_name}_payment_received_success"),
1010                    duration: elapsed,
1011                })?;
1012                break;
1013            }
1014            LnReceiveState::Canceled { reason } => {
1015                let elapsed: Duration = pay_invoice_time.elapsed()?;
1016                info!(
1017                    "{prefix} Invoice payment receive was canceled on {gateway_name}: {reason} in {elapsed:?}"
1018                );
1019                event_sender.send(MetricEvent {
1020                    name: "gateway_payment_received_canceled".into(),
1021                    duration: elapsed,
1022                })?;
1023                break;
1024            }
1025            _ => {}
1026        }
1027    }
1028    Ok(())
1029}
1030
1031async fn client_create_invoice(
1032    client: &ClientHandleArc,
1033    invoice_amount: Amount,
1034    event_sender: &mpsc::UnboundedSender<MetricEvent>,
1035    ln_gateway: Option<LightningGateway>,
1036) -> anyhow::Result<(fedimint_core::core::OperationId, Bolt11Invoice)> {
1037    let create_invoice_time = fedimint_core::time::now();
1038    let lightning_module = client.get_first_module::<LightningClientModule>()?;
1039    let desc = Description::new("test".to_string())?;
1040    let (operation_id, invoice, _) = lightning_module
1041        .create_bolt11_invoice(
1042            invoice_amount,
1043            Bolt11InvoiceDescription::Direct(&desc),
1044            None,
1045            (),
1046            ln_gateway,
1047        )
1048        .await?;
1049    let elapsed = create_invoice_time.elapsed()?;
1050    info!("Created invoice using gateway in {elapsed:?}");
1051    event_sender.send(MetricEvent {
1052        name: GATEWAY_CREATE_INVOICE.into(),
1053        duration: elapsed,
1054    })?;
1055    Ok((operation_id, invoice))
1056}
1057
1058fn test_download_config(
1059    invite_code: &InviteCode,
1060    users: u16,
1061    event_sender: &mpsc::UnboundedSender<MetricEvent>,
1062) -> Vec<BoxFuture<'static, anyhow::Result<()>>> {
1063    (0..users)
1064        .map(|_| {
1065            let invite_code = invite_code.clone();
1066            let event_sender = event_sender.clone();
1067            let f: BoxFuture<_> = Box::pin(async move {
1068                let m = fedimint_core::time::now();
1069                let _ = fedimint_api_client::api::net::Connector::default()
1070                    .download_from_invite_code(&invite_code)
1071                    .await?;
1072                event_sender.send(MetricEvent {
1073                    name: "download_client_config".into(),
1074                    duration: m.elapsed()?,
1075                })?;
1076                Ok(())
1077            });
1078            f
1079        })
1080        .collect()
1081}
1082
1083async fn test_connect_raw_client(
1084    invite_code: InviteCode,
1085    users: u16,
1086    duration: Duration,
1087    timeout: Duration,
1088    limit_endpoints: Option<usize>,
1089    event_sender: mpsc::UnboundedSender<MetricEvent>,
1090) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
1091    use jsonrpsee_core::client::ClientT;
1092    use jsonrpsee_ws_client::WsClientBuilder;
1093
1094    let mut cfg = fedimint_api_client::api::net::Connector::default()
1095        .download_from_invite_code(&invite_code)
1096        .await?;
1097
1098    if let Some(limit_endpoints) = limit_endpoints {
1099        cfg.global.api_endpoints = cfg
1100            .global
1101            .api_endpoints
1102            .into_iter()
1103            .take(limit_endpoints)
1104            .collect();
1105        info!("Limiting endpoints to {:?}", cfg.global.api_endpoints);
1106    }
1107
1108    info!("Connecting to {users} clients");
1109    let clients = (0..users)
1110        .flat_map(|_| {
1111            let clients = cfg.global.api_endpoints.values().map(|url| async {
1112                let ws_client = WsClientBuilder::default()
1113                    .request_timeout(timeout)
1114                    .connection_timeout(timeout)
1115                    .build(url_to_string_with_default_port(&url.url))
1116                    .await?;
1117                Ok::<_, anyhow::Error>(ws_client)
1118            });
1119            clients
1120        })
1121        .collect::<Vec<_>>();
1122    let clients = futures::future::try_join_all(clients).await?;
1123    info!("Keeping {users} clients connected for {duration:?}");
1124    Ok(clients
1125        .into_iter()
1126        .map(|client| {
1127            let event_sender = event_sender.clone();
1128            let f: BoxFuture<_> = Box::pin(async move {
1129                let initial_time = fedimint_core::time::now();
1130                while initial_time.elapsed()? < duration {
1131                    let m = fedimint_core::time::now();
1132                    let _epoch: u64 = client
1133                        .request::<_, _>(SESSION_COUNT_ENDPOINT, vec![ApiRequestErased::default()])
1134                        .await?;
1135                    event_sender.send(MetricEvent {
1136                        name: SESSION_COUNT_ENDPOINT.into(),
1137                        duration: m.elapsed()?,
1138                    })?;
1139                    fedimint_core::task::sleep(Duration::from_secs(1)).await;
1140                }
1141                Ok(())
1142            });
1143            f
1144        })
1145        .collect())
1146}
1147
1148fn url_to_string_with_default_port(url: &SafeUrl) -> String {
1149    format!(
1150        "{}://{}:{}{}",
1151        url.scheme(),
1152        url.host().expect("Asserted on construction"),
1153        url.port_or_known_default()
1154            .expect("Asserted on construction"),
1155        url.path()
1156    )
1157}
1158
1159async fn handle_metrics_summary(
1160    opts: Opts,
1161    mut event_receiver: mpsc::UnboundedReceiver<MetricEvent>,
1162) -> anyhow::Result<()> {
1163    let timestamp_seconds = fedimint_core::time::duration_since_epoch().as_secs();
1164    let mut metrics_json_output_files = vec![];
1165    let mut previous_metrics = vec![];
1166    let mut comparison_output = None;
1167    if let Some(archive_dir) = opts.archive_dir {
1168        let mut archive_metrics = archive_dir.join("metrics");
1169        archive_metrics.push(opts.users.to_string());
1170        tokio::fs::create_dir_all(&archive_metrics).await?;
1171        let mut archive_comparisons = archive_dir.join("comparisons");
1172        archive_comparisons.push(opts.users.to_string());
1173        tokio::fs::create_dir_all(&archive_comparisons).await?;
1174
1175        let latest_metrics_file = std::fs::read_dir(&archive_metrics)?
1176            .map(|entry| {
1177                let entry = entry.unwrap();
1178                let metadata = entry.metadata().unwrap();
1179                let created = metadata
1180                    .created()
1181                    .unwrap_or_else(|_| metadata.modified().unwrap());
1182                (entry, created)
1183            })
1184            .max_by_key(|(_entry, created)| created.to_owned())
1185            .map(|(entry, _)| entry.path());
1186        if let Some(latest_metrics_file) = latest_metrics_file {
1187            let latest_metrics_file = tokio::fs::File::open(&latest_metrics_file)
1188                .await
1189                .with_context(|| format!("Failed to open {latest_metrics_file:?}"))?;
1190            let mut lines = tokio::io::BufReader::new(latest_metrics_file).lines();
1191            while let Some(line) = lines.next_line().await? {
1192                match serde_json::from_str::<EventMetricSummary>(&line) {
1193                    Ok(metric) => {
1194                        previous_metrics.push(metric);
1195                    }
1196                    Err(e) => {
1197                        warn!("Failed to parse previous metric: {e:?}");
1198                    }
1199                }
1200            }
1201        }
1202        let new_metric_output = archive_metrics.join(format!("{timestamp_seconds}.json",));
1203        let new_metric_output = BufWriter::new(
1204            OpenOptions::new()
1205                .write(true)
1206                .create(true)
1207                .truncate(true)
1208                .open(new_metric_output)
1209                .await?,
1210        );
1211        metrics_json_output_files.push(new_metric_output);
1212        if !previous_metrics.is_empty() {
1213            let new_comparison_output =
1214                archive_comparisons.join(format!("{timestamp_seconds}.json",));
1215            comparison_output = Some(BufWriter::new(
1216                OpenOptions::new()
1217                    .write(true)
1218                    .create(true)
1219                    .truncate(true)
1220                    .open(new_comparison_output)
1221                    .await?,
1222            ));
1223        }
1224    }
1225    if let Some(metrics_json_output) = opts.metrics_json_output {
1226        metrics_json_output_files.push(BufWriter::new(
1227            tokio::fs::OpenOptions::new()
1228                .write(true)
1229                .create(true)
1230                .truncate(true)
1231                .open(metrics_json_output)
1232                .await?,
1233        ));
1234    }
1235    let mut results = BTreeMap::new();
1236    while let Some(event) = event_receiver.recv().await {
1237        let entry = results.entry(event.name).or_insert_with(Vec::new);
1238        entry.push(event.duration);
1239    }
1240    let mut previous_metrics = previous_metrics
1241        .into_iter()
1242        .map(|metric| (metric.name.clone(), metric))
1243        .collect::<HashMap<_, _>>();
1244    for (k, mut v) in results {
1245        v.sort();
1246        let n = v.len();
1247        let max = v.iter().last().unwrap();
1248        let min = v.first().unwrap();
1249        let median = v[n / 2];
1250        let sum: Duration = v.iter().sum();
1251        let avg = sum / n as u32;
1252        let metric_summary = EventMetricSummary {
1253            name: k.clone(),
1254            users: u64::from(opts.users),
1255            n: n as u64,
1256            avg_ms: avg.as_millis(),
1257            median_ms: median.as_millis(),
1258            max_ms: max.as_millis(),
1259            min_ms: min.as_millis(),
1260            timestamp_seconds,
1261        };
1262        let comparison = if let Some(previous_metric) = previous_metrics.remove(&k) {
1263            if previous_metric.n == metric_summary.n {
1264                fn calculate_gain(current: u128, previous: u128) -> f64 {
1265                    current as f64 / previous as f64
1266                }
1267                let comparison = EventMetricComparison {
1268                    avg_ms_gain: calculate_gain(metric_summary.avg_ms, previous_metric.avg_ms),
1269                    median_ms_gain: calculate_gain(
1270                        metric_summary.median_ms,
1271                        previous_metric.median_ms,
1272                    ),
1273                    max_ms_gain: calculate_gain(metric_summary.max_ms, previous_metric.max_ms),
1274                    min_ms_gain: calculate_gain(metric_summary.min_ms, previous_metric.min_ms),
1275                    current: metric_summary.clone(),
1276                    previous: previous_metric,
1277                };
1278                if let Some(comparison_output) = &mut comparison_output {
1279                    let comparison_json =
1280                        serde_json::to_string(&comparison).expect("to be serializable");
1281                    comparison_output
1282                        .write_all(format!("{comparison_json}\n").as_bytes())
1283                        .await
1284                        .expect("to write on file");
1285                }
1286                Some(comparison)
1287            } else {
1288                info!(
1289                    "Skipping comparison for {k} because previous metric has different n ({} vs {})",
1290                    previous_metric.n, metric_summary.n
1291                );
1292                None
1293            }
1294        } else {
1295            None
1296        };
1297        if let Some(comparison) = comparison {
1298            println!(
1299                "{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?} (compared to previous: {comparison})"
1300            );
1301        } else {
1302            println!("{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?}");
1303        }
1304        let metric_summary_json =
1305            serde_json::to_string(&metric_summary).expect("to be serializable");
1306        for metrics_json_output_file in &mut metrics_json_output_files {
1307            metrics_json_output_file
1308                .write_all(format!("{metric_summary_json}\n").as_bytes())
1309                .await
1310                .expect("to write on file");
1311        }
1312    }
1313    for mut output in metrics_json_output_files {
1314        output.flush().await?;
1315    }
1316    if let Some(mut output) = comparison_output {
1317        output.flush().await?;
1318    }
1319    Ok(())
1320}
1321
1322async fn get_gateway_id(generate_invoice_with: LnInvoiceGeneration) -> anyhow::Result<String> {
1323    let gateway_json = match generate_invoice_with {
1324        LnInvoiceGeneration::LdkLightningCli => {
1325            // If we are paying a lnd invoice, we use the LDK node
1326            cmd!(GatewayLndCli, "info").out_json().await
1327        }
1328    }?;
1329    let gateway_id = gateway_json["gateway_id"]
1330        .as_str()
1331        .context("Missing gateway_id field")?;
1332
1333    Ok(gateway_id.into())
1334}