fedimint_load_test_tool/
main.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_precision_loss)]
4#![allow(clippy::missing_errors_doc)]
5#![allow(clippy::ref_option)]
6#![allow(clippy::too_many_lines)]
7#![allow(clippy::large_futures)]
8
9use std::collections::{BTreeMap, HashMap};
10use std::path::PathBuf;
11use std::str::FromStr;
12use std::time::Duration;
13use std::vec;
14
15use anyhow::{Context, anyhow, bail};
16use clap::{Args, Parser, Subcommand, ValueEnum};
17use common::{
18    gateway_pay_invoice, get_note_summary, ldk_create_invoice, ldk_pay_invoice,
19    ldk_wait_invoice_payment, parse_gateway_id, reissue_notes,
20};
21use devimint::cmd;
22use devimint::util::GatewayLndCli;
23use fedimint_client::ClientHandleArc;
24use fedimint_core::Amount;
25use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
26use fedimint_core::invite_code::InviteCode;
27use fedimint_core::module::ApiRequestErased;
28use fedimint_core::runtime::spawn;
29use fedimint_core::util::{BoxFuture, SafeUrl};
30use fedimint_ln_client::{LightningClientModule, LnReceiveState};
31use fedimint_ln_common::LightningGateway;
32use fedimint_mint_client::OOBNotes;
33use futures::StreamExt;
34use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description};
35use serde::{Deserialize, Serialize};
36use tokio::fs::OpenOptions;
37use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
38use tokio::sync::mpsc;
39use tracing::{debug, info, warn};
40
41use crate::common::{
42    build_client, do_spend_notes, get_invite_code_cli, remint_denomination, try_get_notes_cli,
43};
44pub mod common;
45
46#[derive(Parser, Clone)]
47#[command(version)]
48struct Opts {
49    #[arg(
50        long,
51        default_value = "10",
52        help = "Number of users. Each user will work in parallel"
53    )]
54    users: u16,
55
56    #[arg(long, help = "Output with the metrics results in JSON format")]
57    metrics_json_output: Option<PathBuf>,
58
59    #[arg(
60        long,
61        help = "If given, will be used to store and retrieve past metrics for comparison purposes"
62    )]
63    archive_dir: Option<PathBuf>,
64
65    #[clap(subcommand)]
66    command: Command,
67}
68
69#[derive(Debug, Clone, Copy, ValueEnum)]
70enum LnInvoiceGeneration {
71    LdkLightningCli,
72}
73
74#[derive(Subcommand, Clone)]
75enum Command {
76    #[command(about = "Keep many websocket connections to a federation for a duration of time")]
77    TestConnect {
78        #[arg(long, help = "Federation invite code")]
79        invite_code: String,
80        #[arg(
81            long,
82            default_value = "60",
83            help = "How much time to keep the connections open, in seconds"
84        )]
85        duration_secs: u64,
86        #[arg(
87            long,
88            default_value = "120",
89            help = "Timeout for connection attempt and for each request, in secnods"
90        )]
91        timeout_secs: u64,
92        #[arg(
93            long,
94            help = "If given, will limit the number of endpoints (guardians) to connect to"
95        )]
96        limit_endpoints: Option<usize>,
97    },
98    #[command(about = "Try to download the client config many times.")]
99    TestDownload {
100        #[arg(long, help = "Federation invite code")]
101        invite_code: String,
102    },
103    #[command(
104        about = "Run a load test where many users in parallel will try to reissue notes and pay invoices through the gateway"
105    )]
106    LoadTest(LoadTestArgs),
107    /// Run a load test where many users in parallel will receive then send a
108    /// payment through lightning.
109    /// It's 'circular' because the funds always come back to the same user then
110    /// we can keep making the payments in a loop
111    #[command()]
112    LnCircularLoadTest(LnCircularLoadTestArgs),
113}
114
115#[derive(Args, Clone)]
116struct LoadTestArgs {
117    #[arg(
118        long,
119        help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
120    )]
121    invite_code: Option<InviteCode>,
122
123    #[arg(
124        long,
125        help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
126    )]
127    initial_notes: Option<OOBNotes>,
128
129    #[arg(
130        long,
131        help = "Gateway Id. If none, retrieve one according to --generate-invoice-with"
132    )]
133    gateway_id: Option<String>,
134
135    #[arg(
136        long,
137        help = "The method used to generate invoices to be paid through the gateway. If none and no --invoices-file provided, no gateway/LN tests will be run. Note that you can't generate an invoice using the same lightning node used by the gateway (i.e self payment is forbidden)"
138    )]
139    generate_invoice_with: Option<LnInvoiceGeneration>,
140
141    #[arg(
142        long,
143        default_value = "1",
144        help = "How many invoices will be created for each user. Only applicable if --generate-invoice-with is provided"
145    )]
146    invoices_per_user: u16,
147
148    #[arg(
149        long,
150        default_value = "0",
151        help = "How many seconds to sleep between LN payments"
152    )]
153    ln_payment_sleep_secs: u64,
154
155    #[arg(
156        long,
157        help = "A text file with one invoice per line. If --generate-invoice-with is provided, these will be additional invoices to be paid"
158    )]
159    invoices_file: Option<PathBuf>,
160
161    #[arg(
162        long,
163        help = "How many notes to distribute to each user",
164        default_value = "2"
165    )]
166    notes_per_user: u16,
167
168    #[arg(
169        long,
170        help = "Note denomination to use for the test",
171        default_value = "1024"
172    )]
173    note_denomination: Amount,
174
175    #[arg(
176        long,
177        help = "Invoice amount when generating one",
178        default_value = "1000"
179    )]
180    invoice_amount: Amount,
181}
182
183#[derive(Args, Clone)]
184struct LnCircularLoadTestArgs {
185    #[arg(
186        long,
187        help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
188    )]
189    invite_code: Option<InviteCode>,
190
191    #[arg(
192        long,
193        help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
194    )]
195    initial_notes: Option<OOBNotes>,
196
197    #[arg(
198        long,
199        default_value = "60",
200        help = "For how many seconds to run the test"
201    )]
202    test_duration_secs: u64,
203
204    #[arg(
205        long,
206        default_value = "0",
207        help = "How many seconds to sleep between LN payments"
208    )]
209    ln_payment_sleep_secs: u64,
210
211    #[arg(
212        long,
213        help = "How many notes to distribute to each user",
214        default_value = "1"
215    )]
216    notes_per_user: u16,
217
218    #[arg(
219        long,
220        help = "Note denomination to use for the test",
221        default_value = "1024"
222    )]
223    note_denomination: Amount,
224
225    #[arg(
226        long,
227        help = "Invoice amount when generating one",
228        default_value = "1000"
229    )]
230    invoice_amount: Amount,
231
232    #[arg(long)]
233    strategy: LnCircularStrategy,
234}
235
236#[derive(Debug, Clone, Copy, ValueEnum)]
237enum LnCircularStrategy {
238    /// The user will pay its own invoice
239    SelfPayment,
240    /// One gateway will pay/receive to/from the other, then they will swap
241    /// places
242    TwoGateways,
243    /// Two clients will pay to each other using the same gateway
244    PartnerPingPong,
245}
246
247#[derive(Debug, Clone)]
248pub struct MetricEvent {
249    name: String,
250    duration: Duration,
251}
252
253#[derive(Debug, Clone, Serialize, Deserialize)]
254struct EventMetricSummary {
255    name: String,
256    users: u64,
257    n: u64,
258    avg_ms: u128,
259    median_ms: u128,
260    max_ms: u128,
261    min_ms: u128,
262    timestamp_seconds: u64,
263}
264
265#[derive(Debug, Serialize, Deserialize)]
266struct EventMetricComparison {
267    avg_ms_gain: f64,
268    median_ms_gain: f64,
269    max_ms_gain: f64,
270    min_ms_gain: f64,
271    current: EventMetricSummary,
272    previous: EventMetricSummary,
273}
274
275impl std::fmt::Display for EventMetricComparison {
276    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277        fn to_percent(gain: f64) -> String {
278            if gain >= 1.0 {
279                format!("+{:.2}%", (gain - 1.0) * 100.0)
280            } else {
281                format!("-{:.2}%", (1.0 - gain) * 100.0)
282            }
283        }
284        f.write_str(&format!(
285            "avg: {}, median: {}, max: {}, min: {}",
286            to_percent(self.avg_ms_gain),
287            to_percent(self.median_ms_gain),
288            to_percent(self.max_ms_gain),
289            to_percent(self.min_ms_gain),
290        ))
291    }
292}
293
294#[tokio::main]
295async fn main() -> anyhow::Result<()> {
296    fedimint_logging::TracingSetup::default().init()?;
297    let opts = Opts::parse();
298    let (event_sender, event_receiver) = tokio::sync::mpsc::unbounded_channel();
299    let summary_handle = spawn("handle metrics summary", {
300        let opts = opts.clone();
301        async { handle_metrics_summary(opts, event_receiver).await }
302    });
303    let futures = match opts.command.clone() {
304        Command::TestConnect {
305            invite_code,
306            duration_secs,
307            timeout_secs,
308            limit_endpoints,
309        } => {
310            let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
311            test_connect_raw_client(
312                invite_code,
313                opts.users,
314                Duration::from_secs(duration_secs),
315                Duration::from_secs(timeout_secs),
316                limit_endpoints,
317                event_sender.clone(),
318            )
319            .await?
320        }
321        Command::TestDownload { invite_code } => {
322            let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
323            test_download_config(&invite_code, opts.users, &event_sender.clone())
324        }
325        Command::LoadTest(args) => {
326            let invite_code = invite_code_or_fallback(args.invite_code).await;
327
328            let gateway_id = if let Some(gateway_id) = args.gateway_id {
329                Some(gateway_id)
330            } else if let Some(generate_invoice_with) = args.generate_invoice_with {
331                Some(get_gateway_id(generate_invoice_with).await?)
332            } else {
333                None
334            };
335            let invoices = if let Some(invoices_file) = args.invoices_file {
336                let display_path = invoices_file.display();
337                let invoices_file = tokio::fs::File::open(&invoices_file)
338                    .await
339                    .with_context(|| format!("Failed to open {display_path}"))?;
340                let mut lines = tokio::io::BufReader::new(invoices_file).lines();
341                let mut invoices = vec![];
342                while let Some(line) = lines.next_line().await? {
343                    let invoice = Bolt11Invoice::from_str(&line)?;
344                    invoices.push(invoice);
345                }
346                invoices
347            } else {
348                vec![]
349            };
350            if args.generate_invoice_with.is_none() && invoices.is_empty() {
351                info!(
352                    "No --generate-invoice-with given no invoices on --invoices-file, not LN/gateway tests will be run"
353                );
354            }
355            run_load_test(
356                opts.archive_dir,
357                opts.users,
358                invite_code,
359                args.initial_notes,
360                args.generate_invoice_with,
361                args.invoices_per_user,
362                Duration::from_secs(args.ln_payment_sleep_secs),
363                invoices,
364                gateway_id,
365                args.notes_per_user,
366                args.note_denomination,
367                args.invoice_amount,
368                event_sender.clone(),
369            )
370            .await?
371        }
372        Command::LnCircularLoadTest(args) => {
373            let invite_code = invite_code_or_fallback(args.invite_code).await;
374            run_ln_circular_load_test(
375                opts.archive_dir,
376                opts.users,
377                invite_code,
378                args.initial_notes,
379                Duration::from_secs(args.test_duration_secs),
380                Duration::from_secs(args.ln_payment_sleep_secs),
381                args.notes_per_user,
382                args.note_denomination,
383                args.invoice_amount,
384                args.strategy,
385                event_sender.clone(),
386            )
387            .await?
388        }
389    };
390
391    let result = futures::future::join_all(futures).await;
392    drop(event_sender);
393    summary_handle.await??;
394    let len_failures = result.iter().filter(|r| r.is_err()).count();
395    eprintln!("{} results, {len_failures} failures", result.len());
396    for r in result {
397        if let Err(e) = r {
398            warn!("Task failed: {:?}", e);
399        }
400    }
401    if len_failures > 0 {
402        bail!("Finished with failures");
403    }
404    info!("Finished successfully");
405    Ok(())
406}
407
408async fn invite_code_or_fallback(invite_code: Option<InviteCode>) -> Option<InviteCode> {
409    if let Some(invite_code) = invite_code {
410        Some(invite_code)
411    } else {
412        // Try to get an invite code through cli in a best effort basis
413        match get_invite_code_cli(0.into()).await {
414            Ok(invite_code) => Some(invite_code),
415            Err(e) => {
416                info!(
417                    "No invite code provided and failed to get one with '{e}' error, will try to proceed without one..."
418                );
419                None
420            }
421        }
422    }
423}
424
425#[allow(clippy::too_many_arguments)]
426async fn run_load_test(
427    archive_dir: Option<PathBuf>,
428    users: u16,
429    invite_code: Option<InviteCode>,
430    initial_notes: Option<OOBNotes>,
431    generate_invoice_with: Option<LnInvoiceGeneration>,
432    generated_invoices_per_user: u16,
433    ln_payment_sleep: Duration,
434    invoices_from_file: Vec<Bolt11Invoice>,
435    gateway_id: Option<String>,
436    notes_per_user: u16,
437    note_denomination: Amount,
438    invoice_amount: Amount,
439    event_sender: mpsc::UnboundedSender<MetricEvent>,
440) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
441    let db_path = get_db_path(&archive_dir);
442    let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
443    let minimum_notes = notes_per_user * users;
444    let minimum_amount_required = note_denomination * u64::from(minimum_notes);
445
446    reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
447    get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
448    print_coordinator_notes(&coordinator).await?;
449    info!(
450        "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
451    );
452    remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
453    print_coordinator_notes(&coordinator).await?;
454
455    let users_clients = get_users_clients(users, db_path, invite_code).await?;
456
457    let mut users_notes =
458        get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
459    let mut users_invoices = HashMap::new();
460    let mut user = 0;
461    // Distribute invoices to users in a round robin fashion
462    for invoice in invoices_from_file {
463        users_invoices
464            .entry(user)
465            .or_insert_with(Vec::new)
466            .push(invoice);
467        user = (user + 1) % users;
468    }
469
470    info!("Starting user tasks");
471    let futures = users_clients
472        .into_iter()
473        .enumerate()
474        .map(|(u, client)| {
475            let u = u as u16;
476            let oob_notes = users_notes.remove(&u).unwrap();
477            let invoices = users_invoices.remove(&u).unwrap_or_default();
478            let event_sender = event_sender.clone();
479            let f: BoxFuture<_> = Box::pin(do_load_test_user_task(
480                format!("User {u}:"),
481                client,
482                oob_notes,
483                generated_invoices_per_user,
484                ln_payment_sleep,
485                invoice_amount,
486                invoices,
487                generate_invoice_with,
488                event_sender,
489                gateway_id.clone(),
490            ));
491            f
492        })
493        .collect::<Vec<_>>();
494
495    Ok(futures)
496}
497
498async fn get_notes_for_users(
499    users: u16,
500    notes_per_user: u16,
501    coordinator: ClientHandleArc,
502    note_denomination: Amount,
503) -> anyhow::Result<HashMap<u16, Vec<OOBNotes>>> {
504    let mut users_notes = HashMap::new();
505    for u in 0..users {
506        users_notes.insert(u, Vec::with_capacity(notes_per_user.into()));
507        for _ in 0..notes_per_user {
508            let (_, oob_notes) = do_spend_notes(&coordinator, note_denomination).await?;
509            let user_amount = oob_notes.total_amount();
510            info!("Giving {user_amount} to user {u}");
511            users_notes.get_mut(&u).unwrap().push(oob_notes);
512        }
513    }
514    Ok(users_notes)
515}
516
517async fn get_users_clients(
518    n: u16,
519    db_path: Option<PathBuf>,
520    invite_code: Option<InviteCode>,
521) -> anyhow::Result<Vec<ClientHandleArc>> {
522    let mut users_clients = Vec::with_capacity(n.into());
523    for u in 0..n {
524        let (client, _) = get_user_client(u, &db_path, &invite_code).await?;
525        users_clients.push(client);
526    }
527    Ok(users_clients)
528}
529
530async fn get_user_client(
531    user_index: u16,
532    db_path: &Option<PathBuf>,
533    invite_code: &Option<InviteCode>,
534) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
535    let user_db = db_path
536        .as_ref()
537        .map(|db_path| db_path.join(format!("user_{user_index}.db")));
538    let user_invite_code = if user_db.as_ref().is_some_and(|db| db.exists()) {
539        None
540    } else {
541        invite_code.clone()
542    };
543    let (client, invite_code) = build_client(user_invite_code, user_db.as_ref()).await?;
544    // if lightning module is present, update the gateway cache
545    if let Ok(ln_client) = client.get_first_module::<LightningClientModule>() {
546        let _ = ln_client.update_gateway_cache().await;
547    }
548    Ok((client, invite_code))
549}
550
551async fn print_coordinator_notes(coordinator: &ClientHandleArc) -> anyhow::Result<()> {
552    info!("Note summary:");
553    let summary = get_note_summary(coordinator).await?;
554    for (k, v) in summary.iter() {
555        info!("{k}: {v}");
556    }
557    Ok(())
558}
559
560async fn get_required_notes(
561    coordinator: &ClientHandleArc,
562    minimum_amount_required: Amount,
563    event_sender: &mpsc::UnboundedSender<MetricEvent>,
564) -> anyhow::Result<()> {
565    let current_balance = coordinator.get_balance_for_btc().await?;
566
567    if current_balance < minimum_amount_required {
568        let diff = minimum_amount_required.saturating_sub(current_balance);
569        info!(
570            "Current balance {current_balance} on coordinator not enough, trying to get {diff} more through fedimint-cli"
571        );
572        match try_get_notes_cli(&diff, 5).await {
573            Ok(notes) => {
574                info!("Got {} more notes, reissuing them", notes.total_amount());
575                reissue_notes(coordinator, notes, event_sender).await?;
576            }
577            Err(e) => {
578                info!("Unable to get more notes: '{e}', will try to proceed without them");
579            }
580        }
581    } else {
582        info!(
583            "Current balance of {current_balance} already covers the minimum required of {minimum_amount_required}"
584        );
585    }
586    Ok(())
587}
588
589async fn reissue_initial_notes(
590    initial_notes: Option<OOBNotes>,
591    coordinator: &ClientHandleArc,
592    event_sender: &mpsc::UnboundedSender<MetricEvent>,
593) -> anyhow::Result<()> {
594    if let Some(notes) = initial_notes {
595        let amount = notes.total_amount();
596        info!("Reissuing initial notes, got {amount}");
597        reissue_notes(coordinator, notes, event_sender).await?;
598    }
599    Ok(())
600}
601
602async fn get_coordinator_client(
603    db_path: &Option<PathBuf>,
604    invite_code: &Option<InviteCode>,
605) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
606    let (client, invite_code) = if let Some(db_path) = db_path {
607        let coordinator_db = db_path.join("coordinator.db");
608        if coordinator_db.exists() {
609            build_client(invite_code.clone(), Some(&coordinator_db)).await?
610        } else {
611            tokio::fs::create_dir_all(db_path).await?;
612            build_client(
613                Some(invite_code.clone().context(
614                    "Running on this archive dir for the first time, an invite code is required",
615                )?),
616                Some(&coordinator_db),
617            )
618            .await?
619        }
620    } else {
621        build_client(
622            Some(
623                invite_code
624                    .clone()
625                    .context("No archive dir given, an invite code is strictly required")?,
626            ),
627            None,
628        )
629        .await?
630    };
631    Ok((client, invite_code))
632}
633
634fn get_db_path(archive_dir: &Option<PathBuf>) -> Option<PathBuf> {
635    archive_dir.as_ref().map(|p| p.join("db"))
636}
637
638async fn get_lightning_gateway(
639    client: &ClientHandleArc,
640    gateway_id: Option<String>,
641) -> Option<LightningGateway> {
642    let gateway_id = parse_gateway_id(gateway_id.or(None)?.as_str()).expect("Invalid gateway id");
643    let ln_module = client
644        .get_first_module::<LightningClientModule>()
645        .expect("Must have ln client module");
646    ln_module.select_gateway(&gateway_id).await
647}
648
649#[allow(clippy::too_many_arguments)]
650async fn do_load_test_user_task(
651    prefix: String,
652    client: ClientHandleArc,
653    oob_notes: Vec<OOBNotes>,
654    generated_invoices_per_user: u16,
655    ln_payment_sleep: Duration,
656    invoice_amount: Amount,
657    additional_invoices: Vec<Bolt11Invoice>,
658    generate_invoice_with: Option<LnInvoiceGeneration>,
659    event_sender: mpsc::UnboundedSender<MetricEvent>,
660    gateway_id: Option<String>,
661) -> anyhow::Result<()> {
662    let ln_gateway = get_lightning_gateway(&client, gateway_id).await;
663    for oob_note in oob_notes {
664        let amount = oob_note.total_amount();
665        reissue_notes(&client, oob_note, &event_sender)
666            .await
667            .map_err(|e| anyhow!("while reissuing initial {amount}: {e}"))?;
668    }
669    let mut generated_invoices_per_user_iterator = (0..generated_invoices_per_user).peekable();
670    while let Some(_) = generated_invoices_per_user_iterator.next() {
671        let total_amount = get_note_summary(&client).await?.total_amount();
672        if invoice_amount > total_amount {
673            warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
674        } else {
675            match generate_invoice_with {
676                Some(LnInvoiceGeneration::LdkLightningCli) => {
677                    let invoice = ldk_create_invoice(invoice_amount).await?;
678                    gateway_pay_invoice(
679                        &prefix,
680                        "LND",
681                        &client,
682                        invoice.clone(),
683                        &event_sender,
684                        ln_gateway.clone(),
685                    )
686                    .await?;
687                    ldk_wait_invoice_payment(&invoice).await?;
688                }
689                None if additional_invoices.is_empty() => {
690                    debug!(
691                        "No method given to generate an invoice and no invoices on file, will not test the gateway"
692                    );
693                    break;
694                }
695                None => {
696                    break;
697                }
698            }
699            if generated_invoices_per_user_iterator.peek().is_some() {
700                // Only sleep while there are more invoices to pay
701                fedimint_core::task::sleep(ln_payment_sleep).await;
702            }
703        }
704    }
705    let mut additional_invoices = additional_invoices.into_iter().peekable();
706    while let Some(invoice) = additional_invoices.next() {
707        let total_amount = get_note_summary(&client).await?.total_amount();
708        let invoice_amount =
709            Amount::from_msats(invoice.amount_milli_satoshis().unwrap_or_default());
710        if invoice_amount > total_amount {
711            warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
712        } else if invoice_amount == Amount::ZERO {
713            warn!("Can't pay invoice {invoice}, amount is zero");
714        } else {
715            gateway_pay_invoice(
716                &prefix,
717                "unknown",
718                &client,
719                invoice,
720                &event_sender,
721                ln_gateway.clone(),
722            )
723            .await?;
724            if additional_invoices.peek().is_some() {
725                // Only sleep while there are more invoices to pay
726                fedimint_core::task::sleep(ln_payment_sleep).await;
727            }
728        }
729    }
730    Ok(())
731}
732
733#[allow(clippy::too_many_arguments)]
734async fn run_ln_circular_load_test(
735    archive_dir: Option<PathBuf>,
736    users: u16,
737    invite_code: Option<InviteCode>,
738    initial_notes: Option<OOBNotes>,
739    test_duration: Duration,
740    ln_payment_sleep: Duration,
741    notes_per_user: u16,
742    note_denomination: Amount,
743    invoice_amount: Amount,
744    strategy: LnCircularStrategy,
745    event_sender: mpsc::UnboundedSender<MetricEvent>,
746) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
747    let db_path = get_db_path(&archive_dir);
748    let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
749    let minimum_notes = notes_per_user * users;
750    let minimum_amount_required = note_denomination * u64::from(minimum_notes);
751
752    reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
753    get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
754
755    info!(
756        "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
757    );
758    remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
759
760    print_coordinator_notes(&coordinator).await?;
761
762    let users_clients = get_users_clients(users, db_path, invite_code.clone()).await?;
763
764    let mut users_notes =
765        get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
766
767    info!("Starting user tasks");
768    let futures = users_clients
769        .into_iter()
770        .enumerate()
771        .map(|(u, client)| {
772            let u = u as u16;
773            let oob_notes = users_notes.remove(&u).unwrap();
774            let event_sender = event_sender.clone();
775            let f: BoxFuture<_> = Box::pin(do_ln_circular_test_user_task(
776                format!("User {u}:"),
777                client,
778                invite_code.clone(),
779                oob_notes,
780                test_duration,
781                ln_payment_sleep,
782                invoice_amount,
783                strategy,
784                event_sender,
785            ));
786            f
787        })
788        .collect::<Vec<_>>();
789
790    Ok(futures)
791}
792
793#[allow(clippy::too_many_arguments)]
794async fn do_ln_circular_test_user_task(
795    prefix: String,
796    client: ClientHandleArc,
797    invite_code: Option<InviteCode>,
798    oob_notes: Vec<OOBNotes>,
799    test_duration: Duration,
800    ln_payment_sleep: Duration,
801    invoice_amount: Amount,
802    strategy: LnCircularStrategy,
803    event_sender: mpsc::UnboundedSender<MetricEvent>,
804) -> anyhow::Result<()> {
805    for oob_note in oob_notes {
806        let amount = oob_note.total_amount();
807        reissue_notes(&client, oob_note, &event_sender)
808            .await
809            .map_err(|e| anyhow!("while reissuing initial {amount}: {e}"))?;
810    }
811    let initial_time = fedimint_core::time::now();
812    let still_ontime = || async {
813        fedimint_core::time::now()
814            .duration_since(initial_time)
815            .expect("time to work")
816            <= test_duration
817    };
818    let sleep_a_bit = || async {
819        if still_ontime().await {
820            fedimint_core::task::sleep(ln_payment_sleep).await;
821        }
822    };
823    match strategy {
824        LnCircularStrategy::TwoGateways => {
825            let invoice_generation = LnInvoiceGeneration::LdkLightningCli;
826            while still_ontime().await {
827                let gateway_id = get_gateway_id(invoice_generation).await?;
828                let ln_gateway = get_lightning_gateway(&client, Some(gateway_id)).await;
829                run_two_gateways_strategy(
830                    &prefix,
831                    &invoice_generation,
832                    &invoice_amount,
833                    &event_sender,
834                    &client,
835                    ln_gateway,
836                )
837                .await?;
838                sleep_a_bit().await;
839            }
840        }
841        LnCircularStrategy::SelfPayment => {
842            while still_ontime().await {
843                do_self_payment(&prefix, &client, invoice_amount, &event_sender).await?;
844                sleep_a_bit().await;
845            }
846        }
847        LnCircularStrategy::PartnerPingPong => {
848            let (partner, _) = build_client(invite_code, None).await?;
849            while still_ontime().await {
850                do_partner_ping_pong(&prefix, &client, &partner, invoice_amount, &event_sender)
851                    .await?;
852                sleep_a_bit().await;
853            }
854        }
855    }
856    Ok(())
857}
858
859const GATEWAY_CREATE_INVOICE: &str = "gateway_create_invoice";
860
861async fn run_two_gateways_strategy(
862    prefix: &str,
863    invoice_generation: &LnInvoiceGeneration,
864    invoice_amount: &Amount,
865    event_sender: &mpsc::UnboundedSender<MetricEvent>,
866    client: &ClientHandleArc,
867    ln_gateway: Option<LightningGateway>,
868) -> Result<(), anyhow::Error> {
869    let create_invoice_time = fedimint_core::time::now();
870    match *invoice_generation {
871        LnInvoiceGeneration::LdkLightningCli => {
872            let invoice = ldk_create_invoice(*invoice_amount).await?;
873            let elapsed = create_invoice_time.elapsed()?;
874            info!("Created invoice using LDK in {elapsed:?}");
875            event_sender.send(MetricEvent {
876                name: GATEWAY_CREATE_INVOICE.into(),
877                duration: elapsed,
878            })?;
879            gateway_pay_invoice(
880                prefix,
881                "LND",
882                client,
883                invoice.clone(),
884                event_sender,
885                ln_gateway.clone(),
886            )
887            .await?;
888            ldk_wait_invoice_payment(&invoice).await?;
889            let (operation_id, invoice) =
890                client_create_invoice(client, *invoice_amount, event_sender, ln_gateway).await?;
891            let pay_invoice_time = fedimint_core::time::now();
892            ldk_pay_invoice(invoice).await?;
893            wait_invoice_payment(
894                prefix,
895                "LND",
896                client,
897                operation_id,
898                event_sender,
899                pay_invoice_time,
900            )
901            .await?;
902        }
903    }
904    Ok(())
905}
906
907async fn do_self_payment(
908    prefix: &str,
909    client: &ClientHandleArc,
910    invoice_amount: Amount,
911    event_sender: &mpsc::UnboundedSender<MetricEvent>,
912) -> anyhow::Result<()> {
913    let (operation_id, invoice) =
914        client_create_invoice(client, invoice_amount, event_sender, None).await?;
915    let pay_invoice_time = fedimint_core::time::now();
916    let lightning_module = client.get_first_module::<LightningClientModule>()?;
917    //let gateway = lightning_module.select_active_gateway_opt().await;
918    lightning_module
919        .pay_bolt11_invoice(None, invoice, ())
920        .await?;
921    wait_invoice_payment(
922        prefix,
923        "gateway",
924        client,
925        operation_id,
926        event_sender,
927        pay_invoice_time,
928    )
929    .await?;
930    Ok(())
931}
932
933async fn do_partner_ping_pong(
934    prefix: &str,
935    client: &ClientHandleArc,
936    partner: &ClientHandleArc,
937    invoice_amount: Amount,
938    event_sender: &mpsc::UnboundedSender<MetricEvent>,
939) -> anyhow::Result<()> {
940    // Ping (partner creates invoice, client pays)
941    let (operation_id, invoice) =
942        client_create_invoice(partner, invoice_amount, event_sender, None).await?;
943    let pay_invoice_time = fedimint_core::time::now();
944    let lightning_module = client.get_first_module::<LightningClientModule>()?;
945    // TODO: Select random gateway?
946    //let gateway = lightning_module.select_active_gateway_opt().await;
947    lightning_module
948        .pay_bolt11_invoice(None, invoice, ())
949        .await?;
950    wait_invoice_payment(
951        prefix,
952        "gateway",
953        partner,
954        operation_id,
955        event_sender,
956        pay_invoice_time,
957    )
958    .await?;
959    // Pong (client creates invoice, partner pays)
960    let (operation_id, invoice) =
961        client_create_invoice(client, invoice_amount, event_sender, None).await?;
962    let pay_invoice_time = fedimint_core::time::now();
963    let partner_lightning_module = partner.get_first_module::<LightningClientModule>()?;
964    //let gateway = partner_lightning_module.select_active_gateway_opt().await;
965    // TODO: Select random gateway?
966    partner_lightning_module
967        .pay_bolt11_invoice(None, invoice, ())
968        .await?;
969    wait_invoice_payment(
970        prefix,
971        "gateway",
972        client,
973        operation_id,
974        event_sender,
975        pay_invoice_time,
976    )
977    .await?;
978    Ok(())
979}
980
981async fn wait_invoice_payment(
982    prefix: &str,
983    gateway_name: &str,
984    client: &ClientHandleArc,
985    operation_id: fedimint_core::core::OperationId,
986    event_sender: &mpsc::UnboundedSender<MetricEvent>,
987    pay_invoice_time: std::time::SystemTime,
988) -> anyhow::Result<()> {
989    let elapsed = pay_invoice_time.elapsed()?;
990    info!("{prefix} Invoice payment receive started using {gateway_name} in {elapsed:?}");
991    event_sender.send(MetricEvent {
992        name: format!("gateway_{gateway_name}_payment_received_started"),
993        duration: elapsed,
994    })?;
995    let lightning_module = client.get_first_module::<LightningClientModule>()?;
996    let mut updates = lightning_module
997        .subscribe_ln_receive(operation_id)
998        .await?
999        .into_stream();
1000    while let Some(update) = updates.next().await {
1001        debug!(%prefix, ?update, "Invoice payment update");
1002        match update {
1003            LnReceiveState::Claimed => {
1004                let elapsed: Duration = pay_invoice_time.elapsed()?;
1005                info!("{prefix} Invoice payment received on {gateway_name} in {elapsed:?}");
1006                event_sender.send(MetricEvent {
1007                    name: "gateway_payment_received_success".into(),
1008                    duration: elapsed,
1009                })?;
1010                event_sender.send(MetricEvent {
1011                    name: format!("gateway_{gateway_name}_payment_received_success"),
1012                    duration: elapsed,
1013                })?;
1014                break;
1015            }
1016            LnReceiveState::Canceled { reason } => {
1017                let elapsed: Duration = pay_invoice_time.elapsed()?;
1018                info!(
1019                    "{prefix} Invoice payment receive was canceled on {gateway_name}: {reason} in {elapsed:?}"
1020                );
1021                event_sender.send(MetricEvent {
1022                    name: "gateway_payment_received_canceled".into(),
1023                    duration: elapsed,
1024                })?;
1025                break;
1026            }
1027            _ => {}
1028        }
1029    }
1030    Ok(())
1031}
1032
1033async fn client_create_invoice(
1034    client: &ClientHandleArc,
1035    invoice_amount: Amount,
1036    event_sender: &mpsc::UnboundedSender<MetricEvent>,
1037    ln_gateway: Option<LightningGateway>,
1038) -> anyhow::Result<(fedimint_core::core::OperationId, Bolt11Invoice)> {
1039    let create_invoice_time = fedimint_core::time::now();
1040    let lightning_module = client.get_first_module::<LightningClientModule>()?;
1041    let desc = Description::new("test".to_string())?;
1042    let (operation_id, invoice, _) = lightning_module
1043        .create_bolt11_invoice(
1044            invoice_amount,
1045            Bolt11InvoiceDescription::Direct(desc),
1046            None,
1047            (),
1048            ln_gateway,
1049        )
1050        .await?;
1051    let elapsed = create_invoice_time.elapsed()?;
1052    info!("Created invoice using gateway in {elapsed:?}");
1053    event_sender.send(MetricEvent {
1054        name: GATEWAY_CREATE_INVOICE.into(),
1055        duration: elapsed,
1056    })?;
1057    Ok((operation_id, invoice))
1058}
1059
1060fn test_download_config(
1061    invite_code: &InviteCode,
1062    users: u16,
1063    event_sender: &mpsc::UnboundedSender<MetricEvent>,
1064) -> Vec<BoxFuture<'static, anyhow::Result<()>>> {
1065    (0..users)
1066        .map(|_| {
1067            let invite_code = invite_code.clone();
1068            let event_sender = event_sender.clone();
1069            let f: BoxFuture<_> = Box::pin(async move {
1070                let m = fedimint_core::time::now();
1071                let _ = fedimint_api_client::api::net::ConnectorType::default()
1072                    .download_from_invite_code(&invite_code, false, false)
1073                    .await?;
1074                event_sender.send(MetricEvent {
1075                    name: "download_client_config".into(),
1076                    duration: m.elapsed()?,
1077                })?;
1078                Ok(())
1079            });
1080            f
1081        })
1082        .collect()
1083}
1084
1085async fn test_connect_raw_client(
1086    invite_code: InviteCode,
1087    users: u16,
1088    duration: Duration,
1089    timeout: Duration,
1090    limit_endpoints: Option<usize>,
1091    event_sender: mpsc::UnboundedSender<MetricEvent>,
1092) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
1093    use jsonrpsee_core::client::ClientT;
1094    use jsonrpsee_ws_client::WsClientBuilder;
1095
1096    let (mut cfg, _) = fedimint_api_client::api::net::ConnectorType::default()
1097        .download_from_invite_code(&invite_code, false, false)
1098        .await?;
1099
1100    if let Some(limit_endpoints) = limit_endpoints {
1101        cfg.global.api_endpoints = cfg
1102            .global
1103            .api_endpoints
1104            .into_iter()
1105            .take(limit_endpoints)
1106            .collect();
1107        info!("Limiting endpoints to {:?}", cfg.global.api_endpoints);
1108    }
1109
1110    info!("Connecting to {users} clients");
1111    let clients = (0..users)
1112        .flat_map(|_| {
1113            cfg.global.api_endpoints.values().map(|url| async {
1114                let ws_client = WsClientBuilder::default()
1115                    .request_timeout(timeout)
1116                    .connection_timeout(timeout)
1117                    .build(url_to_string_with_default_port(&url.url))
1118                    .await?;
1119                Ok::<_, anyhow::Error>(ws_client)
1120            })
1121        })
1122        .collect::<Vec<_>>();
1123    let clients = futures::future::try_join_all(clients).await?;
1124    info!("Keeping {users} clients connected for {duration:?}");
1125    Ok(clients
1126        .into_iter()
1127        .map(|client| {
1128            let event_sender = event_sender.clone();
1129            let f: BoxFuture<_> = Box::pin(async move {
1130                let initial_time = fedimint_core::time::now();
1131                while initial_time.elapsed()? < duration {
1132                    let m = fedimint_core::time::now();
1133                    let _epoch: u64 = client
1134                        .request::<_, _>(SESSION_COUNT_ENDPOINT, vec![ApiRequestErased::default()])
1135                        .await?;
1136                    event_sender.send(MetricEvent {
1137                        name: SESSION_COUNT_ENDPOINT.into(),
1138                        duration: m.elapsed()?,
1139                    })?;
1140                    fedimint_core::task::sleep(Duration::from_secs(1)).await;
1141                }
1142                Ok(())
1143            });
1144            f
1145        })
1146        .collect())
1147}
1148
1149fn url_to_string_with_default_port(url: &SafeUrl) -> String {
1150    format!(
1151        "{}://{}:{}{}",
1152        url.scheme(),
1153        url.host().expect("Asserted on construction"),
1154        url.port_or_known_default()
1155            .expect("Asserted on construction"),
1156        url.path()
1157    )
1158}
1159
1160async fn handle_metrics_summary(
1161    opts: Opts,
1162    mut event_receiver: mpsc::UnboundedReceiver<MetricEvent>,
1163) -> anyhow::Result<()> {
1164    let timestamp_seconds = fedimint_core::time::duration_since_epoch().as_secs();
1165    let mut metrics_json_output_files = vec![];
1166    let mut previous_metrics = vec![];
1167    let mut comparison_output = None;
1168    if let Some(archive_dir) = opts.archive_dir {
1169        let mut archive_metrics = archive_dir.join("metrics");
1170        archive_metrics.push(opts.users.to_string());
1171        tokio::fs::create_dir_all(&archive_metrics).await?;
1172        let mut archive_comparisons = archive_dir.join("comparisons");
1173        archive_comparisons.push(opts.users.to_string());
1174        tokio::fs::create_dir_all(&archive_comparisons).await?;
1175
1176        let latest_metrics_file = std::fs::read_dir(&archive_metrics)?
1177            .map(|entry| {
1178                let entry = entry.unwrap();
1179                let metadata = entry.metadata().unwrap();
1180                let created = metadata
1181                    .created()
1182                    .unwrap_or_else(|_| metadata.modified().unwrap());
1183                (entry, created)
1184            })
1185            .max_by_key(|(_entry, created)| created.to_owned())
1186            .map(|(entry, _)| entry.path());
1187        if let Some(latest_metrics_file) = latest_metrics_file {
1188            let display_path = latest_metrics_file.display();
1189            let latest_metrics_file = tokio::fs::File::open(&latest_metrics_file)
1190                .await
1191                .with_context(|| format!("Failed to open {display_path}"))?;
1192            let mut lines = tokio::io::BufReader::new(latest_metrics_file).lines();
1193            while let Some(line) = lines.next_line().await? {
1194                match serde_json::from_str::<EventMetricSummary>(&line) {
1195                    Ok(metric) => {
1196                        previous_metrics.push(metric);
1197                    }
1198                    Err(e) => {
1199                        warn!("Failed to parse previous metric: {e:?}");
1200                    }
1201                }
1202            }
1203        }
1204        let new_metric_output = archive_metrics.join(format!("{timestamp_seconds}.json",));
1205        let new_metric_output = BufWriter::new(
1206            OpenOptions::new()
1207                .write(true)
1208                .create(true)
1209                .truncate(true)
1210                .open(new_metric_output)
1211                .await?,
1212        );
1213        metrics_json_output_files.push(new_metric_output);
1214        if !previous_metrics.is_empty() {
1215            let new_comparison_output =
1216                archive_comparisons.join(format!("{timestamp_seconds}.json",));
1217            comparison_output = Some(BufWriter::new(
1218                OpenOptions::new()
1219                    .write(true)
1220                    .create(true)
1221                    .truncate(true)
1222                    .open(new_comparison_output)
1223                    .await?,
1224            ));
1225        }
1226    }
1227    if let Some(metrics_json_output) = opts.metrics_json_output {
1228        metrics_json_output_files.push(BufWriter::new(
1229            tokio::fs::OpenOptions::new()
1230                .write(true)
1231                .create(true)
1232                .truncate(true)
1233                .open(metrics_json_output)
1234                .await?,
1235        ));
1236    }
1237    let mut results = BTreeMap::new();
1238    while let Some(event) = event_receiver.recv().await {
1239        let entry = results.entry(event.name).or_insert_with(Vec::new);
1240        entry.push(event.duration);
1241    }
1242    let mut previous_metrics = previous_metrics
1243        .into_iter()
1244        .map(|metric| (metric.name.clone(), metric))
1245        .collect::<HashMap<_, _>>();
1246    for (k, mut v) in results {
1247        v.sort();
1248        let n = v.len();
1249        let max = v.iter().last().unwrap();
1250        let min = v.first().unwrap();
1251        let median = v[n / 2];
1252        let sum: Duration = v.iter().sum();
1253        let avg = sum / n as u32;
1254        let metric_summary = EventMetricSummary {
1255            name: k.clone(),
1256            users: u64::from(opts.users),
1257            n: n as u64,
1258            avg_ms: avg.as_millis(),
1259            median_ms: median.as_millis(),
1260            max_ms: max.as_millis(),
1261            min_ms: min.as_millis(),
1262            timestamp_seconds,
1263        };
1264        let comparison = if let Some(previous_metric) = previous_metrics.remove(&k) {
1265            if previous_metric.n == metric_summary.n {
1266                fn calculate_gain(current: u128, previous: u128) -> f64 {
1267                    current as f64 / previous as f64
1268                }
1269                let comparison = EventMetricComparison {
1270                    avg_ms_gain: calculate_gain(metric_summary.avg_ms, previous_metric.avg_ms),
1271                    median_ms_gain: calculate_gain(
1272                        metric_summary.median_ms,
1273                        previous_metric.median_ms,
1274                    ),
1275                    max_ms_gain: calculate_gain(metric_summary.max_ms, previous_metric.max_ms),
1276                    min_ms_gain: calculate_gain(metric_summary.min_ms, previous_metric.min_ms),
1277                    current: metric_summary.clone(),
1278                    previous: previous_metric,
1279                };
1280                if let Some(comparison_output) = &mut comparison_output {
1281                    let comparison_json =
1282                        serde_json::to_string(&comparison).expect("to be serializable");
1283                    comparison_output
1284                        .write_all(format!("{comparison_json}\n").as_bytes())
1285                        .await
1286                        .expect("to write on file");
1287                }
1288                Some(comparison)
1289            } else {
1290                info!(
1291                    "Skipping comparison for {k} because previous metric has different n ({} vs {})",
1292                    previous_metric.n, metric_summary.n
1293                );
1294                None
1295            }
1296        } else {
1297            None
1298        };
1299        if let Some(comparison) = comparison {
1300            println!(
1301                "{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?} (compared to previous: {comparison})"
1302            );
1303        } else {
1304            println!("{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?}");
1305        }
1306        let metric_summary_json =
1307            serde_json::to_string(&metric_summary).expect("to be serializable");
1308        for metrics_json_output_file in &mut metrics_json_output_files {
1309            metrics_json_output_file
1310                .write_all(format!("{metric_summary_json}\n").as_bytes())
1311                .await
1312                .expect("to write on file");
1313        }
1314    }
1315    for mut output in metrics_json_output_files {
1316        output.flush().await?;
1317    }
1318    if let Some(mut output) = comparison_output {
1319        output.flush().await?;
1320    }
1321    Ok(())
1322}
1323
1324async fn get_gateway_id(generate_invoice_with: LnInvoiceGeneration) -> anyhow::Result<String> {
1325    let gateway_json = match generate_invoice_with {
1326        LnInvoiceGeneration::LdkLightningCli => {
1327            // If we are paying a lnd invoice, we use the LDK node
1328            cmd!(GatewayLndCli, "info").out_json().await
1329        }
1330    }?;
1331    let gateway_id = gateway_json["gateway_id"]
1332        .as_str()
1333        .context("Missing gateway_id field")?;
1334
1335    Ok(gateway_id.into())
1336}