1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_precision_loss)]
4#![allow(clippy::missing_errors_doc)]
5#![allow(clippy::ref_option)]
6#![allow(clippy::too_many_lines)]
7#![allow(clippy::large_futures)]
8
9use std::collections::{BTreeMap, HashMap};
10use std::path::PathBuf;
11use std::str::FromStr;
12use std::time::Duration;
13use std::vec;
14
15use anyhow::{Context, bail};
16use clap::{Args, Parser, Subcommand, ValueEnum};
17use common::{
18 gateway_pay_invoice, get_note_summary, ldk_create_invoice, ldk_pay_invoice,
19 ldk_wait_invoice_payment, parse_gateway_id, reissue_notes,
20};
21use devimint::cmd;
22use devimint::util::GatewayLndCli;
23use fedimint_client::ClientHandleArc;
24use fedimint_core::Amount;
25use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
26use fedimint_core::invite_code::InviteCode;
27use fedimint_core::module::ApiRequestErased;
28use fedimint_core::runtime::spawn;
29use fedimint_core::util::{BoxFuture, SafeUrl};
30use fedimint_ln_client::{LightningClientModule, LnReceiveState};
31use fedimint_ln_common::LightningGateway;
32use fedimint_mint_client::OOBNotes;
33use futures::StreamExt;
34use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description};
35use serde::{Deserialize, Serialize};
36use tokio::fs::OpenOptions;
37use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
38use tokio::sync::mpsc;
39use tracing::{debug, info, warn};
40
41use crate::common::{
42 build_client, do_spend_notes, get_invite_code_cli, remint_denomination, try_get_notes_cli,
43};
44pub mod common;
45
46#[derive(Parser, Clone)]
47#[command(version)]
48struct Opts {
49 #[arg(
50 long,
51 default_value = "10",
52 help = "Number of users. Each user will work in parallel"
53 )]
54 users: u16,
55
56 #[arg(long, help = "Output with the metrics results in JSON format")]
57 metrics_json_output: Option<PathBuf>,
58
59 #[arg(
60 long,
61 help = "If given, will be used to store and retrieve past metrics for comparison purposes"
62 )]
63 archive_dir: Option<PathBuf>,
64
65 #[clap(subcommand)]
66 command: Command,
67}
68
69#[derive(Debug, Clone, Copy, ValueEnum)]
70enum LnInvoiceGeneration {
71 LdkLightningCli,
72}
73
74#[derive(Subcommand, Clone)]
75enum Command {
76 #[command(about = "Keep many websocket connections to a federation for a duration of time")]
77 TestConnect {
78 #[arg(long, help = "Federation invite code")]
79 invite_code: String,
80 #[arg(
81 long,
82 default_value = "60",
83 help = "How much time to keep the connections open, in seconds"
84 )]
85 duration_secs: u64,
86 #[arg(
87 long,
88 default_value = "120",
89 help = "Timeout for connection attempt and for each request, in secnods"
90 )]
91 timeout_secs: u64,
92 #[arg(
93 long,
94 help = "If given, will limit the number of endpoints (guardians) to connect to"
95 )]
96 limit_endpoints: Option<usize>,
97 },
98 #[command(about = "Try to download the client config many times.")]
99 TestDownload {
100 #[arg(long, help = "Federation invite code")]
101 invite_code: String,
102 },
103 #[command(
104 about = "Run a load test where many users in parallel will try to reissue notes and pay invoices through the gateway"
105 )]
106 LoadTest(LoadTestArgs),
107 #[command()]
112 LnCircularLoadTest(LnCircularLoadTestArgs),
113}
114
115#[derive(Args, Clone)]
116struct LoadTestArgs {
117 #[arg(
118 long,
119 help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
120 )]
121 invite_code: Option<InviteCode>,
122
123 #[arg(
124 long,
125 help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
126 )]
127 initial_notes: Option<OOBNotes>,
128
129 #[arg(
130 long,
131 help = "Gateway Id. If none, retrieve one according to --generate-invoice-with"
132 )]
133 gateway_id: Option<String>,
134
135 #[arg(
136 long,
137 help = "The method used to generate invoices to be paid through the gateway. If none and no --invoices-file provided, no gateway/LN tests will be run. Note that you can't generate an invoice using the same lightning node used by the gateway (i.e self payment is forbidden)"
138 )]
139 generate_invoice_with: Option<LnInvoiceGeneration>,
140
141 #[arg(
142 long,
143 default_value = "1",
144 help = "How many invoices will be created for each user. Only applicable if --generate-invoice-with is provided"
145 )]
146 invoices_per_user: u16,
147
148 #[arg(
149 long,
150 default_value = "0",
151 help = "How many seconds to sleep between LN payments"
152 )]
153 ln_payment_sleep_secs: u64,
154
155 #[arg(
156 long,
157 help = "A text file with one invoice per line. If --generate-invoice-with is provided, these will be additional invoices to be paid"
158 )]
159 invoices_file: Option<PathBuf>,
160
161 #[arg(
162 long,
163 help = "How many notes to distribute to each user",
164 default_value = "2"
165 )]
166 notes_per_user: u16,
167
168 #[arg(
169 long,
170 help = "Note denomination to use for the test",
171 default_value = "1024"
172 )]
173 note_denomination: Amount,
174
175 #[arg(
176 long,
177 help = "Invoice amount when generating one",
178 default_value = "1000"
179 )]
180 invoice_amount: Amount,
181}
182
183#[derive(Args, Clone)]
184struct LnCircularLoadTestArgs {
185 #[arg(
186 long,
187 help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
188 )]
189 invite_code: Option<InviteCode>,
190
191 #[arg(
192 long,
193 help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
194 )]
195 initial_notes: Option<OOBNotes>,
196
197 #[arg(
198 long,
199 default_value = "60",
200 help = "For how many seconds to run the test"
201 )]
202 test_duration_secs: u64,
203
204 #[arg(
205 long,
206 default_value = "0",
207 help = "How many seconds to sleep between LN payments"
208 )]
209 ln_payment_sleep_secs: u64,
210
211 #[arg(
212 long,
213 help = "How many notes to distribute to each user",
214 default_value = "1"
215 )]
216 notes_per_user: u16,
217
218 #[arg(
219 long,
220 help = "Note denomination to use for the test",
221 default_value = "1024"
222 )]
223 note_denomination: Amount,
224
225 #[arg(
226 long,
227 help = "Invoice amount when generating one",
228 default_value = "1000"
229 )]
230 invoice_amount: Amount,
231
232 #[arg(long)]
233 strategy: LnCircularStrategy,
234}
235
236#[derive(Debug, Clone, Copy, ValueEnum)]
237enum LnCircularStrategy {
238 SelfPayment,
240 TwoGateways,
243 PartnerPingPong,
245}
246
247#[derive(Debug, Clone)]
248pub struct MetricEvent {
249 name: String,
250 duration: Duration,
251}
252
253#[derive(Debug, Clone, Serialize, Deserialize)]
254struct EventMetricSummary {
255 name: String,
256 users: u64,
257 n: u64,
258 avg_ms: u128,
259 median_ms: u128,
260 max_ms: u128,
261 min_ms: u128,
262 timestamp_seconds: u64,
263}
264
265#[derive(Debug, Serialize, Deserialize)]
266struct EventMetricComparison {
267 avg_ms_gain: f64,
268 median_ms_gain: f64,
269 max_ms_gain: f64,
270 min_ms_gain: f64,
271 current: EventMetricSummary,
272 previous: EventMetricSummary,
273}
274
275impl std::fmt::Display for EventMetricComparison {
276 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277 fn to_percent(gain: f64) -> String {
278 if gain >= 1.0 {
279 format!("+{:.2}%", (gain - 1.0) * 100.0)
280 } else {
281 format!("-{:.2}%", (1.0 - gain) * 100.0)
282 }
283 }
284 f.write_str(&format!(
285 "avg: {}, median: {}, max: {}, min: {}",
286 to_percent(self.avg_ms_gain),
287 to_percent(self.median_ms_gain),
288 to_percent(self.max_ms_gain),
289 to_percent(self.min_ms_gain),
290 ))
291 }
292}
293
294#[tokio::main]
295async fn main() -> anyhow::Result<()> {
296 fedimint_logging::TracingSetup::default().init()?;
297 let opts = Opts::parse();
298 let (event_sender, event_receiver) = tokio::sync::mpsc::unbounded_channel();
299 let summary_handle = spawn("handle metrics summary", {
300 let opts = opts.clone();
301 async { handle_metrics_summary(opts, event_receiver).await }
302 });
303 let futures = match opts.command.clone() {
304 Command::TestConnect {
305 invite_code,
306 duration_secs,
307 timeout_secs,
308 limit_endpoints,
309 } => {
310 let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
311 test_connect_raw_client(
312 invite_code,
313 opts.users,
314 Duration::from_secs(duration_secs),
315 Duration::from_secs(timeout_secs),
316 limit_endpoints,
317 event_sender.clone(),
318 )
319 .await?
320 }
321 Command::TestDownload { invite_code } => {
322 let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
323 test_download_config(&invite_code, opts.users, &event_sender.clone())
324 }
325 Command::LoadTest(args) => {
326 let invite_code = invite_code_or_fallback(args.invite_code).await;
327
328 let gateway_id = if let Some(gateway_id) = args.gateway_id {
329 Some(gateway_id)
330 } else if let Some(generate_invoice_with) = args.generate_invoice_with {
331 Some(get_gateway_id(generate_invoice_with).await?)
332 } else {
333 None
334 };
335 let invoices = if let Some(invoices_file) = args.invoices_file {
336 let invoices_file = tokio::fs::File::open(&invoices_file)
337 .await
338 .with_context(|| format!("Failed to open {invoices_file:?}"))?;
339 let mut lines = tokio::io::BufReader::new(invoices_file).lines();
340 let mut invoices = vec![];
341 while let Some(line) = lines.next_line().await? {
342 let invoice = Bolt11Invoice::from_str(&line)?;
343 invoices.push(invoice);
344 }
345 invoices
346 } else {
347 vec![]
348 };
349 if args.generate_invoice_with.is_none() && invoices.is_empty() {
350 info!(
351 "No --generate-invoice-with given no invoices on --invoices-file, not LN/gateway tests will be run"
352 );
353 }
354 run_load_test(
355 opts.archive_dir,
356 opts.users,
357 invite_code,
358 args.initial_notes,
359 args.generate_invoice_with,
360 args.invoices_per_user,
361 Duration::from_secs(args.ln_payment_sleep_secs),
362 invoices,
363 gateway_id,
364 args.notes_per_user,
365 args.note_denomination,
366 args.invoice_amount,
367 event_sender.clone(),
368 )
369 .await?
370 }
371 Command::LnCircularLoadTest(args) => {
372 let invite_code = invite_code_or_fallback(args.invite_code).await;
373 run_ln_circular_load_test(
374 opts.archive_dir,
375 opts.users,
376 invite_code,
377 args.initial_notes,
378 Duration::from_secs(args.test_duration_secs),
379 Duration::from_secs(args.ln_payment_sleep_secs),
380 args.notes_per_user,
381 args.note_denomination,
382 args.invoice_amount,
383 args.strategy,
384 event_sender.clone(),
385 )
386 .await?
387 }
388 };
389
390 let result = futures::future::join_all(futures).await;
391 drop(event_sender);
392 summary_handle.await??;
393 let len_failures = result.iter().filter(|r| r.is_err()).count();
394 eprintln!("{} results, {len_failures} failures", result.len());
395 for r in result {
396 if let Err(e) = r {
397 warn!("Task failed: {:?}", e);
398 }
399 }
400 if len_failures > 0 {
401 bail!("Finished with failures");
402 }
403 info!("Finished successfully");
404 Ok(())
405}
406
407async fn invite_code_or_fallback(invite_code: Option<InviteCode>) -> Option<InviteCode> {
408 if let Some(invite_code) = invite_code {
409 Some(invite_code)
410 } else {
411 match get_invite_code_cli(0.into()).await {
413 Ok(invite_code) => Some(invite_code),
414 Err(e) => {
415 info!(
416 "No invite code provided and failed to get one with '{e}' error, will try to proceed without one..."
417 );
418 None
419 }
420 }
421 }
422}
423
424#[allow(clippy::too_many_arguments)]
425async fn run_load_test(
426 archive_dir: Option<PathBuf>,
427 users: u16,
428 invite_code: Option<InviteCode>,
429 initial_notes: Option<OOBNotes>,
430 generate_invoice_with: Option<LnInvoiceGeneration>,
431 generated_invoices_per_user: u16,
432 ln_payment_sleep: Duration,
433 invoices_from_file: Vec<Bolt11Invoice>,
434 gateway_id: Option<String>,
435 notes_per_user: u16,
436 note_denomination: Amount,
437 invoice_amount: Amount,
438 event_sender: mpsc::UnboundedSender<MetricEvent>,
439) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
440 let db_path = get_db_path(&archive_dir);
441 let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
442 let minimum_notes = notes_per_user * users;
443 let minimum_amount_required = note_denomination * u64::from(minimum_notes);
444
445 reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
446 get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
447 print_coordinator_notes(&coordinator).await?;
448 info!(
449 "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
450 );
451 remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
452 print_coordinator_notes(&coordinator).await?;
453
454 let users_clients = get_users_clients(users, db_path, invite_code).await?;
455
456 let mut users_notes =
457 get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
458 let mut users_invoices = HashMap::new();
459 let mut user = 0;
460 for invoice in invoices_from_file {
462 users_invoices
463 .entry(user)
464 .or_insert_with(Vec::new)
465 .push(invoice);
466 user = (user + 1) % users;
467 }
468
469 info!("Starting user tasks");
470 let futures = users_clients
471 .into_iter()
472 .enumerate()
473 .map(|(u, client)| {
474 let u = u as u16;
475 let oob_notes = users_notes.remove(&u).unwrap();
476 let invoices = users_invoices.remove(&u).unwrap_or_default();
477 let event_sender = event_sender.clone();
478 let f: BoxFuture<_> = Box::pin(do_load_test_user_task(
479 format!("User {u}:"),
480 client,
481 oob_notes,
482 generated_invoices_per_user,
483 ln_payment_sleep,
484 invoice_amount,
485 invoices,
486 generate_invoice_with,
487 event_sender,
488 gateway_id.clone(),
489 ));
490 f
491 })
492 .collect::<Vec<_>>();
493
494 Ok(futures)
495}
496
497async fn get_notes_for_users(
498 users: u16,
499 notes_per_user: u16,
500 coordinator: ClientHandleArc,
501 note_denomination: Amount,
502) -> anyhow::Result<HashMap<u16, Vec<OOBNotes>>> {
503 let mut users_notes = HashMap::new();
504 for u in 0..users {
505 users_notes.insert(u, Vec::with_capacity(notes_per_user.into()));
506 for _ in 0..notes_per_user {
507 let (_, oob_notes) = do_spend_notes(&coordinator, note_denomination).await?;
508 let user_amount = oob_notes.total_amount();
509 info!("Giving {user_amount} to user {u}");
510 users_notes.get_mut(&u).unwrap().push(oob_notes);
511 }
512 }
513 Ok(users_notes)
514}
515
516async fn get_users_clients(
517 n: u16,
518 db_path: Option<PathBuf>,
519 invite_code: Option<InviteCode>,
520) -> anyhow::Result<Vec<ClientHandleArc>> {
521 let mut users_clients = Vec::with_capacity(n.into());
522 for u in 0..n {
523 let (client, _) = get_user_client(u, &db_path, &invite_code).await?;
524 users_clients.push(client);
525 }
526 Ok(users_clients)
527}
528
529async fn get_user_client(
530 user_index: u16,
531 db_path: &Option<PathBuf>,
532 invite_code: &Option<InviteCode>,
533) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
534 let user_db = db_path
535 .as_ref()
536 .map(|db_path| db_path.join(format!("user_{user_index}.db")));
537 let user_invite_code = if user_db.as_ref().is_some_and(|db| db.exists()) {
538 None
539 } else {
540 invite_code.clone()
541 };
542 let (client, invite_code) = build_client(user_invite_code, user_db.as_ref()).await?;
543 if let Ok(ln_client) = client.get_first_module::<LightningClientModule>() {
545 let _ = ln_client.update_gateway_cache().await;
546 }
547 Ok((client, invite_code))
548}
549
550async fn print_coordinator_notes(coordinator: &ClientHandleArc) -> anyhow::Result<()> {
551 info!("Note summary:");
552 let summary = get_note_summary(coordinator).await?;
553 for (k, v) in summary.iter() {
554 info!("{k}: {v}");
555 }
556 Ok(())
557}
558
559async fn get_required_notes(
560 coordinator: &ClientHandleArc,
561 minimum_amount_required: Amount,
562 event_sender: &mpsc::UnboundedSender<MetricEvent>,
563) -> anyhow::Result<()> {
564 let current_balance = coordinator.get_balance().await;
565 if current_balance < minimum_amount_required {
566 let diff = minimum_amount_required.saturating_sub(current_balance);
567 info!(
568 "Current balance {current_balance} on coordinator not enough, trying to get {diff} more through fedimint-cli"
569 );
570 match try_get_notes_cli(&diff, 5).await {
571 Ok(notes) => {
572 info!("Got {} more notes, reissuing them", notes.total_amount());
573 reissue_notes(coordinator, notes, event_sender).await?;
574 }
575 Err(e) => {
576 info!("Unable to get more notes: '{e}', will try to proceed without them");
577 }
578 };
579 } else {
580 info!(
581 "Current balance of {current_balance} already covers the minimum required of {minimum_amount_required}"
582 );
583 }
584 Ok(())
585}
586
587async fn reissue_initial_notes(
588 initial_notes: Option<OOBNotes>,
589 coordinator: &ClientHandleArc,
590 event_sender: &mpsc::UnboundedSender<MetricEvent>,
591) -> anyhow::Result<()> {
592 if let Some(notes) = initial_notes {
593 let amount = notes.total_amount();
594 info!("Reissuing initial notes, got {amount}");
595 reissue_notes(coordinator, notes, event_sender).await?;
596 }
597 Ok(())
598}
599
600async fn get_coordinator_client(
601 db_path: &Option<PathBuf>,
602 invite_code: &Option<InviteCode>,
603) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
604 let (client, invite_code) = if let Some(db_path) = db_path {
605 let coordinator_db = db_path.join("coordinator.db");
606 if coordinator_db.exists() {
607 build_client(invite_code.clone(), Some(&coordinator_db)).await?
608 } else {
609 tokio::fs::create_dir_all(db_path).await?;
610 build_client(
611 Some(invite_code.clone().context(
612 "Running on this archive dir for the first time, an invite code is required",
613 )?),
614 Some(&coordinator_db),
615 )
616 .await?
617 }
618 } else {
619 build_client(
620 Some(
621 invite_code
622 .clone()
623 .context("No archive dir given, an invite code is strictly required")?,
624 ),
625 None,
626 )
627 .await?
628 };
629 Ok((client, invite_code))
630}
631
632fn get_db_path(archive_dir: &Option<PathBuf>) -> Option<PathBuf> {
633 archive_dir.as_ref().map(|p| p.join("db"))
634}
635
636async fn get_lightning_gateway(
637 client: &ClientHandleArc,
638 gateway_id: Option<String>,
639) -> Option<LightningGateway> {
640 let gateway_id = parse_gateway_id(gateway_id.or(None)?.as_str()).expect("Invalid gateway id");
641 let ln_module = client
642 .get_first_module::<LightningClientModule>()
643 .expect("Must have ln client module");
644 ln_module.select_gateway(&gateway_id).await
645}
646
647#[allow(clippy::too_many_arguments)]
648async fn do_load_test_user_task(
649 prefix: String,
650 client: ClientHandleArc,
651 oob_notes: Vec<OOBNotes>,
652 generated_invoices_per_user: u16,
653 ln_payment_sleep: Duration,
654 invoice_amount: Amount,
655 additional_invoices: Vec<Bolt11Invoice>,
656 generate_invoice_with: Option<LnInvoiceGeneration>,
657 event_sender: mpsc::UnboundedSender<MetricEvent>,
658 gateway_id: Option<String>,
659) -> anyhow::Result<()> {
660 let ln_gateway = get_lightning_gateway(&client, gateway_id).await;
661 for oob_note in oob_notes {
662 let amount = oob_note.total_amount();
663 reissue_notes(&client, oob_note, &event_sender)
664 .await
665 .map_err(|e| anyhow::anyhow!("while reissuing initial {amount}: {e}"))?;
666 }
667 let mut generated_invoices_per_user_iterator = (0..generated_invoices_per_user).peekable();
668 while let Some(_) = generated_invoices_per_user_iterator.next() {
669 let total_amount = get_note_summary(&client).await?.total_amount();
670 if invoice_amount > total_amount {
671 warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
672 } else {
673 match generate_invoice_with {
674 Some(LnInvoiceGeneration::LdkLightningCli) => {
675 let invoice = ldk_create_invoice(invoice_amount).await?;
676 gateway_pay_invoice(
677 &prefix,
678 "LND",
679 &client,
680 invoice.clone(),
681 &event_sender,
682 ln_gateway.clone(),
683 )
684 .await?;
685 ldk_wait_invoice_payment(&invoice).await?;
686 }
687 None if additional_invoices.is_empty() => {
688 debug!(
689 "No method given to generate an invoice and no invoices on file, will not test the gateway"
690 );
691 break;
692 }
693 None => {
694 break;
695 }
696 };
697 if generated_invoices_per_user_iterator.peek().is_some() {
698 fedimint_core::task::sleep(ln_payment_sleep).await;
700 }
701 }
702 }
703 let mut additional_invoices = additional_invoices.into_iter().peekable();
704 while let Some(invoice) = additional_invoices.next() {
705 let total_amount = get_note_summary(&client).await?.total_amount();
706 let invoice_amount =
707 Amount::from_msats(invoice.amount_milli_satoshis().unwrap_or_default());
708 if invoice_amount > total_amount {
709 warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
710 } else if invoice_amount == Amount::ZERO {
711 warn!("Can't pay invoice {invoice}, amount is zero");
712 } else {
713 gateway_pay_invoice(
714 &prefix,
715 "unknown",
716 &client,
717 invoice,
718 &event_sender,
719 ln_gateway.clone(),
720 )
721 .await?;
722 if additional_invoices.peek().is_some() {
723 fedimint_core::task::sleep(ln_payment_sleep).await;
725 }
726 }
727 }
728 Ok(())
729}
730
731#[allow(clippy::too_many_arguments)]
732async fn run_ln_circular_load_test(
733 archive_dir: Option<PathBuf>,
734 users: u16,
735 invite_code: Option<InviteCode>,
736 initial_notes: Option<OOBNotes>,
737 test_duration: Duration,
738 ln_payment_sleep: Duration,
739 notes_per_user: u16,
740 note_denomination: Amount,
741 invoice_amount: Amount,
742 strategy: LnCircularStrategy,
743 event_sender: mpsc::UnboundedSender<MetricEvent>,
744) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
745 let db_path = get_db_path(&archive_dir);
746 let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
747 let minimum_notes = notes_per_user * users;
748 let minimum_amount_required = note_denomination * u64::from(minimum_notes);
749
750 reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
751 get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
752
753 info!(
754 "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
755 );
756 remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
757
758 print_coordinator_notes(&coordinator).await?;
759
760 let users_clients = get_users_clients(users, db_path, invite_code.clone()).await?;
761
762 let mut users_notes =
763 get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
764
765 info!("Starting user tasks");
766 let futures = users_clients
767 .into_iter()
768 .enumerate()
769 .map(|(u, client)| {
770 let u = u as u16;
771 let oob_notes = users_notes.remove(&u).unwrap();
772 let event_sender = event_sender.clone();
773 let f: BoxFuture<_> = Box::pin(do_ln_circular_test_user_task(
774 format!("User {u}:"),
775 client,
776 invite_code.clone(),
777 oob_notes,
778 test_duration,
779 ln_payment_sleep,
780 invoice_amount,
781 strategy,
782 event_sender,
783 ));
784 f
785 })
786 .collect::<Vec<_>>();
787
788 Ok(futures)
789}
790
791#[allow(clippy::too_many_arguments)]
792async fn do_ln_circular_test_user_task(
793 prefix: String,
794 client: ClientHandleArc,
795 invite_code: Option<InviteCode>,
796 oob_notes: Vec<OOBNotes>,
797 test_duration: Duration,
798 ln_payment_sleep: Duration,
799 invoice_amount: Amount,
800 strategy: LnCircularStrategy,
801 event_sender: mpsc::UnboundedSender<MetricEvent>,
802) -> anyhow::Result<()> {
803 for oob_note in oob_notes {
804 let amount = oob_note.total_amount();
805 reissue_notes(&client, oob_note, &event_sender)
806 .await
807 .map_err(|e| anyhow::anyhow!("while reissuing initial {amount}: {e}"))?;
808 }
809 let initial_time = fedimint_core::time::now();
810 let still_ontime = || async {
811 fedimint_core::time::now()
812 .duration_since(initial_time)
813 .expect("time to work")
814 <= test_duration
815 };
816 let sleep_a_bit = || async {
817 if still_ontime().await {
818 fedimint_core::task::sleep(ln_payment_sleep).await;
819 }
820 };
821 match strategy {
822 LnCircularStrategy::TwoGateways => {
823 let invoice_generation = LnInvoiceGeneration::LdkLightningCli;
824 while still_ontime().await {
825 let gateway_id = get_gateway_id(invoice_generation).await?;
826 let ln_gateway = get_lightning_gateway(&client, Some(gateway_id)).await;
827 run_two_gateways_strategy(
828 &prefix,
829 &invoice_generation,
830 &invoice_amount,
831 &event_sender,
832 &client,
833 ln_gateway,
834 )
835 .await?;
836 sleep_a_bit().await;
837 }
838 }
839 LnCircularStrategy::SelfPayment => {
840 while still_ontime().await {
841 do_self_payment(&prefix, &client, invoice_amount, &event_sender).await?;
842 sleep_a_bit().await;
843 }
844 }
845 LnCircularStrategy::PartnerPingPong => {
846 let (partner, _) = build_client(invite_code, None).await?;
847 while still_ontime().await {
848 do_partner_ping_pong(&prefix, &client, &partner, invoice_amount, &event_sender)
849 .await?;
850 sleep_a_bit().await;
851 }
852 }
853 }
854 Ok(())
855}
856
857const GATEWAY_CREATE_INVOICE: &str = "gateway_create_invoice";
858
859async fn run_two_gateways_strategy(
860 prefix: &str,
861 invoice_generation: &LnInvoiceGeneration,
862 invoice_amount: &Amount,
863 event_sender: &mpsc::UnboundedSender<MetricEvent>,
864 client: &ClientHandleArc,
865 ln_gateway: Option<LightningGateway>,
866) -> Result<(), anyhow::Error> {
867 let create_invoice_time = fedimint_core::time::now();
868 match *invoice_generation {
869 LnInvoiceGeneration::LdkLightningCli => {
870 let invoice = ldk_create_invoice(*invoice_amount).await?;
871 let elapsed = create_invoice_time.elapsed()?;
872 info!("Created invoice using CLN in {elapsed:?}");
873 event_sender.send(MetricEvent {
874 name: GATEWAY_CREATE_INVOICE.into(),
875 duration: elapsed,
876 })?;
877 gateway_pay_invoice(
878 prefix,
879 "LND",
880 client,
881 invoice.clone(),
882 event_sender,
883 ln_gateway.clone(),
884 )
885 .await?;
886 ldk_wait_invoice_payment(&invoice).await?;
887 let (operation_id, invoice) =
888 client_create_invoice(client, *invoice_amount, event_sender, ln_gateway).await?;
889 let pay_invoice_time = fedimint_core::time::now();
890 ldk_pay_invoice(invoice).await?;
891 wait_invoice_payment(
892 prefix,
893 "LND",
894 client,
895 operation_id,
896 event_sender,
897 pay_invoice_time,
898 )
899 .await?;
900 }
901 };
902 Ok(())
903}
904
905async fn do_self_payment(
906 prefix: &str,
907 client: &ClientHandleArc,
908 invoice_amount: Amount,
909 event_sender: &mpsc::UnboundedSender<MetricEvent>,
910) -> anyhow::Result<()> {
911 let (operation_id, invoice) =
912 client_create_invoice(client, invoice_amount, event_sender, None).await?;
913 let pay_invoice_time = fedimint_core::time::now();
914 let lightning_module = client.get_first_module::<LightningClientModule>()?;
915 lightning_module
917 .pay_bolt11_invoice(None, invoice, ())
918 .await?;
919 wait_invoice_payment(
920 prefix,
921 "gateway",
922 client,
923 operation_id,
924 event_sender,
925 pay_invoice_time,
926 )
927 .await?;
928 Ok(())
929}
930
931async fn do_partner_ping_pong(
932 prefix: &str,
933 client: &ClientHandleArc,
934 partner: &ClientHandleArc,
935 invoice_amount: Amount,
936 event_sender: &mpsc::UnboundedSender<MetricEvent>,
937) -> anyhow::Result<()> {
938 let (operation_id, invoice) =
940 client_create_invoice(partner, invoice_amount, event_sender, None).await?;
941 let pay_invoice_time = fedimint_core::time::now();
942 let lightning_module = client.get_first_module::<LightningClientModule>()?;
943 lightning_module
946 .pay_bolt11_invoice(None, invoice, ())
947 .await?;
948 wait_invoice_payment(
949 prefix,
950 "gateway",
951 partner,
952 operation_id,
953 event_sender,
954 pay_invoice_time,
955 )
956 .await?;
957 let (operation_id, invoice) =
959 client_create_invoice(client, invoice_amount, event_sender, None).await?;
960 let pay_invoice_time = fedimint_core::time::now();
961 let partner_lightning_module = partner.get_first_module::<LightningClientModule>()?;
962 partner_lightning_module
965 .pay_bolt11_invoice(None, invoice, ())
966 .await?;
967 wait_invoice_payment(
968 prefix,
969 "gateway",
970 client,
971 operation_id,
972 event_sender,
973 pay_invoice_time,
974 )
975 .await?;
976 Ok(())
977}
978
979async fn wait_invoice_payment(
980 prefix: &str,
981 gateway_name: &str,
982 client: &ClientHandleArc,
983 operation_id: fedimint_core::core::OperationId,
984 event_sender: &mpsc::UnboundedSender<MetricEvent>,
985 pay_invoice_time: std::time::SystemTime,
986) -> anyhow::Result<()> {
987 let elapsed = pay_invoice_time.elapsed()?;
988 info!("{prefix} Invoice payment receive started using {gateway_name} in {elapsed:?}");
989 event_sender.send(MetricEvent {
990 name: format!("gateway_{gateway_name}_payment_received_started"),
991 duration: elapsed,
992 })?;
993 let lightning_module = client.get_first_module::<LightningClientModule>()?;
994 let mut updates = lightning_module
995 .subscribe_ln_receive(operation_id)
996 .await?
997 .into_stream();
998 while let Some(update) = updates.next().await {
999 debug!(%prefix, ?update, "Invoice payment update");
1000 match update {
1001 LnReceiveState::Claimed => {
1002 let elapsed: Duration = pay_invoice_time.elapsed()?;
1003 info!("{prefix} Invoice payment received on {gateway_name} in {elapsed:?}");
1004 event_sender.send(MetricEvent {
1005 name: "gateway_payment_received_success".into(),
1006 duration: elapsed,
1007 })?;
1008 event_sender.send(MetricEvent {
1009 name: format!("gateway_{gateway_name}_payment_received_success"),
1010 duration: elapsed,
1011 })?;
1012 break;
1013 }
1014 LnReceiveState::Canceled { reason } => {
1015 let elapsed: Duration = pay_invoice_time.elapsed()?;
1016 info!(
1017 "{prefix} Invoice payment receive was canceled on {gateway_name}: {reason} in {elapsed:?}"
1018 );
1019 event_sender.send(MetricEvent {
1020 name: "gateway_payment_received_canceled".into(),
1021 duration: elapsed,
1022 })?;
1023 break;
1024 }
1025 _ => {}
1026 }
1027 }
1028 Ok(())
1029}
1030
1031async fn client_create_invoice(
1032 client: &ClientHandleArc,
1033 invoice_amount: Amount,
1034 event_sender: &mpsc::UnboundedSender<MetricEvent>,
1035 ln_gateway: Option<LightningGateway>,
1036) -> anyhow::Result<(fedimint_core::core::OperationId, Bolt11Invoice)> {
1037 let create_invoice_time = fedimint_core::time::now();
1038 let lightning_module = client.get_first_module::<LightningClientModule>()?;
1039 let desc = Description::new("test".to_string())?;
1040 let (operation_id, invoice, _) = lightning_module
1041 .create_bolt11_invoice(
1042 invoice_amount,
1043 Bolt11InvoiceDescription::Direct(&desc),
1044 None,
1045 (),
1046 ln_gateway,
1047 )
1048 .await?;
1049 let elapsed = create_invoice_time.elapsed()?;
1050 info!("Created invoice using gateway in {elapsed:?}");
1051 event_sender.send(MetricEvent {
1052 name: GATEWAY_CREATE_INVOICE.into(),
1053 duration: elapsed,
1054 })?;
1055 Ok((operation_id, invoice))
1056}
1057
1058fn test_download_config(
1059 invite_code: &InviteCode,
1060 users: u16,
1061 event_sender: &mpsc::UnboundedSender<MetricEvent>,
1062) -> Vec<BoxFuture<'static, anyhow::Result<()>>> {
1063 (0..users)
1064 .map(|_| {
1065 let invite_code = invite_code.clone();
1066 let event_sender = event_sender.clone();
1067 let f: BoxFuture<_> = Box::pin(async move {
1068 let m = fedimint_core::time::now();
1069 let _ = fedimint_api_client::api::net::Connector::default()
1070 .download_from_invite_code(&invite_code)
1071 .await?;
1072 event_sender.send(MetricEvent {
1073 name: "download_client_config".into(),
1074 duration: m.elapsed()?,
1075 })?;
1076 Ok(())
1077 });
1078 f
1079 })
1080 .collect()
1081}
1082
1083async fn test_connect_raw_client(
1084 invite_code: InviteCode,
1085 users: u16,
1086 duration: Duration,
1087 timeout: Duration,
1088 limit_endpoints: Option<usize>,
1089 event_sender: mpsc::UnboundedSender<MetricEvent>,
1090) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
1091 use jsonrpsee_core::client::ClientT;
1092 use jsonrpsee_ws_client::WsClientBuilder;
1093
1094 let mut cfg = fedimint_api_client::api::net::Connector::default()
1095 .download_from_invite_code(&invite_code)
1096 .await?;
1097
1098 if let Some(limit_endpoints) = limit_endpoints {
1099 cfg.global.api_endpoints = cfg
1100 .global
1101 .api_endpoints
1102 .into_iter()
1103 .take(limit_endpoints)
1104 .collect();
1105 info!("Limiting endpoints to {:?}", cfg.global.api_endpoints);
1106 }
1107
1108 info!("Connecting to {users} clients");
1109 let clients = (0..users)
1110 .flat_map(|_| {
1111 let clients = cfg.global.api_endpoints.values().map(|url| async {
1112 let ws_client = WsClientBuilder::default()
1113 .request_timeout(timeout)
1114 .connection_timeout(timeout)
1115 .build(url_to_string_with_default_port(&url.url))
1116 .await?;
1117 Ok::<_, anyhow::Error>(ws_client)
1118 });
1119 clients
1120 })
1121 .collect::<Vec<_>>();
1122 let clients = futures::future::try_join_all(clients).await?;
1123 info!("Keeping {users} clients connected for {duration:?}");
1124 Ok(clients
1125 .into_iter()
1126 .map(|client| {
1127 let event_sender = event_sender.clone();
1128 let f: BoxFuture<_> = Box::pin(async move {
1129 let initial_time = fedimint_core::time::now();
1130 while initial_time.elapsed()? < duration {
1131 let m = fedimint_core::time::now();
1132 let _epoch: u64 = client
1133 .request::<_, _>(SESSION_COUNT_ENDPOINT, vec![ApiRequestErased::default()])
1134 .await?;
1135 event_sender.send(MetricEvent {
1136 name: SESSION_COUNT_ENDPOINT.into(),
1137 duration: m.elapsed()?,
1138 })?;
1139 fedimint_core::task::sleep(Duration::from_secs(1)).await;
1140 }
1141 Ok(())
1142 });
1143 f
1144 })
1145 .collect())
1146}
1147
1148fn url_to_string_with_default_port(url: &SafeUrl) -> String {
1149 format!(
1150 "{}://{}:{}{}",
1151 url.scheme(),
1152 url.host().expect("Asserted on construction"),
1153 url.port_or_known_default()
1154 .expect("Asserted on construction"),
1155 url.path()
1156 )
1157}
1158
1159async fn handle_metrics_summary(
1160 opts: Opts,
1161 mut event_receiver: mpsc::UnboundedReceiver<MetricEvent>,
1162) -> anyhow::Result<()> {
1163 let timestamp_seconds = fedimint_core::time::duration_since_epoch().as_secs();
1164 let mut metrics_json_output_files = vec![];
1165 let mut previous_metrics = vec![];
1166 let mut comparison_output = None;
1167 if let Some(archive_dir) = opts.archive_dir {
1168 let mut archive_metrics = archive_dir.join("metrics");
1169 archive_metrics.push(opts.users.to_string());
1170 tokio::fs::create_dir_all(&archive_metrics).await?;
1171 let mut archive_comparisons = archive_dir.join("comparisons");
1172 archive_comparisons.push(opts.users.to_string());
1173 tokio::fs::create_dir_all(&archive_comparisons).await?;
1174
1175 let latest_metrics_file = std::fs::read_dir(&archive_metrics)?
1176 .map(|entry| {
1177 let entry = entry.unwrap();
1178 let metadata = entry.metadata().unwrap();
1179 let created = metadata
1180 .created()
1181 .unwrap_or_else(|_| metadata.modified().unwrap());
1182 (entry, created)
1183 })
1184 .max_by_key(|(_entry, created)| created.to_owned())
1185 .map(|(entry, _)| entry.path());
1186 if let Some(latest_metrics_file) = latest_metrics_file {
1187 let latest_metrics_file = tokio::fs::File::open(&latest_metrics_file)
1188 .await
1189 .with_context(|| format!("Failed to open {latest_metrics_file:?}"))?;
1190 let mut lines = tokio::io::BufReader::new(latest_metrics_file).lines();
1191 while let Some(line) = lines.next_line().await? {
1192 match serde_json::from_str::<EventMetricSummary>(&line) {
1193 Ok(metric) => {
1194 previous_metrics.push(metric);
1195 }
1196 Err(e) => {
1197 warn!("Failed to parse previous metric: {e:?}");
1198 }
1199 }
1200 }
1201 }
1202 let new_metric_output = archive_metrics.join(format!("{timestamp_seconds}.json",));
1203 let new_metric_output = BufWriter::new(
1204 OpenOptions::new()
1205 .write(true)
1206 .create(true)
1207 .truncate(true)
1208 .open(new_metric_output)
1209 .await?,
1210 );
1211 metrics_json_output_files.push(new_metric_output);
1212 if !previous_metrics.is_empty() {
1213 let new_comparison_output =
1214 archive_comparisons.join(format!("{timestamp_seconds}.json",));
1215 comparison_output = Some(BufWriter::new(
1216 OpenOptions::new()
1217 .write(true)
1218 .create(true)
1219 .truncate(true)
1220 .open(new_comparison_output)
1221 .await?,
1222 ));
1223 }
1224 }
1225 if let Some(metrics_json_output) = opts.metrics_json_output {
1226 metrics_json_output_files.push(BufWriter::new(
1227 tokio::fs::OpenOptions::new()
1228 .write(true)
1229 .create(true)
1230 .truncate(true)
1231 .open(metrics_json_output)
1232 .await?,
1233 ));
1234 }
1235 let mut results = BTreeMap::new();
1236 while let Some(event) = event_receiver.recv().await {
1237 let entry = results.entry(event.name).or_insert_with(Vec::new);
1238 entry.push(event.duration);
1239 }
1240 let mut previous_metrics = previous_metrics
1241 .into_iter()
1242 .map(|metric| (metric.name.clone(), metric))
1243 .collect::<HashMap<_, _>>();
1244 for (k, mut v) in results {
1245 v.sort();
1246 let n = v.len();
1247 let max = v.iter().last().unwrap();
1248 let min = v.first().unwrap();
1249 let median = v[n / 2];
1250 let sum: Duration = v.iter().sum();
1251 let avg = sum / n as u32;
1252 let metric_summary = EventMetricSummary {
1253 name: k.clone(),
1254 users: u64::from(opts.users),
1255 n: n as u64,
1256 avg_ms: avg.as_millis(),
1257 median_ms: median.as_millis(),
1258 max_ms: max.as_millis(),
1259 min_ms: min.as_millis(),
1260 timestamp_seconds,
1261 };
1262 let comparison = if let Some(previous_metric) = previous_metrics.remove(&k) {
1263 if previous_metric.n == metric_summary.n {
1264 fn calculate_gain(current: u128, previous: u128) -> f64 {
1265 current as f64 / previous as f64
1266 }
1267 let comparison = EventMetricComparison {
1268 avg_ms_gain: calculate_gain(metric_summary.avg_ms, previous_metric.avg_ms),
1269 median_ms_gain: calculate_gain(
1270 metric_summary.median_ms,
1271 previous_metric.median_ms,
1272 ),
1273 max_ms_gain: calculate_gain(metric_summary.max_ms, previous_metric.max_ms),
1274 min_ms_gain: calculate_gain(metric_summary.min_ms, previous_metric.min_ms),
1275 current: metric_summary.clone(),
1276 previous: previous_metric,
1277 };
1278 if let Some(comparison_output) = &mut comparison_output {
1279 let comparison_json =
1280 serde_json::to_string(&comparison).expect("to be serializable");
1281 comparison_output
1282 .write_all(format!("{comparison_json}\n").as_bytes())
1283 .await
1284 .expect("to write on file");
1285 }
1286 Some(comparison)
1287 } else {
1288 info!(
1289 "Skipping comparison for {k} because previous metric has different n ({} vs {})",
1290 previous_metric.n, metric_summary.n
1291 );
1292 None
1293 }
1294 } else {
1295 None
1296 };
1297 if let Some(comparison) = comparison {
1298 println!(
1299 "{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?} (compared to previous: {comparison})"
1300 );
1301 } else {
1302 println!("{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?}");
1303 }
1304 let metric_summary_json =
1305 serde_json::to_string(&metric_summary).expect("to be serializable");
1306 for metrics_json_output_file in &mut metrics_json_output_files {
1307 metrics_json_output_file
1308 .write_all(format!("{metric_summary_json}\n").as_bytes())
1309 .await
1310 .expect("to write on file");
1311 }
1312 }
1313 for mut output in metrics_json_output_files {
1314 output.flush().await?;
1315 }
1316 if let Some(mut output) = comparison_output {
1317 output.flush().await?;
1318 }
1319 Ok(())
1320}
1321
1322async fn get_gateway_id(generate_invoice_with: LnInvoiceGeneration) -> anyhow::Result<String> {
1323 let gateway_json = match generate_invoice_with {
1324 LnInvoiceGeneration::LdkLightningCli => {
1325 cmd!(GatewayLndCli, "info").out_json().await
1327 }
1328 }?;
1329 let gateway_id = gateway_json["gateway_id"]
1330 .as_str()
1331 .context("Missing gateway_id field")?;
1332
1333 Ok(gateway_id.into())
1334}