fedimint_load_test_tool/
main.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_precision_loss)]
4#![allow(clippy::missing_errors_doc)]
5#![allow(clippy::ref_option)]
6#![allow(clippy::too_many_lines)]
7#![allow(clippy::large_futures)]
8
9use std::collections::{BTreeMap, HashMap};
10use std::path::PathBuf;
11use std::str::FromStr;
12use std::time::Duration;
13use std::vec;
14
15use anyhow::{Context, anyhow, bail};
16use clap::{Args, Parser, Subcommand, ValueEnum};
17use common::{
18    gateway_pay_invoice, get_note_summary, ldk_create_invoice, ldk_pay_invoice,
19    ldk_wait_invoice_payment, parse_gateway_id, reissue_notes,
20};
21use devimint::cmd;
22use devimint::util::GatewayLndCli;
23use devimint::version_constants::VERSION_0_10_0_ALPHA;
24use fedimint_api_client::download_from_invite_code;
25use fedimint_client::ClientHandleArc;
26use fedimint_connectors::ConnectorRegistry;
27use fedimint_core::Amount;
28use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
29use fedimint_core::invite_code::InviteCode;
30use fedimint_core::module::ApiRequestErased;
31use fedimint_core::runtime::spawn;
32use fedimint_core::util::{BoxFuture, SafeUrl};
33use fedimint_ln_client::{LightningClientModule, LnReceiveState};
34use fedimint_ln_common::LightningGateway;
35use fedimint_mint_client::OOBNotes;
36use futures::StreamExt;
37use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description};
38use serde::{Deserialize, Serialize};
39use tokio::fs::OpenOptions;
40use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
41use tokio::sync::mpsc;
42use tracing::{debug, info, warn};
43
44use crate::common::{
45    build_client, do_spend_notes, get_invite_code_cli, remint_denomination, try_get_notes_cli,
46};
47pub mod common;
48
49#[derive(Parser, Clone)]
50#[command(version)]
51struct Opts {
52    #[arg(
53        long,
54        default_value = "10",
55        help = "Number of users. Each user will work in parallel"
56    )]
57    users: u16,
58
59    #[arg(long, help = "Output with the metrics results in JSON format")]
60    metrics_json_output: Option<PathBuf>,
61
62    #[arg(
63        long,
64        help = "If given, will be used to store and retrieve past metrics for comparison purposes"
65    )]
66    archive_dir: Option<PathBuf>,
67
68    #[clap(subcommand)]
69    command: Command,
70}
71
72#[derive(Debug, Clone, Copy, ValueEnum)]
73enum LnInvoiceGeneration {
74    LdkLightningCli,
75}
76
77#[derive(Subcommand, Clone)]
78enum Command {
79    #[command(about = "Keep many websocket connections to a federation for a duration of time")]
80    TestConnect {
81        #[arg(long, help = "Federation invite code")]
82        invite_code: String,
83        #[arg(
84            long,
85            default_value = "60",
86            help = "How much time to keep the connections open, in seconds"
87        )]
88        duration_secs: u64,
89        #[arg(
90            long,
91            default_value = "120",
92            help = "Timeout for connection attempt and for each request, in secnods"
93        )]
94        timeout_secs: u64,
95        #[arg(
96            long,
97            help = "If given, will limit the number of endpoints (guardians) to connect to"
98        )]
99        limit_endpoints: Option<usize>,
100    },
101    #[command(about = "Try to download the client config many times.")]
102    TestDownload {
103        #[arg(long, help = "Federation invite code")]
104        invite_code: String,
105    },
106    #[command(
107        about = "Run a load test where many users in parallel will try to reissue notes and pay invoices through the gateway"
108    )]
109    LoadTest(LoadTestArgs),
110    /// Run a load test where many users in parallel will receive then send a
111    /// payment through lightning.
112    /// It's 'circular' because the funds always come back to the same user then
113    /// we can keep making the payments in a loop
114    #[command()]
115    LnCircularLoadTest(LnCircularLoadTestArgs),
116}
117
118#[derive(Args, Clone)]
119struct LoadTestArgs {
120    #[arg(
121        long,
122        help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
123    )]
124    invite_code: Option<InviteCode>,
125
126    #[arg(
127        long,
128        help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
129    )]
130    initial_notes: Option<OOBNotes>,
131
132    #[arg(
133        long,
134        help = "Gateway Id. If none, retrieve one according to --generate-invoice-with"
135    )]
136    gateway_id: Option<String>,
137
138    #[arg(
139        long,
140        help = "The method used to generate invoices to be paid through the gateway. If none and no --invoices-file provided, no gateway/LN tests will be run. Note that you can't generate an invoice using the same lightning node used by the gateway (i.e self payment is forbidden)"
141    )]
142    generate_invoice_with: Option<LnInvoiceGeneration>,
143
144    #[arg(
145        long,
146        default_value = "1",
147        help = "How many invoices will be created for each user. Only applicable if --generate-invoice-with is provided"
148    )]
149    invoices_per_user: u16,
150
151    #[arg(
152        long,
153        default_value = "0",
154        help = "How many seconds to sleep between LN payments"
155    )]
156    ln_payment_sleep_secs: u64,
157
158    #[arg(
159        long,
160        help = "A text file with one invoice per line. If --generate-invoice-with is provided, these will be additional invoices to be paid"
161    )]
162    invoices_file: Option<PathBuf>,
163
164    #[arg(
165        long,
166        help = "How many notes to distribute to each user",
167        default_value = "2"
168    )]
169    notes_per_user: u16,
170
171    #[arg(
172        long,
173        help = "Note denomination to use for the test",
174        default_value = "1024"
175    )]
176    note_denomination: Amount,
177
178    #[arg(
179        long,
180        help = "Invoice amount when generating one",
181        default_value = "1000"
182    )]
183    invoice_amount: Amount,
184}
185
186#[derive(Args, Clone)]
187struct LnCircularLoadTestArgs {
188    #[arg(
189        long,
190        help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
191    )]
192    invite_code: Option<InviteCode>,
193
194    #[arg(
195        long,
196        help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
197    )]
198    initial_notes: Option<OOBNotes>,
199
200    #[arg(
201        long,
202        default_value = "60",
203        help = "For how many seconds to run the test"
204    )]
205    test_duration_secs: u64,
206
207    #[arg(
208        long,
209        default_value = "0",
210        help = "How many seconds to sleep between LN payments"
211    )]
212    ln_payment_sleep_secs: u64,
213
214    #[arg(
215        long,
216        help = "How many notes to distribute to each user",
217        default_value = "1"
218    )]
219    notes_per_user: u16,
220
221    #[arg(
222        long,
223        help = "Note denomination to use for the test",
224        default_value = "1024"
225    )]
226    note_denomination: Amount,
227
228    #[arg(
229        long,
230        help = "Invoice amount when generating one",
231        default_value = "1000"
232    )]
233    invoice_amount: Amount,
234
235    #[arg(long)]
236    strategy: LnCircularStrategy,
237}
238
239#[derive(Debug, Clone, Copy, ValueEnum)]
240enum LnCircularStrategy {
241    /// The user will pay its own invoice
242    SelfPayment,
243    /// One gateway will pay/receive to/from the other, then they will swap
244    /// places
245    TwoGateways,
246    /// Two clients will pay to each other using the same gateway
247    PartnerPingPong,
248}
249
250#[derive(Debug, Clone)]
251pub struct MetricEvent {
252    name: String,
253    duration: Duration,
254}
255
256#[derive(Debug, Clone, Serialize, Deserialize)]
257struct EventMetricSummary {
258    name: String,
259    users: u64,
260    n: u64,
261    avg_ms: u128,
262    median_ms: u128,
263    max_ms: u128,
264    min_ms: u128,
265    timestamp_seconds: u64,
266}
267
268#[derive(Debug, Serialize, Deserialize)]
269struct EventMetricComparison {
270    avg_ms_gain: f64,
271    median_ms_gain: f64,
272    max_ms_gain: f64,
273    min_ms_gain: f64,
274    current: EventMetricSummary,
275    previous: EventMetricSummary,
276}
277
278impl std::fmt::Display for EventMetricComparison {
279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280        fn to_percent(gain: f64) -> String {
281            if gain >= 1.0 {
282                format!("+{:.2}%", (gain - 1.0) * 100.0)
283            } else {
284                format!("-{:.2}%", (1.0 - gain) * 100.0)
285            }
286        }
287        f.write_str(&format!(
288            "avg: {}, median: {}, max: {}, min: {}",
289            to_percent(self.avg_ms_gain),
290            to_percent(self.median_ms_gain),
291            to_percent(self.max_ms_gain),
292            to_percent(self.min_ms_gain),
293        ))
294    }
295}
296
297#[tokio::main]
298async fn main() -> anyhow::Result<()> {
299    fedimint_logging::TracingSetup::default().init()?;
300    let opts = Opts::parse();
301    let (event_sender, event_receiver) = tokio::sync::mpsc::unbounded_channel();
302    let summary_handle = spawn("handle metrics summary", {
303        let opts = opts.clone();
304        async { handle_metrics_summary(opts, event_receiver).await }
305    });
306    let futures = match opts.command.clone() {
307        Command::TestConnect {
308            invite_code,
309            duration_secs,
310            timeout_secs,
311            limit_endpoints,
312        } => {
313            let connectors = ConnectorRegistry::build_from_client_env()?.bind().await?;
314            let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
315            test_connect_raw_client(
316                &connectors,
317                invite_code,
318                opts.users,
319                Duration::from_secs(duration_secs),
320                Duration::from_secs(timeout_secs),
321                limit_endpoints,
322                event_sender.clone(),
323            )
324            .await?
325        }
326        Command::TestDownload { invite_code } => {
327            let connectors = ConnectorRegistry::build_from_client_env()?.bind().await?;
328            let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
329            test_download_config(&connectors, &invite_code, opts.users, &event_sender.clone())
330        }
331        Command::LoadTest(args) => {
332            let invite_code = invite_code_or_fallback(args.invite_code).await;
333
334            let gateway_id = if let Some(gateway_id) = args.gateway_id {
335                Some(gateway_id)
336            } else if let Some(generate_invoice_with) = args.generate_invoice_with {
337                Some(get_gateway_id(generate_invoice_with).await?)
338            } else {
339                None
340            };
341            let invoices = if let Some(invoices_file) = args.invoices_file {
342                let display_path = invoices_file.display();
343                let invoices_file = tokio::fs::File::open(&invoices_file)
344                    .await
345                    .with_context(|| format!("Failed to open {display_path}"))?;
346                let mut lines = tokio::io::BufReader::new(invoices_file).lines();
347                let mut invoices = vec![];
348                while let Some(line) = lines.next_line().await? {
349                    let invoice = Bolt11Invoice::from_str(&line)?;
350                    invoices.push(invoice);
351                }
352                invoices
353            } else {
354                vec![]
355            };
356            if args.generate_invoice_with.is_none() && invoices.is_empty() {
357                info!(
358                    "No --generate-invoice-with given no invoices on --invoices-file, not LN/gateway tests will be run"
359                );
360            }
361            run_load_test(
362                opts.archive_dir,
363                opts.users,
364                invite_code,
365                args.initial_notes,
366                args.generate_invoice_with,
367                args.invoices_per_user,
368                Duration::from_secs(args.ln_payment_sleep_secs),
369                invoices,
370                gateway_id,
371                args.notes_per_user,
372                args.note_denomination,
373                args.invoice_amount,
374                event_sender.clone(),
375            )
376            .await?
377        }
378        Command::LnCircularLoadTest(args) => {
379            let invite_code = invite_code_or_fallback(args.invite_code).await;
380            run_ln_circular_load_test(
381                opts.archive_dir,
382                opts.users,
383                invite_code,
384                args.initial_notes,
385                Duration::from_secs(args.test_duration_secs),
386                Duration::from_secs(args.ln_payment_sleep_secs),
387                args.notes_per_user,
388                args.note_denomination,
389                args.invoice_amount,
390                args.strategy,
391                event_sender.clone(),
392            )
393            .await?
394        }
395    };
396
397    let result = futures::future::join_all(futures).await;
398    drop(event_sender);
399    summary_handle.await??;
400    let len_failures = result.iter().filter(|r| r.is_err()).count();
401    eprintln!("{} results, {len_failures} failures", result.len());
402    for r in result {
403        if let Err(e) = r {
404            warn!("Task failed: {:?}", e);
405        }
406    }
407    if len_failures > 0 {
408        bail!("Finished with failures");
409    }
410    info!("Finished successfully");
411    Ok(())
412}
413
414async fn invite_code_or_fallback(invite_code: Option<InviteCode>) -> Option<InviteCode> {
415    if let Some(invite_code) = invite_code {
416        Some(invite_code)
417    } else {
418        // Try to get an invite code through cli in a best effort basis
419        match get_invite_code_cli(0.into()).await {
420            Ok(invite_code) => Some(invite_code),
421            Err(e) => {
422                info!(
423                    "No invite code provided and failed to get one with '{e}' error, will try to proceed without one..."
424                );
425                None
426            }
427        }
428    }
429}
430
431#[allow(clippy::too_many_arguments)]
432async fn run_load_test(
433    archive_dir: Option<PathBuf>,
434    users: u16,
435    invite_code: Option<InviteCode>,
436    initial_notes: Option<OOBNotes>,
437    generate_invoice_with: Option<LnInvoiceGeneration>,
438    generated_invoices_per_user: u16,
439    ln_payment_sleep: Duration,
440    invoices_from_file: Vec<Bolt11Invoice>,
441    gateway_id: Option<String>,
442    notes_per_user: u16,
443    note_denomination: Amount,
444    invoice_amount: Amount,
445    event_sender: mpsc::UnboundedSender<MetricEvent>,
446) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
447    let db_path = get_db_path(&archive_dir);
448    let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
449    let minimum_notes = notes_per_user * users;
450    let minimum_amount_required = note_denomination * u64::from(minimum_notes);
451
452    reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
453    get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
454    print_coordinator_notes(&coordinator).await?;
455    info!(
456        "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
457    );
458    remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
459    print_coordinator_notes(&coordinator).await?;
460
461    let users_clients = get_users_clients(users, db_path, invite_code).await?;
462
463    let mut users_notes =
464        get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
465    let mut users_invoices = HashMap::new();
466    let mut user = 0;
467    // Distribute invoices to users in a round robin fashion
468    for invoice in invoices_from_file {
469        users_invoices
470            .entry(user)
471            .or_insert_with(Vec::new)
472            .push(invoice);
473        user = (user + 1) % users;
474    }
475
476    info!("Starting user tasks");
477    let futures = users_clients
478        .into_iter()
479        .enumerate()
480        .map(|(u, client)| {
481            let u = u as u16;
482            let oob_notes = users_notes.remove(&u).unwrap();
483            let invoices = users_invoices.remove(&u).unwrap_or_default();
484            let event_sender = event_sender.clone();
485            let f: BoxFuture<_> = Box::pin(do_load_test_user_task(
486                format!("User {u}:"),
487                client,
488                oob_notes,
489                generated_invoices_per_user,
490                ln_payment_sleep,
491                invoice_amount,
492                invoices,
493                generate_invoice_with,
494                event_sender,
495                gateway_id.clone(),
496            ));
497            f
498        })
499        .collect::<Vec<_>>();
500
501    Ok(futures)
502}
503
504async fn get_notes_for_users(
505    users: u16,
506    notes_per_user: u16,
507    coordinator: ClientHandleArc,
508    note_denomination: Amount,
509) -> anyhow::Result<HashMap<u16, Vec<OOBNotes>>> {
510    let mut users_notes = HashMap::new();
511    for u in 0..users {
512        users_notes.insert(u, Vec::with_capacity(notes_per_user.into()));
513        for _ in 0..notes_per_user {
514            let (_, oob_notes) = do_spend_notes(&coordinator, note_denomination).await?;
515            let user_amount = oob_notes.total_amount();
516            info!("Giving {user_amount} to user {u}");
517            users_notes.get_mut(&u).unwrap().push(oob_notes);
518        }
519    }
520    Ok(users_notes)
521}
522
523async fn get_users_clients(
524    n: u16,
525    db_path: Option<PathBuf>,
526    invite_code: Option<InviteCode>,
527) -> anyhow::Result<Vec<ClientHandleArc>> {
528    let mut users_clients = Vec::with_capacity(n.into());
529    for u in 0..n {
530        let (client, _) = get_user_client(u, &db_path, &invite_code).await?;
531        users_clients.push(client);
532    }
533    Ok(users_clients)
534}
535
536async fn get_user_client(
537    user_index: u16,
538    db_path: &Option<PathBuf>,
539    invite_code: &Option<InviteCode>,
540) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
541    let user_db = db_path
542        .as_ref()
543        .map(|db_path| db_path.join(format!("user_{user_index}.db")));
544    let user_invite_code = if user_db.as_ref().is_some_and(|db| db.exists()) {
545        None
546    } else {
547        invite_code.clone()
548    };
549    let (client, invite_code) = build_client(user_invite_code, user_db.as_ref()).await?;
550    // if lightning module is present, update the gateway cache
551    if let Ok(ln_client) = client.get_first_module::<LightningClientModule>() {
552        let _ = ln_client.update_gateway_cache().await;
553    }
554    Ok((client, invite_code))
555}
556
557async fn print_coordinator_notes(coordinator: &ClientHandleArc) -> anyhow::Result<()> {
558    info!("Note summary:");
559    let summary = get_note_summary(coordinator).await?;
560    for (k, v) in summary.iter() {
561        info!("{k}: {v}");
562    }
563    Ok(())
564}
565
566async fn get_required_notes(
567    coordinator: &ClientHandleArc,
568    minimum_amount_required: Amount,
569    event_sender: &mpsc::UnboundedSender<MetricEvent>,
570) -> anyhow::Result<()> {
571    let current_balance = coordinator.get_balance_for_btc().await?;
572
573    if current_balance < minimum_amount_required {
574        let diff = minimum_amount_required.saturating_sub(current_balance);
575        info!(
576            "Current balance {current_balance} on coordinator not enough, trying to get {diff} more through fedimint-cli"
577        );
578        match try_get_notes_cli(&diff, 5).await {
579            Ok(notes) => {
580                info!("Got {} more notes, reissuing them", notes.total_amount());
581                reissue_notes(coordinator, notes, event_sender).await?;
582            }
583            Err(e) => {
584                info!("Unable to get more notes: '{e}', will try to proceed without them");
585            }
586        }
587    } else {
588        info!(
589            "Current balance of {current_balance} already covers the minimum required of {minimum_amount_required}"
590        );
591    }
592    Ok(())
593}
594
595async fn reissue_initial_notes(
596    initial_notes: Option<OOBNotes>,
597    coordinator: &ClientHandleArc,
598    event_sender: &mpsc::UnboundedSender<MetricEvent>,
599) -> anyhow::Result<()> {
600    if let Some(notes) = initial_notes {
601        let amount = notes.total_amount();
602        info!("Reissuing initial notes, got {amount}");
603        reissue_notes(coordinator, notes, event_sender).await?;
604    }
605    Ok(())
606}
607
608async fn get_coordinator_client(
609    db_path: &Option<PathBuf>,
610    invite_code: &Option<InviteCode>,
611) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
612    let (client, invite_code) = if let Some(db_path) = db_path {
613        let coordinator_db = db_path.join("coordinator.db");
614        if coordinator_db.exists() {
615            build_client(invite_code.clone(), Some(&coordinator_db)).await?
616        } else {
617            tokio::fs::create_dir_all(db_path).await?;
618            build_client(
619                Some(invite_code.clone().context(
620                    "Running on this archive dir for the first time, an invite code is required",
621                )?),
622                Some(&coordinator_db),
623            )
624            .await?
625        }
626    } else {
627        build_client(
628            Some(
629                invite_code
630                    .clone()
631                    .context("No archive dir given, an invite code is strictly required")?,
632            ),
633            None,
634        )
635        .await?
636    };
637    Ok((client, invite_code))
638}
639
640fn get_db_path(archive_dir: &Option<PathBuf>) -> Option<PathBuf> {
641    archive_dir.as_ref().map(|p| p.join("db"))
642}
643
644async fn get_lightning_gateway(
645    client: &ClientHandleArc,
646    gateway_id: Option<String>,
647) -> Option<LightningGateway> {
648    let gateway_id = parse_gateway_id(gateway_id.or(None)?.as_str()).expect("Invalid gateway id");
649    let ln_module = client
650        .get_first_module::<LightningClientModule>()
651        .expect("Must have ln client module");
652    ln_module.select_gateway(&gateway_id).await
653}
654
655#[allow(clippy::too_many_arguments)]
656async fn do_load_test_user_task(
657    prefix: String,
658    client: ClientHandleArc,
659    oob_notes: Vec<OOBNotes>,
660    generated_invoices_per_user: u16,
661    ln_payment_sleep: Duration,
662    invoice_amount: Amount,
663    additional_invoices: Vec<Bolt11Invoice>,
664    generate_invoice_with: Option<LnInvoiceGeneration>,
665    event_sender: mpsc::UnboundedSender<MetricEvent>,
666    gateway_id: Option<String>,
667) -> anyhow::Result<()> {
668    let ln_gateway = get_lightning_gateway(&client, gateway_id).await;
669    for oob_note in oob_notes {
670        let amount = oob_note.total_amount();
671        reissue_notes(&client, oob_note, &event_sender)
672            .await
673            .map_err(|e| anyhow!("while reissuing initial {amount}: {e}"))?;
674    }
675    let mut generated_invoices_per_user_iterator = (0..generated_invoices_per_user).peekable();
676    while let Some(_) = generated_invoices_per_user_iterator.next() {
677        let total_amount = get_note_summary(&client).await?.total_amount();
678        if invoice_amount > total_amount {
679            warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
680        } else {
681            match generate_invoice_with {
682                Some(LnInvoiceGeneration::LdkLightningCli) => {
683                    let invoice = ldk_create_invoice(invoice_amount).await?;
684                    gateway_pay_invoice(
685                        &prefix,
686                        "LND",
687                        &client,
688                        invoice.clone(),
689                        &event_sender,
690                        ln_gateway.clone(),
691                    )
692                    .await?;
693                    ldk_wait_invoice_payment(&invoice).await?;
694                }
695                None if additional_invoices.is_empty() => {
696                    debug!(
697                        "No method given to generate an invoice and no invoices on file, will not test the gateway"
698                    );
699                    break;
700                }
701                None => {
702                    break;
703                }
704            }
705            if generated_invoices_per_user_iterator.peek().is_some() {
706                // Only sleep while there are more invoices to pay
707                fedimint_core::task::sleep(ln_payment_sleep).await;
708            }
709        }
710    }
711    let mut additional_invoices = additional_invoices.into_iter().peekable();
712    while let Some(invoice) = additional_invoices.next() {
713        let total_amount = get_note_summary(&client).await?.total_amount();
714        let invoice_amount =
715            Amount::from_msats(invoice.amount_milli_satoshis().unwrap_or_default());
716        if invoice_amount > total_amount {
717            warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
718        } else if invoice_amount == Amount::ZERO {
719            warn!("Can't pay invoice {invoice}, amount is zero");
720        } else {
721            gateway_pay_invoice(
722                &prefix,
723                "unknown",
724                &client,
725                invoice,
726                &event_sender,
727                ln_gateway.clone(),
728            )
729            .await?;
730            if additional_invoices.peek().is_some() {
731                // Only sleep while there are more invoices to pay
732                fedimint_core::task::sleep(ln_payment_sleep).await;
733            }
734        }
735    }
736    Ok(())
737}
738
739#[allow(clippy::too_many_arguments)]
740async fn run_ln_circular_load_test(
741    archive_dir: Option<PathBuf>,
742    users: u16,
743    invite_code: Option<InviteCode>,
744    initial_notes: Option<OOBNotes>,
745    test_duration: Duration,
746    ln_payment_sleep: Duration,
747    notes_per_user: u16,
748    note_denomination: Amount,
749    invoice_amount: Amount,
750    strategy: LnCircularStrategy,
751    event_sender: mpsc::UnboundedSender<MetricEvent>,
752) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
753    let db_path = get_db_path(&archive_dir);
754    let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
755    let minimum_notes = notes_per_user * users;
756    let minimum_amount_required = note_denomination * u64::from(minimum_notes);
757
758    reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
759    get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
760
761    info!(
762        "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
763    );
764    remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
765
766    print_coordinator_notes(&coordinator).await?;
767
768    let users_clients = get_users_clients(users, db_path, invite_code.clone()).await?;
769
770    let mut users_notes =
771        get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
772
773    info!("Starting user tasks");
774    let futures = users_clients
775        .into_iter()
776        .enumerate()
777        .map(|(u, client)| {
778            let u = u as u16;
779            let oob_notes = users_notes.remove(&u).unwrap();
780            let event_sender = event_sender.clone();
781            let f: BoxFuture<_> = Box::pin(do_ln_circular_test_user_task(
782                format!("User {u}:"),
783                client,
784                invite_code.clone(),
785                oob_notes,
786                test_duration,
787                ln_payment_sleep,
788                invoice_amount,
789                strategy,
790                event_sender,
791            ));
792            f
793        })
794        .collect::<Vec<_>>();
795
796    Ok(futures)
797}
798
799#[allow(clippy::too_many_arguments)]
800async fn do_ln_circular_test_user_task(
801    prefix: String,
802    client: ClientHandleArc,
803    invite_code: Option<InviteCode>,
804    oob_notes: Vec<OOBNotes>,
805    test_duration: Duration,
806    ln_payment_sleep: Duration,
807    invoice_amount: Amount,
808    strategy: LnCircularStrategy,
809    event_sender: mpsc::UnboundedSender<MetricEvent>,
810) -> anyhow::Result<()> {
811    for oob_note in oob_notes {
812        let amount = oob_note.total_amount();
813        reissue_notes(&client, oob_note, &event_sender)
814            .await
815            .map_err(|e| anyhow!("while reissuing initial {amount}: {e}"))?;
816    }
817    let initial_time = fedimint_core::time::now();
818    let still_ontime = || async {
819        fedimint_core::time::now()
820            .duration_since(initial_time)
821            .expect("time to work")
822            <= test_duration
823    };
824    let sleep_a_bit = || async {
825        if still_ontime().await {
826            fedimint_core::task::sleep(ln_payment_sleep).await;
827        }
828    };
829    match strategy {
830        LnCircularStrategy::TwoGateways => {
831            let invoice_generation = LnInvoiceGeneration::LdkLightningCli;
832            while still_ontime().await {
833                let gateway_id = get_gateway_id(invoice_generation).await?;
834                let ln_gateway = get_lightning_gateway(&client, Some(gateway_id)).await;
835                run_two_gateways_strategy(
836                    &prefix,
837                    &invoice_generation,
838                    &invoice_amount,
839                    &event_sender,
840                    &client,
841                    ln_gateway,
842                )
843                .await?;
844                sleep_a_bit().await;
845            }
846        }
847        LnCircularStrategy::SelfPayment => {
848            while still_ontime().await {
849                do_self_payment(&prefix, &client, invoice_amount, &event_sender).await?;
850                sleep_a_bit().await;
851            }
852        }
853        LnCircularStrategy::PartnerPingPong => {
854            let (partner, _) = build_client(invite_code, None).await?;
855            while still_ontime().await {
856                do_partner_ping_pong(&prefix, &client, &partner, invoice_amount, &event_sender)
857                    .await?;
858                sleep_a_bit().await;
859            }
860        }
861    }
862    Ok(())
863}
864
865const GATEWAY_CREATE_INVOICE: &str = "gateway_create_invoice";
866
867async fn run_two_gateways_strategy(
868    prefix: &str,
869    invoice_generation: &LnInvoiceGeneration,
870    invoice_amount: &Amount,
871    event_sender: &mpsc::UnboundedSender<MetricEvent>,
872    client: &ClientHandleArc,
873    ln_gateway: Option<LightningGateway>,
874) -> Result<(), anyhow::Error> {
875    let create_invoice_time = fedimint_core::time::now();
876    match *invoice_generation {
877        LnInvoiceGeneration::LdkLightningCli => {
878            let invoice = ldk_create_invoice(*invoice_amount).await?;
879            let elapsed = create_invoice_time.elapsed()?;
880            info!("Created invoice using LDK in {elapsed:?}");
881            event_sender.send(MetricEvent {
882                name: GATEWAY_CREATE_INVOICE.into(),
883                duration: elapsed,
884            })?;
885            gateway_pay_invoice(
886                prefix,
887                "LND",
888                client,
889                invoice.clone(),
890                event_sender,
891                ln_gateway.clone(),
892            )
893            .await?;
894            ldk_wait_invoice_payment(&invoice).await?;
895            let (operation_id, invoice) =
896                client_create_invoice(client, *invoice_amount, event_sender, ln_gateway).await?;
897            let pay_invoice_time = fedimint_core::time::now();
898            ldk_pay_invoice(invoice).await?;
899            wait_invoice_payment(
900                prefix,
901                "LND",
902                client,
903                operation_id,
904                event_sender,
905                pay_invoice_time,
906            )
907            .await?;
908        }
909    }
910    Ok(())
911}
912
913async fn do_self_payment(
914    prefix: &str,
915    client: &ClientHandleArc,
916    invoice_amount: Amount,
917    event_sender: &mpsc::UnboundedSender<MetricEvent>,
918) -> anyhow::Result<()> {
919    let (operation_id, invoice) =
920        client_create_invoice(client, invoice_amount, event_sender, None).await?;
921    let pay_invoice_time = fedimint_core::time::now();
922    let lightning_module = client.get_first_module::<LightningClientModule>()?;
923    //let gateway = lightning_module.select_active_gateway_opt().await;
924    lightning_module
925        .pay_bolt11_invoice(None, invoice, ())
926        .await?;
927    wait_invoice_payment(
928        prefix,
929        "gateway",
930        client,
931        operation_id,
932        event_sender,
933        pay_invoice_time,
934    )
935    .await?;
936    Ok(())
937}
938
939async fn do_partner_ping_pong(
940    prefix: &str,
941    client: &ClientHandleArc,
942    partner: &ClientHandleArc,
943    invoice_amount: Amount,
944    event_sender: &mpsc::UnboundedSender<MetricEvent>,
945) -> anyhow::Result<()> {
946    // Ping (partner creates invoice, client pays)
947    let (operation_id, invoice) =
948        client_create_invoice(partner, invoice_amount, event_sender, None).await?;
949    let pay_invoice_time = fedimint_core::time::now();
950    let lightning_module = client.get_first_module::<LightningClientModule>()?;
951    // TODO: Select random gateway?
952    //let gateway = lightning_module.select_active_gateway_opt().await;
953    lightning_module
954        .pay_bolt11_invoice(None, invoice, ())
955        .await?;
956    wait_invoice_payment(
957        prefix,
958        "gateway",
959        partner,
960        operation_id,
961        event_sender,
962        pay_invoice_time,
963    )
964    .await?;
965    // Pong (client creates invoice, partner pays)
966    let (operation_id, invoice) =
967        client_create_invoice(client, invoice_amount, event_sender, None).await?;
968    let pay_invoice_time = fedimint_core::time::now();
969    let partner_lightning_module = partner.get_first_module::<LightningClientModule>()?;
970    //let gateway = partner_lightning_module.select_active_gateway_opt().await;
971    // TODO: Select random gateway?
972    partner_lightning_module
973        .pay_bolt11_invoice(None, invoice, ())
974        .await?;
975    wait_invoice_payment(
976        prefix,
977        "gateway",
978        client,
979        operation_id,
980        event_sender,
981        pay_invoice_time,
982    )
983    .await?;
984    Ok(())
985}
986
987async fn wait_invoice_payment(
988    prefix: &str,
989    gateway_name: &str,
990    client: &ClientHandleArc,
991    operation_id: fedimint_core::core::OperationId,
992    event_sender: &mpsc::UnboundedSender<MetricEvent>,
993    pay_invoice_time: std::time::SystemTime,
994) -> anyhow::Result<()> {
995    let elapsed = pay_invoice_time.elapsed()?;
996    info!("{prefix} Invoice payment receive started using {gateway_name} in {elapsed:?}");
997    event_sender.send(MetricEvent {
998        name: format!("gateway_{gateway_name}_payment_received_started"),
999        duration: elapsed,
1000    })?;
1001    let lightning_module = client.get_first_module::<LightningClientModule>()?;
1002    let mut updates = lightning_module
1003        .subscribe_ln_receive(operation_id)
1004        .await?
1005        .into_stream();
1006    while let Some(update) = updates.next().await {
1007        debug!(%prefix, ?update, "Invoice payment update");
1008        match update {
1009            LnReceiveState::Claimed => {
1010                let elapsed: Duration = pay_invoice_time.elapsed()?;
1011                info!("{prefix} Invoice payment received on {gateway_name} in {elapsed:?}");
1012                event_sender.send(MetricEvent {
1013                    name: "gateway_payment_received_success".into(),
1014                    duration: elapsed,
1015                })?;
1016                event_sender.send(MetricEvent {
1017                    name: format!("gateway_{gateway_name}_payment_received_success"),
1018                    duration: elapsed,
1019                })?;
1020                break;
1021            }
1022            LnReceiveState::Canceled { reason } => {
1023                let elapsed: Duration = pay_invoice_time.elapsed()?;
1024                info!(
1025                    "{prefix} Invoice payment receive was canceled on {gateway_name}: {reason} in {elapsed:?}"
1026                );
1027                event_sender.send(MetricEvent {
1028                    name: "gateway_payment_received_canceled".into(),
1029                    duration: elapsed,
1030                })?;
1031                break;
1032            }
1033            _ => {}
1034        }
1035    }
1036    Ok(())
1037}
1038
1039async fn client_create_invoice(
1040    client: &ClientHandleArc,
1041    invoice_amount: Amount,
1042    event_sender: &mpsc::UnboundedSender<MetricEvent>,
1043    ln_gateway: Option<LightningGateway>,
1044) -> anyhow::Result<(fedimint_core::core::OperationId, Bolt11Invoice)> {
1045    let create_invoice_time = fedimint_core::time::now();
1046    let lightning_module = client.get_first_module::<LightningClientModule>()?;
1047    let desc = Description::new("test".to_string())?;
1048    let (operation_id, invoice, _) = lightning_module
1049        .create_bolt11_invoice(
1050            invoice_amount,
1051            Bolt11InvoiceDescription::Direct(desc),
1052            None,
1053            (),
1054            ln_gateway,
1055        )
1056        .await?;
1057    let elapsed = create_invoice_time.elapsed()?;
1058    info!("Created invoice using gateway in {elapsed:?}");
1059    event_sender.send(MetricEvent {
1060        name: GATEWAY_CREATE_INVOICE.into(),
1061        duration: elapsed,
1062    })?;
1063    Ok((operation_id, invoice))
1064}
1065
1066fn test_download_config(
1067    endpoints: &ConnectorRegistry,
1068    invite_code: &InviteCode,
1069    users: u16,
1070    event_sender: &mpsc::UnboundedSender<MetricEvent>,
1071) -> Vec<BoxFuture<'static, anyhow::Result<()>>> {
1072    (0..users)
1073        .map(move |_| {
1074            let invite_code = invite_code.clone();
1075            let event_sender = event_sender.clone();
1076            let endpoints = endpoints.clone();
1077            let f: BoxFuture<_> = Box::pin(async move {
1078                let m = fedimint_core::time::now();
1079                let _ = download_from_invite_code(&endpoints, &invite_code).await?;
1080                event_sender.send(MetricEvent {
1081                    name: "download_client_config".into(),
1082                    duration: m.elapsed()?,
1083                })?;
1084                Ok(())
1085            });
1086            f
1087        })
1088        .collect()
1089}
1090
1091async fn test_connect_raw_client(
1092    endpoints: &ConnectorRegistry,
1093    invite_code: InviteCode,
1094    users: u16,
1095    duration: Duration,
1096    timeout: Duration,
1097    limit_endpoints: Option<usize>,
1098    event_sender: mpsc::UnboundedSender<MetricEvent>,
1099) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
1100    use jsonrpsee_core::client::ClientT;
1101    use jsonrpsee_ws_client::WsClientBuilder;
1102
1103    let (mut cfg, _) = download_from_invite_code(endpoints, &invite_code).await?;
1104
1105    if let Some(limit_endpoints) = limit_endpoints {
1106        cfg.global.api_endpoints = cfg
1107            .global
1108            .api_endpoints
1109            .into_iter()
1110            .take(limit_endpoints)
1111            .collect();
1112        info!("Limiting endpoints to {:?}", cfg.global.api_endpoints);
1113    }
1114
1115    info!("Connecting to {users} clients");
1116    let clients = (0..users)
1117        .flat_map(|_| {
1118            cfg.global.api_endpoints.values().map(|url| async {
1119                let ws_client = WsClientBuilder::default()
1120                    .request_timeout(timeout)
1121                    .connection_timeout(timeout)
1122                    .build(url_to_string_with_default_port(&url.url))
1123                    .await?;
1124                Ok::<_, anyhow::Error>(ws_client)
1125            })
1126        })
1127        .collect::<Vec<_>>();
1128    let clients = futures::future::try_join_all(clients).await?;
1129    info!("Keeping {users} clients connected for {duration:?}");
1130    Ok(clients
1131        .into_iter()
1132        .map(|client| {
1133            let event_sender = event_sender.clone();
1134            let f: BoxFuture<_> = Box::pin(async move {
1135                let initial_time = fedimint_core::time::now();
1136                while initial_time.elapsed()? < duration {
1137                    let m = fedimint_core::time::now();
1138                    let _epoch: u64 = client
1139                        .request::<_, _>(SESSION_COUNT_ENDPOINT, vec![ApiRequestErased::default()])
1140                        .await?;
1141                    event_sender.send(MetricEvent {
1142                        name: SESSION_COUNT_ENDPOINT.into(),
1143                        duration: m.elapsed()?,
1144                    })?;
1145                    fedimint_core::task::sleep(Duration::from_secs(1)).await;
1146                }
1147                Ok(())
1148            });
1149            f
1150        })
1151        .collect())
1152}
1153
1154fn url_to_string_with_default_port(url: &SafeUrl) -> String {
1155    format!(
1156        "{}://{}:{}{}",
1157        url.scheme(),
1158        url.host().expect("Asserted on construction"),
1159        url.port_or_known_default()
1160            .expect("Asserted on construction"),
1161        url.path()
1162    )
1163}
1164
1165async fn handle_metrics_summary(
1166    opts: Opts,
1167    mut event_receiver: mpsc::UnboundedReceiver<MetricEvent>,
1168) -> anyhow::Result<()> {
1169    let timestamp_seconds = fedimint_core::time::duration_since_epoch().as_secs();
1170    let mut metrics_json_output_files = vec![];
1171    let mut previous_metrics = vec![];
1172    let mut comparison_output = None;
1173    if let Some(archive_dir) = opts.archive_dir {
1174        let mut archive_metrics = archive_dir.join("metrics");
1175        archive_metrics.push(opts.users.to_string());
1176        tokio::fs::create_dir_all(&archive_metrics).await?;
1177        let mut archive_comparisons = archive_dir.join("comparisons");
1178        archive_comparisons.push(opts.users.to_string());
1179        tokio::fs::create_dir_all(&archive_comparisons).await?;
1180
1181        let latest_metrics_file = std::fs::read_dir(&archive_metrics)?
1182            .map(|entry| {
1183                let entry = entry.unwrap();
1184                let metadata = entry.metadata().unwrap();
1185                let created = metadata
1186                    .created()
1187                    .unwrap_or_else(|_| metadata.modified().unwrap());
1188                (entry, created)
1189            })
1190            .max_by_key(|(_entry, created)| created.to_owned())
1191            .map(|(entry, _)| entry.path());
1192        if let Some(latest_metrics_file) = latest_metrics_file {
1193            let display_path = latest_metrics_file.display();
1194            let latest_metrics_file = tokio::fs::File::open(&latest_metrics_file)
1195                .await
1196                .with_context(|| format!("Failed to open {display_path}"))?;
1197            let mut lines = tokio::io::BufReader::new(latest_metrics_file).lines();
1198            while let Some(line) = lines.next_line().await? {
1199                match serde_json::from_str::<EventMetricSummary>(&line) {
1200                    Ok(metric) => {
1201                        previous_metrics.push(metric);
1202                    }
1203                    Err(e) => {
1204                        warn!("Failed to parse previous metric: {e:?}");
1205                    }
1206                }
1207            }
1208        }
1209        let new_metric_output = archive_metrics.join(format!("{timestamp_seconds}.json",));
1210        let new_metric_output = BufWriter::new(
1211            OpenOptions::new()
1212                .write(true)
1213                .create(true)
1214                .truncate(true)
1215                .open(new_metric_output)
1216                .await?,
1217        );
1218        metrics_json_output_files.push(new_metric_output);
1219        if !previous_metrics.is_empty() {
1220            let new_comparison_output =
1221                archive_comparisons.join(format!("{timestamp_seconds}.json",));
1222            comparison_output = Some(BufWriter::new(
1223                OpenOptions::new()
1224                    .write(true)
1225                    .create(true)
1226                    .truncate(true)
1227                    .open(new_comparison_output)
1228                    .await?,
1229            ));
1230        }
1231    }
1232    if let Some(metrics_json_output) = opts.metrics_json_output {
1233        metrics_json_output_files.push(BufWriter::new(
1234            tokio::fs::OpenOptions::new()
1235                .write(true)
1236                .create(true)
1237                .truncate(true)
1238                .open(metrics_json_output)
1239                .await?,
1240        ));
1241    }
1242    let mut results = BTreeMap::new();
1243    while let Some(event) = event_receiver.recv().await {
1244        let entry = results.entry(event.name).or_insert_with(Vec::new);
1245        entry.push(event.duration);
1246    }
1247    let mut previous_metrics = previous_metrics
1248        .into_iter()
1249        .map(|metric| (metric.name.clone(), metric))
1250        .collect::<HashMap<_, _>>();
1251    for (k, mut v) in results {
1252        v.sort();
1253        let n = v.len();
1254        let max = v.iter().last().unwrap();
1255        let min = v.first().unwrap();
1256        let median = v[n / 2];
1257        let sum: Duration = v.iter().sum();
1258        let avg = sum / n as u32;
1259        let metric_summary = EventMetricSummary {
1260            name: k.clone(),
1261            users: u64::from(opts.users),
1262            n: n as u64,
1263            avg_ms: avg.as_millis(),
1264            median_ms: median.as_millis(),
1265            max_ms: max.as_millis(),
1266            min_ms: min.as_millis(),
1267            timestamp_seconds,
1268        };
1269        let comparison = if let Some(previous_metric) = previous_metrics.remove(&k) {
1270            if previous_metric.n == metric_summary.n {
1271                fn calculate_gain(current: u128, previous: u128) -> f64 {
1272                    current as f64 / previous as f64
1273                }
1274                let comparison = EventMetricComparison {
1275                    avg_ms_gain: calculate_gain(metric_summary.avg_ms, previous_metric.avg_ms),
1276                    median_ms_gain: calculate_gain(
1277                        metric_summary.median_ms,
1278                        previous_metric.median_ms,
1279                    ),
1280                    max_ms_gain: calculate_gain(metric_summary.max_ms, previous_metric.max_ms),
1281                    min_ms_gain: calculate_gain(metric_summary.min_ms, previous_metric.min_ms),
1282                    current: metric_summary.clone(),
1283                    previous: previous_metric,
1284                };
1285                if let Some(comparison_output) = &mut comparison_output {
1286                    let comparison_json =
1287                        serde_json::to_string(&comparison).expect("to be serializable");
1288                    comparison_output
1289                        .write_all(format!("{comparison_json}\n").as_bytes())
1290                        .await
1291                        .expect("to write on file");
1292                }
1293                Some(comparison)
1294            } else {
1295                info!(
1296                    "Skipping comparison for {k} because previous metric has different n ({} vs {})",
1297                    previous_metric.n, metric_summary.n
1298                );
1299                None
1300            }
1301        } else {
1302            None
1303        };
1304        if let Some(comparison) = comparison {
1305            println!(
1306                "{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?} (compared to previous: {comparison})"
1307            );
1308        } else {
1309            println!("{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?}");
1310        }
1311        let metric_summary_json =
1312            serde_json::to_string(&metric_summary).expect("to be serializable");
1313        for metrics_json_output_file in &mut metrics_json_output_files {
1314            metrics_json_output_file
1315                .write_all(format!("{metric_summary_json}\n").as_bytes())
1316                .await
1317                .expect("to write on file");
1318        }
1319    }
1320    for mut output in metrics_json_output_files {
1321        output.flush().await?;
1322    }
1323    if let Some(mut output) = comparison_output {
1324        output.flush().await?;
1325    }
1326    Ok(())
1327}
1328
1329async fn get_gateway_id(generate_invoice_with: LnInvoiceGeneration) -> anyhow::Result<String> {
1330    let gateway_json = match generate_invoice_with {
1331        LnInvoiceGeneration::LdkLightningCli => {
1332            // If we are paying a lnd invoice, we use the LDK node
1333            cmd!(GatewayLndCli, "info").out_json().await
1334        }
1335    }?;
1336
1337    let gatewayd_version = devimint::util::Gatewayd::version_or_default().await;
1338    let gateway_id = if gatewayd_version < *VERSION_0_10_0_ALPHA {
1339        gateway_json["gateway_id"]
1340            .as_str()
1341            .context("gateway_id must be a string")?
1342            .to_owned()
1343    } else {
1344        gateway_json["registrations"]["http"][1]
1345            .as_str()
1346            .context("gateway id must be a string")?
1347            .to_owned()
1348    };
1349
1350    Ok(gateway_id)
1351}