1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_precision_loss)]
4#![allow(clippy::missing_errors_doc)]
5#![allow(clippy::ref_option)]
6#![allow(clippy::too_many_lines)]
7#![allow(clippy::large_futures)]
8
9use std::collections::{BTreeMap, HashMap};
10use std::path::PathBuf;
11use std::str::FromStr;
12use std::time::Duration;
13use std::vec;
14
15use anyhow::{Context, bail};
16use clap::{Args, Parser, Subcommand, ValueEnum};
17use common::{
18 gateway_pay_invoice, get_note_summary, ldk_create_invoice, ldk_pay_invoice,
19 ldk_wait_invoice_payment, parse_gateway_id, reissue_notes,
20};
21use devimint::cmd;
22use devimint::util::GatewayLndCli;
23use fedimint_client::ClientHandleArc;
24use fedimint_core::Amount;
25use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
26use fedimint_core::invite_code::InviteCode;
27use fedimint_core::module::ApiRequestErased;
28use fedimint_core::runtime::spawn;
29use fedimint_core::util::{BoxFuture, SafeUrl};
30use fedimint_ln_client::{LightningClientModule, LnReceiveState};
31use fedimint_ln_common::LightningGateway;
32use fedimint_mint_client::OOBNotes;
33use futures::StreamExt;
34use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description};
35use serde::{Deserialize, Serialize};
36use tokio::fs::OpenOptions;
37use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
38use tokio::sync::mpsc;
39use tracing::{debug, info, warn};
40
41use crate::common::{
42 build_client, do_spend_notes, get_invite_code_cli, remint_denomination, try_get_notes_cli,
43};
44pub mod common;
45
46#[derive(Parser, Clone)]
47#[command(version)]
48struct Opts {
49 #[arg(
50 long,
51 default_value = "10",
52 help = "Number of users. Each user will work in parallel"
53 )]
54 users: u16,
55
56 #[arg(long, help = "Output with the metrics results in JSON format")]
57 metrics_json_output: Option<PathBuf>,
58
59 #[arg(
60 long,
61 help = "If given, will be used to store and retrieve past metrics for comparison purposes"
62 )]
63 archive_dir: Option<PathBuf>,
64
65 #[clap(subcommand)]
66 command: Command,
67}
68
// Selects how external lightning invoices are produced for the tests.
//
// NOTE: `//` comments only; with `ValueEnum`, `///` comments on variants would
// become user-visible help for the possible values.
#[derive(Debug, Clone, Copy, ValueEnum)]
enum LnInvoiceGeneration {
    // Callers of this variant create invoices with `ldk_create_invoice` and
    // wait for settlement with `ldk_wait_invoice_payment`.
    LdkLightningCli,
}
73
74#[derive(Subcommand, Clone)]
75enum Command {
76 #[command(about = "Keep many websocket connections to a federation for a duration of time")]
77 TestConnect {
78 #[arg(long, help = "Federation invite code")]
79 invite_code: String,
80 #[arg(
81 long,
82 default_value = "60",
83 help = "How much time to keep the connections open, in seconds"
84 )]
85 duration_secs: u64,
86 #[arg(
87 long,
88 default_value = "120",
89 help = "Timeout for connection attempt and for each request, in secnods"
90 )]
91 timeout_secs: u64,
92 #[arg(
93 long,
94 help = "If given, will limit the number of endpoints (guardians) to connect to"
95 )]
96 limit_endpoints: Option<usize>,
97 },
98 #[command(about = "Try to download the client config many times.")]
99 TestDownload {
100 #[arg(long, help = "Federation invite code")]
101 invite_code: String,
102 },
103 #[command(
104 about = "Run a load test where many users in parallel will try to reissue notes and pay invoices through the gateway"
105 )]
106 LoadTest(LoadTestArgs),
107 #[command()]
112 LnCircularLoadTest(LnCircularLoadTestArgs),
113}
114
// Arguments of the `LoadTest` subcommand; consumed by `main`, which forwards
// them to `run_load_test`.
//
// NOTE: `//` comments only; clap's derive would turn `///` comments into
// user-visible help text.
#[derive(Args, Clone)]
struct LoadTestArgs {
    // When absent, `main` tries `get_invite_code_cli` as a fallback.
    #[arg(
        long,
        help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
    )]
    invite_code: Option<InviteCode>,

    // Reissued into the coordinator client before the test starts.
    #[arg(
        long,
        help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
    )]
    initial_notes: Option<OOBNotes>,

    // Takes precedence over the id looked up via `--generate-invoice-with`.
    #[arg(
        long,
        help = "Gateway Id. If none, retrieve one according to --generate-invoice-with"
    )]
    gateway_id: Option<String>,

    #[arg(
        long,
        help = "The method used to generate invoices to be paid through the gateway. If none and no --invoices-file provided, no gateway/LN tests will be run. Note that you can't generate an invoice using the same lightning node used by the gateway (i.e self payment is forbidden)"
    )]
    generate_invoice_with: Option<LnInvoiceGeneration>,

    #[arg(
        long,
        default_value = "1",
        help = "How many invoices will be created for each user. Only applicable if --generate-invoice-with is provided"
    )]
    invoices_per_user: u16,

    // Converted with `Duration::from_secs` in `main`.
    #[arg(
        long,
        default_value = "0",
        help = "How many seconds to sleep between LN payments"
    )]
    ln_payment_sleep_secs: u64,

    // Each line is parsed as a `Bolt11Invoice` in `main`.
    #[arg(
        long,
        help = "A text file with one invoice per line. If --generate-invoice-with is provided, these will be additional invoices to be paid"
    )]
    invoices_file: Option<PathBuf>,

    #[arg(
        long,
        help = "How many notes to distribute to each user",
        default_value = "2"
    )]
    notes_per_user: u16,

    #[arg(
        long,
        help = "Note denomination to use for the test",
        default_value = "1024"
    )]
    note_denomination: Amount,

    #[arg(
        long,
        help = "Invoice amount when generating one",
        default_value = "1000"
    )]
    invoice_amount: Amount,
}
182
// Arguments of the `LnCircularLoadTest` subcommand; consumed by `main`, which
// forwards them to `run_ln_circular_load_test`.
//
// NOTE: `//` comments only; clap's derive would turn `///` comments into
// user-visible help text.
#[derive(Args, Clone)]
struct LnCircularLoadTestArgs {
    // When absent, `main` tries `get_invite_code_cli` as a fallback.
    #[arg(
        long,
        help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
    )]
    invite_code: Option<InviteCode>,

    // Reissued into the coordinator client before the test starts.
    #[arg(
        long,
        help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
    )]
    initial_notes: Option<OOBNotes>,

    // Converted with `Duration::from_secs` in `main`.
    #[arg(
        long,
        default_value = "60",
        help = "For how many seconds to run the test"
    )]
    test_duration_secs: u64,

    // Converted with `Duration::from_secs` in `main`.
    #[arg(
        long,
        default_value = "0",
        help = "How many seconds to sleep between LN payments"
    )]
    ln_payment_sleep_secs: u64,

    #[arg(
        long,
        help = "How many notes to distribute to each user",
        default_value = "1"
    )]
    notes_per_user: u16,

    #[arg(
        long,
        help = "Note denomination to use for the test",
        default_value = "1024"
    )]
    note_denomination: Amount,

    #[arg(
        long,
        help = "Invoice amount when generating one",
        default_value = "1000"
    )]
    invoice_amount: Amount,

    // Required (no default); selects the payment loop each user task runs.
    #[arg(long)]
    strategy: LnCircularStrategy,
}
235
// Payment pattern used by `do_ln_circular_test_user_task`.
//
// NOTE: `//` comments only; with `ValueEnum`, `///` comments on variants would
// become user-visible help for the possible values.
#[derive(Debug, Clone, Copy, ValueEnum)]
enum LnCircularStrategy {
    // The user's client creates an invoice and pays it itself
    // (see `do_self_payment`).
    SelfPayment,
    // Pays an LDK-generated invoice through the gateway, then has LDK pay an
    // invoice created by the client (see `run_two_gateways_strategy`).
    TwoGateways,
    // The user's client and a second "partner" client pay each other's
    // invoices in turn (see `do_partner_ping_pong`).
    PartnerPingPong,
}
246
/// A single timed measurement, sent by the test tasks over the unbounded
/// metrics channel created in `main`.
#[derive(Debug, Clone)]
pub struct MetricEvent {
    /// Metric name, e.g. `"gateway_create_invoice"`.
    name: String,
    /// How long the measured operation took.
    duration: Duration,
}
252
/// Serializable per-metric statistics.
///
/// NOTE(review): the code that fills this in is not visible in this part of
/// the file; the field notes below are inferred from the field names — confirm
/// against the producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct EventMetricSummary {
    /// Metric name (presumably the `MetricEvent::name` being summarized).
    name: String,
    /// Presumably the number of users the run was configured with.
    users: u64,
    /// Presumably the number of samples aggregated.
    n: u64,
    /// Presumably the average duration, in milliseconds.
    avg_ms: u128,
    /// Presumably the median duration, in milliseconds.
    median_ms: u128,
    /// Presumably the longest duration, in milliseconds.
    max_ms: u128,
    /// Presumably the shortest duration, in milliseconds.
    min_ms: u128,
    /// Presumably when the summary was taken, in seconds — TODO confirm epoch.
    timestamp_seconds: u64,
}
264
/// Comparison between a current and a previous [`EventMetricSummary`].
///
/// The `*_gain` fields are ratios: the `Display` impl renders a value `>= 1.0`
/// as a positive percentage and anything below as a negative one.
/// NOTE(review): which summary is the numerator is decided where this struct
/// is built (not visible here) — confirm before interpreting the sign.
#[derive(Debug, Serialize, Deserialize)]
struct EventMetricComparison {
    /// Ratio for the average duration.
    avg_ms_gain: f64,
    /// Ratio for the median duration.
    median_ms_gain: f64,
    /// Ratio for the maximum duration.
    max_ms_gain: f64,
    /// Ratio for the minimum duration.
    min_ms_gain: f64,
    /// Summary of the current run.
    current: EventMetricSummary,
    /// Summary of the earlier run being compared against.
    previous: EventMetricSummary,
}
274
275impl std::fmt::Display for EventMetricComparison {
276 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277 fn to_percent(gain: f64) -> String {
278 if gain >= 1.0 {
279 format!("+{:.2}%", (gain - 1.0) * 100.0)
280 } else {
281 format!("-{:.2}%", (1.0 - gain) * 100.0)
282 }
283 }
284 f.write_str(&format!(
285 "avg: {}, median: {}, max: {}, min: {}",
286 to_percent(self.avg_ms_gain),
287 to_percent(self.median_ms_gain),
288 to_percent(self.max_ms_gain),
289 to_percent(self.min_ms_gain),
290 ))
291 }
292}
293
294#[tokio::main]
295async fn main() -> anyhow::Result<()> {
296 fedimint_logging::TracingSetup::default().init()?;
297 let opts = Opts::parse();
298 let (event_sender, event_receiver) = tokio::sync::mpsc::unbounded_channel();
299 let summary_handle = spawn("handle metrics summary", {
300 let opts = opts.clone();
301 async { handle_metrics_summary(opts, event_receiver).await }
302 });
303 let futures = match opts.command.clone() {
304 Command::TestConnect {
305 invite_code,
306 duration_secs,
307 timeout_secs,
308 limit_endpoints,
309 } => {
310 let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
311 test_connect_raw_client(
312 invite_code,
313 opts.users,
314 Duration::from_secs(duration_secs),
315 Duration::from_secs(timeout_secs),
316 limit_endpoints,
317 event_sender.clone(),
318 )
319 .await?
320 }
321 Command::TestDownload { invite_code } => {
322 let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
323 test_download_config(&invite_code, opts.users, &event_sender.clone())
324 }
325 Command::LoadTest(args) => {
326 let invite_code = invite_code_or_fallback(args.invite_code).await;
327
328 let gateway_id = if let Some(gateway_id) = args.gateway_id {
329 Some(gateway_id)
330 } else if let Some(generate_invoice_with) = args.generate_invoice_with {
331 Some(get_gateway_id(generate_invoice_with).await?)
332 } else {
333 None
334 };
335 let invoices = if let Some(invoices_file) = args.invoices_file {
336 let display_path = invoices_file.display();
337 let invoices_file = tokio::fs::File::open(&invoices_file)
338 .await
339 .with_context(|| format!("Failed to open {display_path}"))?;
340 let mut lines = tokio::io::BufReader::new(invoices_file).lines();
341 let mut invoices = vec![];
342 while let Some(line) = lines.next_line().await? {
343 let invoice = Bolt11Invoice::from_str(&line)?;
344 invoices.push(invoice);
345 }
346 invoices
347 } else {
348 vec![]
349 };
350 if args.generate_invoice_with.is_none() && invoices.is_empty() {
351 info!(
352 "No --generate-invoice-with given no invoices on --invoices-file, not LN/gateway tests will be run"
353 );
354 }
355 run_load_test(
356 opts.archive_dir,
357 opts.users,
358 invite_code,
359 args.initial_notes,
360 args.generate_invoice_with,
361 args.invoices_per_user,
362 Duration::from_secs(args.ln_payment_sleep_secs),
363 invoices,
364 gateway_id,
365 args.notes_per_user,
366 args.note_denomination,
367 args.invoice_amount,
368 event_sender.clone(),
369 )
370 .await?
371 }
372 Command::LnCircularLoadTest(args) => {
373 let invite_code = invite_code_or_fallback(args.invite_code).await;
374 run_ln_circular_load_test(
375 opts.archive_dir,
376 opts.users,
377 invite_code,
378 args.initial_notes,
379 Duration::from_secs(args.test_duration_secs),
380 Duration::from_secs(args.ln_payment_sleep_secs),
381 args.notes_per_user,
382 args.note_denomination,
383 args.invoice_amount,
384 args.strategy,
385 event_sender.clone(),
386 )
387 .await?
388 }
389 };
390
391 let result = futures::future::join_all(futures).await;
392 drop(event_sender);
393 summary_handle.await??;
394 let len_failures = result.iter().filter(|r| r.is_err()).count();
395 eprintln!("{} results, {len_failures} failures", result.len());
396 for r in result {
397 if let Err(e) = r {
398 warn!("Task failed: {:?}", e);
399 }
400 }
401 if len_failures > 0 {
402 bail!("Finished with failures");
403 }
404 info!("Finished successfully");
405 Ok(())
406}
407
408async fn invite_code_or_fallback(invite_code: Option<InviteCode>) -> Option<InviteCode> {
409 if let Some(invite_code) = invite_code {
410 Some(invite_code)
411 } else {
412 match get_invite_code_cli(0.into()).await {
414 Ok(invite_code) => Some(invite_code),
415 Err(e) => {
416 info!(
417 "No invite code provided and failed to get one with '{e}' error, will try to proceed without one..."
418 );
419 None
420 }
421 }
422 }
423}
424
425#[allow(clippy::too_many_arguments)]
426async fn run_load_test(
427 archive_dir: Option<PathBuf>,
428 users: u16,
429 invite_code: Option<InviteCode>,
430 initial_notes: Option<OOBNotes>,
431 generate_invoice_with: Option<LnInvoiceGeneration>,
432 generated_invoices_per_user: u16,
433 ln_payment_sleep: Duration,
434 invoices_from_file: Vec<Bolt11Invoice>,
435 gateway_id: Option<String>,
436 notes_per_user: u16,
437 note_denomination: Amount,
438 invoice_amount: Amount,
439 event_sender: mpsc::UnboundedSender<MetricEvent>,
440) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
441 let db_path = get_db_path(&archive_dir);
442 let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
443 let minimum_notes = notes_per_user * users;
444 let minimum_amount_required = note_denomination * u64::from(minimum_notes);
445
446 reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
447 get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
448 print_coordinator_notes(&coordinator).await?;
449 info!(
450 "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
451 );
452 remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
453 print_coordinator_notes(&coordinator).await?;
454
455 let users_clients = get_users_clients(users, db_path, invite_code).await?;
456
457 let mut users_notes =
458 get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
459 let mut users_invoices = HashMap::new();
460 let mut user = 0;
461 for invoice in invoices_from_file {
463 users_invoices
464 .entry(user)
465 .or_insert_with(Vec::new)
466 .push(invoice);
467 user = (user + 1) % users;
468 }
469
470 info!("Starting user tasks");
471 let futures = users_clients
472 .into_iter()
473 .enumerate()
474 .map(|(u, client)| {
475 let u = u as u16;
476 let oob_notes = users_notes.remove(&u).unwrap();
477 let invoices = users_invoices.remove(&u).unwrap_or_default();
478 let event_sender = event_sender.clone();
479 let f: BoxFuture<_> = Box::pin(do_load_test_user_task(
480 format!("User {u}:"),
481 client,
482 oob_notes,
483 generated_invoices_per_user,
484 ln_payment_sleep,
485 invoice_amount,
486 invoices,
487 generate_invoice_with,
488 event_sender,
489 gateway_id.clone(),
490 ));
491 f
492 })
493 .collect::<Vec<_>>();
494
495 Ok(futures)
496}
497
498async fn get_notes_for_users(
499 users: u16,
500 notes_per_user: u16,
501 coordinator: ClientHandleArc,
502 note_denomination: Amount,
503) -> anyhow::Result<HashMap<u16, Vec<OOBNotes>>> {
504 let mut users_notes = HashMap::new();
505 for u in 0..users {
506 users_notes.insert(u, Vec::with_capacity(notes_per_user.into()));
507 for _ in 0..notes_per_user {
508 let (_, oob_notes) = do_spend_notes(&coordinator, note_denomination).await?;
509 let user_amount = oob_notes.total_amount();
510 info!("Giving {user_amount} to user {u}");
511 users_notes.get_mut(&u).unwrap().push(oob_notes);
512 }
513 }
514 Ok(users_notes)
515}
516
517async fn get_users_clients(
518 n: u16,
519 db_path: Option<PathBuf>,
520 invite_code: Option<InviteCode>,
521) -> anyhow::Result<Vec<ClientHandleArc>> {
522 let mut users_clients = Vec::with_capacity(n.into());
523 for u in 0..n {
524 let (client, _) = get_user_client(u, &db_path, &invite_code).await?;
525 users_clients.push(client);
526 }
527 Ok(users_clients)
528}
529
530async fn get_user_client(
531 user_index: u16,
532 db_path: &Option<PathBuf>,
533 invite_code: &Option<InviteCode>,
534) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
535 let user_db = db_path
536 .as_ref()
537 .map(|db_path| db_path.join(format!("user_{user_index}.db")));
538 let user_invite_code = if user_db.as_ref().is_some_and(|db| db.exists()) {
539 None
540 } else {
541 invite_code.clone()
542 };
543 let (client, invite_code) = build_client(user_invite_code, user_db.as_ref()).await?;
544 if let Ok(ln_client) = client.get_first_module::<LightningClientModule>() {
546 let _ = ln_client.update_gateway_cache().await;
547 }
548 Ok((client, invite_code))
549}
550
551async fn print_coordinator_notes(coordinator: &ClientHandleArc) -> anyhow::Result<()> {
552 info!("Note summary:");
553 let summary = get_note_summary(coordinator).await?;
554 for (k, v) in summary.iter() {
555 info!("{k}: {v}");
556 }
557 Ok(())
558}
559
560async fn get_required_notes(
561 coordinator: &ClientHandleArc,
562 minimum_amount_required: Amount,
563 event_sender: &mpsc::UnboundedSender<MetricEvent>,
564) -> anyhow::Result<()> {
565 let current_balance = coordinator.get_balance().await;
566 if current_balance < minimum_amount_required {
567 let diff = minimum_amount_required.saturating_sub(current_balance);
568 info!(
569 "Current balance {current_balance} on coordinator not enough, trying to get {diff} more through fedimint-cli"
570 );
571 match try_get_notes_cli(&diff, 5).await {
572 Ok(notes) => {
573 info!("Got {} more notes, reissuing them", notes.total_amount());
574 reissue_notes(coordinator, notes, event_sender).await?;
575 }
576 Err(e) => {
577 info!("Unable to get more notes: '{e}', will try to proceed without them");
578 }
579 }
580 } else {
581 info!(
582 "Current balance of {current_balance} already covers the minimum required of {minimum_amount_required}"
583 );
584 }
585 Ok(())
586}
587
588async fn reissue_initial_notes(
589 initial_notes: Option<OOBNotes>,
590 coordinator: &ClientHandleArc,
591 event_sender: &mpsc::UnboundedSender<MetricEvent>,
592) -> anyhow::Result<()> {
593 if let Some(notes) = initial_notes {
594 let amount = notes.total_amount();
595 info!("Reissuing initial notes, got {amount}");
596 reissue_notes(coordinator, notes, event_sender).await?;
597 }
598 Ok(())
599}
600
601async fn get_coordinator_client(
602 db_path: &Option<PathBuf>,
603 invite_code: &Option<InviteCode>,
604) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
605 let (client, invite_code) = if let Some(db_path) = db_path {
606 let coordinator_db = db_path.join("coordinator.db");
607 if coordinator_db.exists() {
608 build_client(invite_code.clone(), Some(&coordinator_db)).await?
609 } else {
610 tokio::fs::create_dir_all(db_path).await?;
611 build_client(
612 Some(invite_code.clone().context(
613 "Running on this archive dir for the first time, an invite code is required",
614 )?),
615 Some(&coordinator_db),
616 )
617 .await?
618 }
619 } else {
620 build_client(
621 Some(
622 invite_code
623 .clone()
624 .context("No archive dir given, an invite code is strictly required")?,
625 ),
626 None,
627 )
628 .await?
629 };
630 Ok((client, invite_code))
631}
632
/// Returns the `db` subdirectory of the archive directory, or `None` when no
/// archive directory was given.
fn get_db_path(archive_dir: &Option<PathBuf>) -> Option<PathBuf> {
    let dir = archive_dir.as_ref()?;
    Some(dir.join("db"))
}
636
637async fn get_lightning_gateway(
638 client: &ClientHandleArc,
639 gateway_id: Option<String>,
640) -> Option<LightningGateway> {
641 let gateway_id = parse_gateway_id(gateway_id.or(None)?.as_str()).expect("Invalid gateway id");
642 let ln_module = client
643 .get_first_module::<LightningClientModule>()
644 .expect("Must have ln client module");
645 ln_module.select_gateway(&gateway_id).await
646}
647
648#[allow(clippy::too_many_arguments)]
649async fn do_load_test_user_task(
650 prefix: String,
651 client: ClientHandleArc,
652 oob_notes: Vec<OOBNotes>,
653 generated_invoices_per_user: u16,
654 ln_payment_sleep: Duration,
655 invoice_amount: Amount,
656 additional_invoices: Vec<Bolt11Invoice>,
657 generate_invoice_with: Option<LnInvoiceGeneration>,
658 event_sender: mpsc::UnboundedSender<MetricEvent>,
659 gateway_id: Option<String>,
660) -> anyhow::Result<()> {
661 let ln_gateway = get_lightning_gateway(&client, gateway_id).await;
662 for oob_note in oob_notes {
663 let amount = oob_note.total_amount();
664 reissue_notes(&client, oob_note, &event_sender)
665 .await
666 .map_err(|e| anyhow::anyhow!("while reissuing initial {amount}: {e}"))?;
667 }
668 let mut generated_invoices_per_user_iterator = (0..generated_invoices_per_user).peekable();
669 while let Some(_) = generated_invoices_per_user_iterator.next() {
670 let total_amount = get_note_summary(&client).await?.total_amount();
671 if invoice_amount > total_amount {
672 warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
673 } else {
674 match generate_invoice_with {
675 Some(LnInvoiceGeneration::LdkLightningCli) => {
676 let invoice = ldk_create_invoice(invoice_amount).await?;
677 gateway_pay_invoice(
678 &prefix,
679 "LND",
680 &client,
681 invoice.clone(),
682 &event_sender,
683 ln_gateway.clone(),
684 )
685 .await?;
686 ldk_wait_invoice_payment(&invoice).await?;
687 }
688 None if additional_invoices.is_empty() => {
689 debug!(
690 "No method given to generate an invoice and no invoices on file, will not test the gateway"
691 );
692 break;
693 }
694 None => {
695 break;
696 }
697 }
698 if generated_invoices_per_user_iterator.peek().is_some() {
699 fedimint_core::task::sleep(ln_payment_sleep).await;
701 }
702 }
703 }
704 let mut additional_invoices = additional_invoices.into_iter().peekable();
705 while let Some(invoice) = additional_invoices.next() {
706 let total_amount = get_note_summary(&client).await?.total_amount();
707 let invoice_amount =
708 Amount::from_msats(invoice.amount_milli_satoshis().unwrap_or_default());
709 if invoice_amount > total_amount {
710 warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
711 } else if invoice_amount == Amount::ZERO {
712 warn!("Can't pay invoice {invoice}, amount is zero");
713 } else {
714 gateway_pay_invoice(
715 &prefix,
716 "unknown",
717 &client,
718 invoice,
719 &event_sender,
720 ln_gateway.clone(),
721 )
722 .await?;
723 if additional_invoices.peek().is_some() {
724 fedimint_core::task::sleep(ln_payment_sleep).await;
726 }
727 }
728 }
729 Ok(())
730}
731
732#[allow(clippy::too_many_arguments)]
733async fn run_ln_circular_load_test(
734 archive_dir: Option<PathBuf>,
735 users: u16,
736 invite_code: Option<InviteCode>,
737 initial_notes: Option<OOBNotes>,
738 test_duration: Duration,
739 ln_payment_sleep: Duration,
740 notes_per_user: u16,
741 note_denomination: Amount,
742 invoice_amount: Amount,
743 strategy: LnCircularStrategy,
744 event_sender: mpsc::UnboundedSender<MetricEvent>,
745) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
746 let db_path = get_db_path(&archive_dir);
747 let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
748 let minimum_notes = notes_per_user * users;
749 let minimum_amount_required = note_denomination * u64::from(minimum_notes);
750
751 reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
752 get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
753
754 info!(
755 "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
756 );
757 remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
758
759 print_coordinator_notes(&coordinator).await?;
760
761 let users_clients = get_users_clients(users, db_path, invite_code.clone()).await?;
762
763 let mut users_notes =
764 get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
765
766 info!("Starting user tasks");
767 let futures = users_clients
768 .into_iter()
769 .enumerate()
770 .map(|(u, client)| {
771 let u = u as u16;
772 let oob_notes = users_notes.remove(&u).unwrap();
773 let event_sender = event_sender.clone();
774 let f: BoxFuture<_> = Box::pin(do_ln_circular_test_user_task(
775 format!("User {u}:"),
776 client,
777 invite_code.clone(),
778 oob_notes,
779 test_duration,
780 ln_payment_sleep,
781 invoice_amount,
782 strategy,
783 event_sender,
784 ));
785 f
786 })
787 .collect::<Vec<_>>();
788
789 Ok(futures)
790}
791
792#[allow(clippy::too_many_arguments)]
793async fn do_ln_circular_test_user_task(
794 prefix: String,
795 client: ClientHandleArc,
796 invite_code: Option<InviteCode>,
797 oob_notes: Vec<OOBNotes>,
798 test_duration: Duration,
799 ln_payment_sleep: Duration,
800 invoice_amount: Amount,
801 strategy: LnCircularStrategy,
802 event_sender: mpsc::UnboundedSender<MetricEvent>,
803) -> anyhow::Result<()> {
804 for oob_note in oob_notes {
805 let amount = oob_note.total_amount();
806 reissue_notes(&client, oob_note, &event_sender)
807 .await
808 .map_err(|e| anyhow::anyhow!("while reissuing initial {amount}: {e}"))?;
809 }
810 let initial_time = fedimint_core::time::now();
811 let still_ontime = || async {
812 fedimint_core::time::now()
813 .duration_since(initial_time)
814 .expect("time to work")
815 <= test_duration
816 };
817 let sleep_a_bit = || async {
818 if still_ontime().await {
819 fedimint_core::task::sleep(ln_payment_sleep).await;
820 }
821 };
822 match strategy {
823 LnCircularStrategy::TwoGateways => {
824 let invoice_generation = LnInvoiceGeneration::LdkLightningCli;
825 while still_ontime().await {
826 let gateway_id = get_gateway_id(invoice_generation).await?;
827 let ln_gateway = get_lightning_gateway(&client, Some(gateway_id)).await;
828 run_two_gateways_strategy(
829 &prefix,
830 &invoice_generation,
831 &invoice_amount,
832 &event_sender,
833 &client,
834 ln_gateway,
835 )
836 .await?;
837 sleep_a_bit().await;
838 }
839 }
840 LnCircularStrategy::SelfPayment => {
841 while still_ontime().await {
842 do_self_payment(&prefix, &client, invoice_amount, &event_sender).await?;
843 sleep_a_bit().await;
844 }
845 }
846 LnCircularStrategy::PartnerPingPong => {
847 let (partner, _) = build_client(invite_code, None).await?;
848 while still_ontime().await {
849 do_partner_ping_pong(&prefix, &client, &partner, invoice_amount, &event_sender)
850 .await?;
851 sleep_a_bit().await;
852 }
853 }
854 }
855 Ok(())
856}
857
/// Metric name recorded whenever an invoice is created, both by
/// `run_two_gateways_strategy` (LDK) and by `client_create_invoice`.
const GATEWAY_CREATE_INVOICE: &str = "gateway_create_invoice";
859
860async fn run_two_gateways_strategy(
861 prefix: &str,
862 invoice_generation: &LnInvoiceGeneration,
863 invoice_amount: &Amount,
864 event_sender: &mpsc::UnboundedSender<MetricEvent>,
865 client: &ClientHandleArc,
866 ln_gateway: Option<LightningGateway>,
867) -> Result<(), anyhow::Error> {
868 let create_invoice_time = fedimint_core::time::now();
869 match *invoice_generation {
870 LnInvoiceGeneration::LdkLightningCli => {
871 let invoice = ldk_create_invoice(*invoice_amount).await?;
872 let elapsed = create_invoice_time.elapsed()?;
873 info!("Created invoice using LDK in {elapsed:?}");
874 event_sender.send(MetricEvent {
875 name: GATEWAY_CREATE_INVOICE.into(),
876 duration: elapsed,
877 })?;
878 gateway_pay_invoice(
879 prefix,
880 "LND",
881 client,
882 invoice.clone(),
883 event_sender,
884 ln_gateway.clone(),
885 )
886 .await?;
887 ldk_wait_invoice_payment(&invoice).await?;
888 let (operation_id, invoice) =
889 client_create_invoice(client, *invoice_amount, event_sender, ln_gateway).await?;
890 let pay_invoice_time = fedimint_core::time::now();
891 ldk_pay_invoice(invoice).await?;
892 wait_invoice_payment(
893 prefix,
894 "LND",
895 client,
896 operation_id,
897 event_sender,
898 pay_invoice_time,
899 )
900 .await?;
901 }
902 }
903 Ok(())
904}
905
906async fn do_self_payment(
907 prefix: &str,
908 client: &ClientHandleArc,
909 invoice_amount: Amount,
910 event_sender: &mpsc::UnboundedSender<MetricEvent>,
911) -> anyhow::Result<()> {
912 let (operation_id, invoice) =
913 client_create_invoice(client, invoice_amount, event_sender, None).await?;
914 let pay_invoice_time = fedimint_core::time::now();
915 let lightning_module = client.get_first_module::<LightningClientModule>()?;
916 lightning_module
918 .pay_bolt11_invoice(None, invoice, ())
919 .await?;
920 wait_invoice_payment(
921 prefix,
922 "gateway",
923 client,
924 operation_id,
925 event_sender,
926 pay_invoice_time,
927 )
928 .await?;
929 Ok(())
930}
931
932async fn do_partner_ping_pong(
933 prefix: &str,
934 client: &ClientHandleArc,
935 partner: &ClientHandleArc,
936 invoice_amount: Amount,
937 event_sender: &mpsc::UnboundedSender<MetricEvent>,
938) -> anyhow::Result<()> {
939 let (operation_id, invoice) =
941 client_create_invoice(partner, invoice_amount, event_sender, None).await?;
942 let pay_invoice_time = fedimint_core::time::now();
943 let lightning_module = client.get_first_module::<LightningClientModule>()?;
944 lightning_module
947 .pay_bolt11_invoice(None, invoice, ())
948 .await?;
949 wait_invoice_payment(
950 prefix,
951 "gateway",
952 partner,
953 operation_id,
954 event_sender,
955 pay_invoice_time,
956 )
957 .await?;
958 let (operation_id, invoice) =
960 client_create_invoice(client, invoice_amount, event_sender, None).await?;
961 let pay_invoice_time = fedimint_core::time::now();
962 let partner_lightning_module = partner.get_first_module::<LightningClientModule>()?;
963 partner_lightning_module
966 .pay_bolt11_invoice(None, invoice, ())
967 .await?;
968 wait_invoice_payment(
969 prefix,
970 "gateway",
971 client,
972 operation_id,
973 event_sender,
974 pay_invoice_time,
975 )
976 .await?;
977 Ok(())
978}
979
980async fn wait_invoice_payment(
981 prefix: &str,
982 gateway_name: &str,
983 client: &ClientHandleArc,
984 operation_id: fedimint_core::core::OperationId,
985 event_sender: &mpsc::UnboundedSender<MetricEvent>,
986 pay_invoice_time: std::time::SystemTime,
987) -> anyhow::Result<()> {
988 let elapsed = pay_invoice_time.elapsed()?;
989 info!("{prefix} Invoice payment receive started using {gateway_name} in {elapsed:?}");
990 event_sender.send(MetricEvent {
991 name: format!("gateway_{gateway_name}_payment_received_started"),
992 duration: elapsed,
993 })?;
994 let lightning_module = client.get_first_module::<LightningClientModule>()?;
995 let mut updates = lightning_module
996 .subscribe_ln_receive(operation_id)
997 .await?
998 .into_stream();
999 while let Some(update) = updates.next().await {
1000 debug!(%prefix, ?update, "Invoice payment update");
1001 match update {
1002 LnReceiveState::Claimed => {
1003 let elapsed: Duration = pay_invoice_time.elapsed()?;
1004 info!("{prefix} Invoice payment received on {gateway_name} in {elapsed:?}");
1005 event_sender.send(MetricEvent {
1006 name: "gateway_payment_received_success".into(),
1007 duration: elapsed,
1008 })?;
1009 event_sender.send(MetricEvent {
1010 name: format!("gateway_{gateway_name}_payment_received_success"),
1011 duration: elapsed,
1012 })?;
1013 break;
1014 }
1015 LnReceiveState::Canceled { reason } => {
1016 let elapsed: Duration = pay_invoice_time.elapsed()?;
1017 info!(
1018 "{prefix} Invoice payment receive was canceled on {gateway_name}: {reason} in {elapsed:?}"
1019 );
1020 event_sender.send(MetricEvent {
1021 name: "gateway_payment_received_canceled".into(),
1022 duration: elapsed,
1023 })?;
1024 break;
1025 }
1026 _ => {}
1027 }
1028 }
1029 Ok(())
1030}
1031
1032async fn client_create_invoice(
1033 client: &ClientHandleArc,
1034 invoice_amount: Amount,
1035 event_sender: &mpsc::UnboundedSender<MetricEvent>,
1036 ln_gateway: Option<LightningGateway>,
1037) -> anyhow::Result<(fedimint_core::core::OperationId, Bolt11Invoice)> {
1038 let create_invoice_time = fedimint_core::time::now();
1039 let lightning_module = client.get_first_module::<LightningClientModule>()?;
1040 let desc = Description::new("test".to_string())?;
1041 let (operation_id, invoice, _) = lightning_module
1042 .create_bolt11_invoice(
1043 invoice_amount,
1044 Bolt11InvoiceDescription::Direct(desc),
1045 None,
1046 (),
1047 ln_gateway,
1048 )
1049 .await?;
1050 let elapsed = create_invoice_time.elapsed()?;
1051 info!("Created invoice using gateway in {elapsed:?}");
1052 event_sender.send(MetricEvent {
1053 name: GATEWAY_CREATE_INVOICE.into(),
1054 duration: elapsed,
1055 })?;
1056 Ok((operation_id, invoice))
1057}
1058
1059fn test_download_config(
1060 invite_code: &InviteCode,
1061 users: u16,
1062 event_sender: &mpsc::UnboundedSender<MetricEvent>,
1063) -> Vec<BoxFuture<'static, anyhow::Result<()>>> {
1064 (0..users)
1065 .map(|_| {
1066 let invite_code = invite_code.clone();
1067 let event_sender = event_sender.clone();
1068 let f: BoxFuture<_> = Box::pin(async move {
1069 let m = fedimint_core::time::now();
1070 let _ = fedimint_api_client::api::net::Connector::default()
1071 .download_from_invite_code(&invite_code)
1072 .await?;
1073 event_sender.send(MetricEvent {
1074 name: "download_client_config".into(),
1075 duration: m.elapsed()?,
1076 })?;
1077 Ok(())
1078 });
1079 f
1080 })
1081 .collect()
1082}
1083
1084async fn test_connect_raw_client(
1085 invite_code: InviteCode,
1086 users: u16,
1087 duration: Duration,
1088 timeout: Duration,
1089 limit_endpoints: Option<usize>,
1090 event_sender: mpsc::UnboundedSender<MetricEvent>,
1091) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
1092 use jsonrpsee_core::client::ClientT;
1093 use jsonrpsee_ws_client::WsClientBuilder;
1094
1095 let mut cfg = fedimint_api_client::api::net::Connector::default()
1096 .download_from_invite_code(&invite_code)
1097 .await?;
1098
1099 if let Some(limit_endpoints) = limit_endpoints {
1100 cfg.global.api_endpoints = cfg
1101 .global
1102 .api_endpoints
1103 .into_iter()
1104 .take(limit_endpoints)
1105 .collect();
1106 info!("Limiting endpoints to {:?}", cfg.global.api_endpoints);
1107 }
1108
1109 info!("Connecting to {users} clients");
1110 let clients = (0..users)
1111 .flat_map(|_| {
1112 cfg.global.api_endpoints.values().map(|url| async {
1113 let ws_client = WsClientBuilder::default()
1114 .request_timeout(timeout)
1115 .connection_timeout(timeout)
1116 .build(url_to_string_with_default_port(&url.url))
1117 .await?;
1118 Ok::<_, anyhow::Error>(ws_client)
1119 })
1120 })
1121 .collect::<Vec<_>>();
1122 let clients = futures::future::try_join_all(clients).await?;
1123 info!("Keeping {users} clients connected for {duration:?}");
1124 Ok(clients
1125 .into_iter()
1126 .map(|client| {
1127 let event_sender = event_sender.clone();
1128 let f: BoxFuture<_> = Box::pin(async move {
1129 let initial_time = fedimint_core::time::now();
1130 while initial_time.elapsed()? < duration {
1131 let m = fedimint_core::time::now();
1132 let _epoch: u64 = client
1133 .request::<_, _>(SESSION_COUNT_ENDPOINT, vec![ApiRequestErased::default()])
1134 .await?;
1135 event_sender.send(MetricEvent {
1136 name: SESSION_COUNT_ENDPOINT.into(),
1137 duration: m.elapsed()?,
1138 })?;
1139 fedimint_core::task::sleep(Duration::from_secs(1)).await;
1140 }
1141 Ok(())
1142 });
1143 f
1144 })
1145 .collect())
1146}
1147
1148fn url_to_string_with_default_port(url: &SafeUrl) -> String {
1149 format!(
1150 "{}://{}:{}{}",
1151 url.scheme(),
1152 url.host().expect("Asserted on construction"),
1153 url.port_or_known_default()
1154 .expect("Asserted on construction"),
1155 url.path()
1156 )
1157}
1158
1159async fn handle_metrics_summary(
1160 opts: Opts,
1161 mut event_receiver: mpsc::UnboundedReceiver<MetricEvent>,
1162) -> anyhow::Result<()> {
1163 let timestamp_seconds = fedimint_core::time::duration_since_epoch().as_secs();
1164 let mut metrics_json_output_files = vec![];
1165 let mut previous_metrics = vec![];
1166 let mut comparison_output = None;
1167 if let Some(archive_dir) = opts.archive_dir {
1168 let mut archive_metrics = archive_dir.join("metrics");
1169 archive_metrics.push(opts.users.to_string());
1170 tokio::fs::create_dir_all(&archive_metrics).await?;
1171 let mut archive_comparisons = archive_dir.join("comparisons");
1172 archive_comparisons.push(opts.users.to_string());
1173 tokio::fs::create_dir_all(&archive_comparisons).await?;
1174
1175 let latest_metrics_file = std::fs::read_dir(&archive_metrics)?
1176 .map(|entry| {
1177 let entry = entry.unwrap();
1178 let metadata = entry.metadata().unwrap();
1179 let created = metadata
1180 .created()
1181 .unwrap_or_else(|_| metadata.modified().unwrap());
1182 (entry, created)
1183 })
1184 .max_by_key(|(_entry, created)| created.to_owned())
1185 .map(|(entry, _)| entry.path());
1186 if let Some(latest_metrics_file) = latest_metrics_file {
1187 let display_path = latest_metrics_file.display();
1188 let latest_metrics_file = tokio::fs::File::open(&latest_metrics_file)
1189 .await
1190 .with_context(|| format!("Failed to open {display_path}"))?;
1191 let mut lines = tokio::io::BufReader::new(latest_metrics_file).lines();
1192 while let Some(line) = lines.next_line().await? {
1193 match serde_json::from_str::<EventMetricSummary>(&line) {
1194 Ok(metric) => {
1195 previous_metrics.push(metric);
1196 }
1197 Err(e) => {
1198 warn!("Failed to parse previous metric: {e:?}");
1199 }
1200 }
1201 }
1202 }
1203 let new_metric_output = archive_metrics.join(format!("{timestamp_seconds}.json",));
1204 let new_metric_output = BufWriter::new(
1205 OpenOptions::new()
1206 .write(true)
1207 .create(true)
1208 .truncate(true)
1209 .open(new_metric_output)
1210 .await?,
1211 );
1212 metrics_json_output_files.push(new_metric_output);
1213 if !previous_metrics.is_empty() {
1214 let new_comparison_output =
1215 archive_comparisons.join(format!("{timestamp_seconds}.json",));
1216 comparison_output = Some(BufWriter::new(
1217 OpenOptions::new()
1218 .write(true)
1219 .create(true)
1220 .truncate(true)
1221 .open(new_comparison_output)
1222 .await?,
1223 ));
1224 }
1225 }
1226 if let Some(metrics_json_output) = opts.metrics_json_output {
1227 metrics_json_output_files.push(BufWriter::new(
1228 tokio::fs::OpenOptions::new()
1229 .write(true)
1230 .create(true)
1231 .truncate(true)
1232 .open(metrics_json_output)
1233 .await?,
1234 ));
1235 }
1236 let mut results = BTreeMap::new();
1237 while let Some(event) = event_receiver.recv().await {
1238 let entry = results.entry(event.name).or_insert_with(Vec::new);
1239 entry.push(event.duration);
1240 }
1241 let mut previous_metrics = previous_metrics
1242 .into_iter()
1243 .map(|metric| (metric.name.clone(), metric))
1244 .collect::<HashMap<_, _>>();
1245 for (k, mut v) in results {
1246 v.sort();
1247 let n = v.len();
1248 let max = v.iter().last().unwrap();
1249 let min = v.first().unwrap();
1250 let median = v[n / 2];
1251 let sum: Duration = v.iter().sum();
1252 let avg = sum / n as u32;
1253 let metric_summary = EventMetricSummary {
1254 name: k.clone(),
1255 users: u64::from(opts.users),
1256 n: n as u64,
1257 avg_ms: avg.as_millis(),
1258 median_ms: median.as_millis(),
1259 max_ms: max.as_millis(),
1260 min_ms: min.as_millis(),
1261 timestamp_seconds,
1262 };
1263 let comparison = if let Some(previous_metric) = previous_metrics.remove(&k) {
1264 if previous_metric.n == metric_summary.n {
1265 fn calculate_gain(current: u128, previous: u128) -> f64 {
1266 current as f64 / previous as f64
1267 }
1268 let comparison = EventMetricComparison {
1269 avg_ms_gain: calculate_gain(metric_summary.avg_ms, previous_metric.avg_ms),
1270 median_ms_gain: calculate_gain(
1271 metric_summary.median_ms,
1272 previous_metric.median_ms,
1273 ),
1274 max_ms_gain: calculate_gain(metric_summary.max_ms, previous_metric.max_ms),
1275 min_ms_gain: calculate_gain(metric_summary.min_ms, previous_metric.min_ms),
1276 current: metric_summary.clone(),
1277 previous: previous_metric,
1278 };
1279 if let Some(comparison_output) = &mut comparison_output {
1280 let comparison_json =
1281 serde_json::to_string(&comparison).expect("to be serializable");
1282 comparison_output
1283 .write_all(format!("{comparison_json}\n").as_bytes())
1284 .await
1285 .expect("to write on file");
1286 }
1287 Some(comparison)
1288 } else {
1289 info!(
1290 "Skipping comparison for {k} because previous metric has different n ({} vs {})",
1291 previous_metric.n, metric_summary.n
1292 );
1293 None
1294 }
1295 } else {
1296 None
1297 };
1298 if let Some(comparison) = comparison {
1299 println!(
1300 "{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?} (compared to previous: {comparison})"
1301 );
1302 } else {
1303 println!("{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?}");
1304 }
1305 let metric_summary_json =
1306 serde_json::to_string(&metric_summary).expect("to be serializable");
1307 for metrics_json_output_file in &mut metrics_json_output_files {
1308 metrics_json_output_file
1309 .write_all(format!("{metric_summary_json}\n").as_bytes())
1310 .await
1311 .expect("to write on file");
1312 }
1313 }
1314 for mut output in metrics_json_output_files {
1315 output.flush().await?;
1316 }
1317 if let Some(mut output) = comparison_output {
1318 output.flush().await?;
1319 }
1320 Ok(())
1321}
1322
1323async fn get_gateway_id(generate_invoice_with: LnInvoiceGeneration) -> anyhow::Result<String> {
1324 let gateway_json = match generate_invoice_with {
1325 LnInvoiceGeneration::LdkLightningCli => {
1326 cmd!(GatewayLndCli, "info").out_json().await
1328 }
1329 }?;
1330 let gateway_id = gateway_json["gateway_id"]
1331 .as_str()
1332 .context("Missing gateway_id field")?;
1333
1334 Ok(gateway_id.into())
1335}