1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_precision_loss)]
4#![allow(clippy::missing_errors_doc)]
5#![allow(clippy::ref_option)]
6#![allow(clippy::too_many_lines)]
7#![allow(clippy::large_futures)]
8
9use std::collections::{BTreeMap, HashMap};
10use std::path::PathBuf;
11use std::str::FromStr;
12use std::time::Duration;
13use std::vec;
14
15use anyhow::{Context, anyhow, bail};
16use clap::{Args, Parser, Subcommand, ValueEnum};
17use common::{
18 gateway_pay_invoice, get_note_summary, ldk_create_invoice, ldk_pay_invoice,
19 ldk_wait_invoice_payment, parse_gateway_id, reissue_notes,
20};
21use devimint::cmd;
22use devimint::util::GatewayLndCli;
23use fedimint_client::ClientHandleArc;
24use fedimint_core::Amount;
25use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
26use fedimint_core::invite_code::InviteCode;
27use fedimint_core::module::ApiRequestErased;
28use fedimint_core::runtime::spawn;
29use fedimint_core::util::{BoxFuture, SafeUrl};
30use fedimint_ln_client::{LightningClientModule, LnReceiveState};
31use fedimint_ln_common::LightningGateway;
32use fedimint_mint_client::OOBNotes;
33use futures::StreamExt;
34use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description};
35use serde::{Deserialize, Serialize};
36use tokio::fs::OpenOptions;
37use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
38use tokio::sync::mpsc;
39use tracing::{debug, info, warn};
40
41use crate::common::{
42 build_client, do_spend_notes, get_invite_code_cli, remint_denomination, try_get_notes_cli,
43};
44pub mod common;
45
46#[derive(Parser, Clone)]
47#[command(version)]
48struct Opts {
49 #[arg(
50 long,
51 default_value = "10",
52 help = "Number of users. Each user will work in parallel"
53 )]
54 users: u16,
55
56 #[arg(long, help = "Output with the metrics results in JSON format")]
57 metrics_json_output: Option<PathBuf>,
58
59 #[arg(
60 long,
61 help = "If given, will be used to store and retrieve past metrics for comparison purposes"
62 )]
63 archive_dir: Option<PathBuf>,
64
65 #[clap(subcommand)]
66 command: Command,
67}
68
69#[derive(Debug, Clone, Copy, ValueEnum)]
70enum LnInvoiceGeneration {
71 LdkLightningCli,
72}
73
74#[derive(Subcommand, Clone)]
75enum Command {
76 #[command(about = "Keep many websocket connections to a federation for a duration of time")]
77 TestConnect {
78 #[arg(long, help = "Federation invite code")]
79 invite_code: String,
80 #[arg(
81 long,
82 default_value = "60",
83 help = "How much time to keep the connections open, in seconds"
84 )]
85 duration_secs: u64,
86 #[arg(
87 long,
88 default_value = "120",
89 help = "Timeout for connection attempt and for each request, in secnods"
90 )]
91 timeout_secs: u64,
92 #[arg(
93 long,
94 help = "If given, will limit the number of endpoints (guardians) to connect to"
95 )]
96 limit_endpoints: Option<usize>,
97 },
98 #[command(about = "Try to download the client config many times.")]
99 TestDownload {
100 #[arg(long, help = "Federation invite code")]
101 invite_code: String,
102 },
103 #[command(
104 about = "Run a load test where many users in parallel will try to reissue notes and pay invoices through the gateway"
105 )]
106 LoadTest(LoadTestArgs),
107 #[command()]
112 LnCircularLoadTest(LnCircularLoadTestArgs),
113}
114
115#[derive(Args, Clone)]
116struct LoadTestArgs {
117 #[arg(
118 long,
119 help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
120 )]
121 invite_code: Option<InviteCode>,
122
123 #[arg(
124 long,
125 help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
126 )]
127 initial_notes: Option<OOBNotes>,
128
129 #[arg(
130 long,
131 help = "Gateway Id. If none, retrieve one according to --generate-invoice-with"
132 )]
133 gateway_id: Option<String>,
134
135 #[arg(
136 long,
137 help = "The method used to generate invoices to be paid through the gateway. If none and no --invoices-file provided, no gateway/LN tests will be run. Note that you can't generate an invoice using the same lightning node used by the gateway (i.e self payment is forbidden)"
138 )]
139 generate_invoice_with: Option<LnInvoiceGeneration>,
140
141 #[arg(
142 long,
143 default_value = "1",
144 help = "How many invoices will be created for each user. Only applicable if --generate-invoice-with is provided"
145 )]
146 invoices_per_user: u16,
147
148 #[arg(
149 long,
150 default_value = "0",
151 help = "How many seconds to sleep between LN payments"
152 )]
153 ln_payment_sleep_secs: u64,
154
155 #[arg(
156 long,
157 help = "A text file with one invoice per line. If --generate-invoice-with is provided, these will be additional invoices to be paid"
158 )]
159 invoices_file: Option<PathBuf>,
160
161 #[arg(
162 long,
163 help = "How many notes to distribute to each user",
164 default_value = "2"
165 )]
166 notes_per_user: u16,
167
168 #[arg(
169 long,
170 help = "Note denomination to use for the test",
171 default_value = "1024"
172 )]
173 note_denomination: Amount,
174
175 #[arg(
176 long,
177 help = "Invoice amount when generating one",
178 default_value = "1000"
179 )]
180 invoice_amount: Amount,
181}
182
183#[derive(Args, Clone)]
184struct LnCircularLoadTestArgs {
185 #[arg(
186 long,
187 help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
188 )]
189 invite_code: Option<InviteCode>,
190
191 #[arg(
192 long,
193 help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
194 )]
195 initial_notes: Option<OOBNotes>,
196
197 #[arg(
198 long,
199 default_value = "60",
200 help = "For how many seconds to run the test"
201 )]
202 test_duration_secs: u64,
203
204 #[arg(
205 long,
206 default_value = "0",
207 help = "How many seconds to sleep between LN payments"
208 )]
209 ln_payment_sleep_secs: u64,
210
211 #[arg(
212 long,
213 help = "How many notes to distribute to each user",
214 default_value = "1"
215 )]
216 notes_per_user: u16,
217
218 #[arg(
219 long,
220 help = "Note denomination to use for the test",
221 default_value = "1024"
222 )]
223 note_denomination: Amount,
224
225 #[arg(
226 long,
227 help = "Invoice amount when generating one",
228 default_value = "1000"
229 )]
230 invoice_amount: Amount,
231
232 #[arg(long)]
233 strategy: LnCircularStrategy,
234}
235
236#[derive(Debug, Clone, Copy, ValueEnum)]
237enum LnCircularStrategy {
238 SelfPayment,
240 TwoGateways,
243 PartnerPingPong,
245}
246
247#[derive(Debug, Clone)]
248pub struct MetricEvent {
249 name: String,
250 duration: Duration,
251}
252
253#[derive(Debug, Clone, Serialize, Deserialize)]
254struct EventMetricSummary {
255 name: String,
256 users: u64,
257 n: u64,
258 avg_ms: u128,
259 median_ms: u128,
260 max_ms: u128,
261 min_ms: u128,
262 timestamp_seconds: u64,
263}
264
265#[derive(Debug, Serialize, Deserialize)]
266struct EventMetricComparison {
267 avg_ms_gain: f64,
268 median_ms_gain: f64,
269 max_ms_gain: f64,
270 min_ms_gain: f64,
271 current: EventMetricSummary,
272 previous: EventMetricSummary,
273}
274
275impl std::fmt::Display for EventMetricComparison {
276 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277 fn to_percent(gain: f64) -> String {
278 if gain >= 1.0 {
279 format!("+{:.2}%", (gain - 1.0) * 100.0)
280 } else {
281 format!("-{:.2}%", (1.0 - gain) * 100.0)
282 }
283 }
284 f.write_str(&format!(
285 "avg: {}, median: {}, max: {}, min: {}",
286 to_percent(self.avg_ms_gain),
287 to_percent(self.median_ms_gain),
288 to_percent(self.max_ms_gain),
289 to_percent(self.min_ms_gain),
290 ))
291 }
292}
293
294#[tokio::main]
295async fn main() -> anyhow::Result<()> {
296 fedimint_logging::TracingSetup::default().init()?;
297 let opts = Opts::parse();
298 let (event_sender, event_receiver) = tokio::sync::mpsc::unbounded_channel();
299 let summary_handle = spawn("handle metrics summary", {
300 let opts = opts.clone();
301 async { handle_metrics_summary(opts, event_receiver).await }
302 });
303 let futures = match opts.command.clone() {
304 Command::TestConnect {
305 invite_code,
306 duration_secs,
307 timeout_secs,
308 limit_endpoints,
309 } => {
310 let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
311 test_connect_raw_client(
312 invite_code,
313 opts.users,
314 Duration::from_secs(duration_secs),
315 Duration::from_secs(timeout_secs),
316 limit_endpoints,
317 event_sender.clone(),
318 )
319 .await?
320 }
321 Command::TestDownload { invite_code } => {
322 let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
323 test_download_config(&invite_code, opts.users, &event_sender.clone())
324 }
325 Command::LoadTest(args) => {
326 let invite_code = invite_code_or_fallback(args.invite_code).await;
327
328 let gateway_id = if let Some(gateway_id) = args.gateway_id {
329 Some(gateway_id)
330 } else if let Some(generate_invoice_with) = args.generate_invoice_with {
331 Some(get_gateway_id(generate_invoice_with).await?)
332 } else {
333 None
334 };
335 let invoices = if let Some(invoices_file) = args.invoices_file {
336 let display_path = invoices_file.display();
337 let invoices_file = tokio::fs::File::open(&invoices_file)
338 .await
339 .with_context(|| format!("Failed to open {display_path}"))?;
340 let mut lines = tokio::io::BufReader::new(invoices_file).lines();
341 let mut invoices = vec![];
342 while let Some(line) = lines.next_line().await? {
343 let invoice = Bolt11Invoice::from_str(&line)?;
344 invoices.push(invoice);
345 }
346 invoices
347 } else {
348 vec![]
349 };
350 if args.generate_invoice_with.is_none() && invoices.is_empty() {
351 info!(
352 "No --generate-invoice-with given no invoices on --invoices-file, not LN/gateway tests will be run"
353 );
354 }
355 run_load_test(
356 opts.archive_dir,
357 opts.users,
358 invite_code,
359 args.initial_notes,
360 args.generate_invoice_with,
361 args.invoices_per_user,
362 Duration::from_secs(args.ln_payment_sleep_secs),
363 invoices,
364 gateway_id,
365 args.notes_per_user,
366 args.note_denomination,
367 args.invoice_amount,
368 event_sender.clone(),
369 )
370 .await?
371 }
372 Command::LnCircularLoadTest(args) => {
373 let invite_code = invite_code_or_fallback(args.invite_code).await;
374 run_ln_circular_load_test(
375 opts.archive_dir,
376 opts.users,
377 invite_code,
378 args.initial_notes,
379 Duration::from_secs(args.test_duration_secs),
380 Duration::from_secs(args.ln_payment_sleep_secs),
381 args.notes_per_user,
382 args.note_denomination,
383 args.invoice_amount,
384 args.strategy,
385 event_sender.clone(),
386 )
387 .await?
388 }
389 };
390
391 let result = futures::future::join_all(futures).await;
392 drop(event_sender);
393 summary_handle.await??;
394 let len_failures = result.iter().filter(|r| r.is_err()).count();
395 eprintln!("{} results, {len_failures} failures", result.len());
396 for r in result {
397 if let Err(e) = r {
398 warn!("Task failed: {:?}", e);
399 }
400 }
401 if len_failures > 0 {
402 bail!("Finished with failures");
403 }
404 info!("Finished successfully");
405 Ok(())
406}
407
408async fn invite_code_or_fallback(invite_code: Option<InviteCode>) -> Option<InviteCode> {
409 if let Some(invite_code) = invite_code {
410 Some(invite_code)
411 } else {
412 match get_invite_code_cli(0.into()).await {
414 Ok(invite_code) => Some(invite_code),
415 Err(e) => {
416 info!(
417 "No invite code provided and failed to get one with '{e}' error, will try to proceed without one..."
418 );
419 None
420 }
421 }
422 }
423}
424
425#[allow(clippy::too_many_arguments)]
426async fn run_load_test(
427 archive_dir: Option<PathBuf>,
428 users: u16,
429 invite_code: Option<InviteCode>,
430 initial_notes: Option<OOBNotes>,
431 generate_invoice_with: Option<LnInvoiceGeneration>,
432 generated_invoices_per_user: u16,
433 ln_payment_sleep: Duration,
434 invoices_from_file: Vec<Bolt11Invoice>,
435 gateway_id: Option<String>,
436 notes_per_user: u16,
437 note_denomination: Amount,
438 invoice_amount: Amount,
439 event_sender: mpsc::UnboundedSender<MetricEvent>,
440) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
441 let db_path = get_db_path(&archive_dir);
442 let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
443 let minimum_notes = notes_per_user * users;
444 let minimum_amount_required = note_denomination * u64::from(minimum_notes);
445
446 reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
447 get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
448 print_coordinator_notes(&coordinator).await?;
449 info!(
450 "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
451 );
452 remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
453 print_coordinator_notes(&coordinator).await?;
454
455 let users_clients = get_users_clients(users, db_path, invite_code).await?;
456
457 let mut users_notes =
458 get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
459 let mut users_invoices = HashMap::new();
460 let mut user = 0;
461 for invoice in invoices_from_file {
463 users_invoices
464 .entry(user)
465 .or_insert_with(Vec::new)
466 .push(invoice);
467 user = (user + 1) % users;
468 }
469
470 info!("Starting user tasks");
471 let futures = users_clients
472 .into_iter()
473 .enumerate()
474 .map(|(u, client)| {
475 let u = u as u16;
476 let oob_notes = users_notes.remove(&u).unwrap();
477 let invoices = users_invoices.remove(&u).unwrap_or_default();
478 let event_sender = event_sender.clone();
479 let f: BoxFuture<_> = Box::pin(do_load_test_user_task(
480 format!("User {u}:"),
481 client,
482 oob_notes,
483 generated_invoices_per_user,
484 ln_payment_sleep,
485 invoice_amount,
486 invoices,
487 generate_invoice_with,
488 event_sender,
489 gateway_id.clone(),
490 ));
491 f
492 })
493 .collect::<Vec<_>>();
494
495 Ok(futures)
496}
497
498async fn get_notes_for_users(
499 users: u16,
500 notes_per_user: u16,
501 coordinator: ClientHandleArc,
502 note_denomination: Amount,
503) -> anyhow::Result<HashMap<u16, Vec<OOBNotes>>> {
504 let mut users_notes = HashMap::new();
505 for u in 0..users {
506 users_notes.insert(u, Vec::with_capacity(notes_per_user.into()));
507 for _ in 0..notes_per_user {
508 let (_, oob_notes) = do_spend_notes(&coordinator, note_denomination).await?;
509 let user_amount = oob_notes.total_amount();
510 info!("Giving {user_amount} to user {u}");
511 users_notes.get_mut(&u).unwrap().push(oob_notes);
512 }
513 }
514 Ok(users_notes)
515}
516
517async fn get_users_clients(
518 n: u16,
519 db_path: Option<PathBuf>,
520 invite_code: Option<InviteCode>,
521) -> anyhow::Result<Vec<ClientHandleArc>> {
522 let mut users_clients = Vec::with_capacity(n.into());
523 for u in 0..n {
524 let (client, _) = get_user_client(u, &db_path, &invite_code).await?;
525 users_clients.push(client);
526 }
527 Ok(users_clients)
528}
529
530async fn get_user_client(
531 user_index: u16,
532 db_path: &Option<PathBuf>,
533 invite_code: &Option<InviteCode>,
534) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
535 let user_db = db_path
536 .as_ref()
537 .map(|db_path| db_path.join(format!("user_{user_index}.db")));
538 let user_invite_code = if user_db.as_ref().is_some_and(|db| db.exists()) {
539 None
540 } else {
541 invite_code.clone()
542 };
543 let (client, invite_code) = build_client(user_invite_code, user_db.as_ref()).await?;
544 if let Ok(ln_client) = client.get_first_module::<LightningClientModule>() {
546 let _ = ln_client.update_gateway_cache().await;
547 }
548 Ok((client, invite_code))
549}
550
551async fn print_coordinator_notes(coordinator: &ClientHandleArc) -> anyhow::Result<()> {
552 info!("Note summary:");
553 let summary = get_note_summary(coordinator).await?;
554 for (k, v) in summary.iter() {
555 info!("{k}: {v}");
556 }
557 Ok(())
558}
559
560async fn get_required_notes(
561 coordinator: &ClientHandleArc,
562 minimum_amount_required: Amount,
563 event_sender: &mpsc::UnboundedSender<MetricEvent>,
564) -> anyhow::Result<()> {
565 let current_balance = coordinator.get_balance_for_btc().await?;
566
567 if current_balance < minimum_amount_required {
568 let diff = minimum_amount_required.saturating_sub(current_balance);
569 info!(
570 "Current balance {current_balance} on coordinator not enough, trying to get {diff} more through fedimint-cli"
571 );
572 match try_get_notes_cli(&diff, 5).await {
573 Ok(notes) => {
574 info!("Got {} more notes, reissuing them", notes.total_amount());
575 reissue_notes(coordinator, notes, event_sender).await?;
576 }
577 Err(e) => {
578 info!("Unable to get more notes: '{e}', will try to proceed without them");
579 }
580 }
581 } else {
582 info!(
583 "Current balance of {current_balance} already covers the minimum required of {minimum_amount_required}"
584 );
585 }
586 Ok(())
587}
588
589async fn reissue_initial_notes(
590 initial_notes: Option<OOBNotes>,
591 coordinator: &ClientHandleArc,
592 event_sender: &mpsc::UnboundedSender<MetricEvent>,
593) -> anyhow::Result<()> {
594 if let Some(notes) = initial_notes {
595 let amount = notes.total_amount();
596 info!("Reissuing initial notes, got {amount}");
597 reissue_notes(coordinator, notes, event_sender).await?;
598 }
599 Ok(())
600}
601
602async fn get_coordinator_client(
603 db_path: &Option<PathBuf>,
604 invite_code: &Option<InviteCode>,
605) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
606 let (client, invite_code) = if let Some(db_path) = db_path {
607 let coordinator_db = db_path.join("coordinator.db");
608 if coordinator_db.exists() {
609 build_client(invite_code.clone(), Some(&coordinator_db)).await?
610 } else {
611 tokio::fs::create_dir_all(db_path).await?;
612 build_client(
613 Some(invite_code.clone().context(
614 "Running on this archive dir for the first time, an invite code is required",
615 )?),
616 Some(&coordinator_db),
617 )
618 .await?
619 }
620 } else {
621 build_client(
622 Some(
623 invite_code
624 .clone()
625 .context("No archive dir given, an invite code is strictly required")?,
626 ),
627 None,
628 )
629 .await?
630 };
631 Ok((client, invite_code))
632}
633
634fn get_db_path(archive_dir: &Option<PathBuf>) -> Option<PathBuf> {
635 archive_dir.as_ref().map(|p| p.join("db"))
636}
637
638async fn get_lightning_gateway(
639 client: &ClientHandleArc,
640 gateway_id: Option<String>,
641) -> Option<LightningGateway> {
642 let gateway_id = parse_gateway_id(gateway_id.or(None)?.as_str()).expect("Invalid gateway id");
643 let ln_module = client
644 .get_first_module::<LightningClientModule>()
645 .expect("Must have ln client module");
646 ln_module.select_gateway(&gateway_id).await
647}
648
649#[allow(clippy::too_many_arguments)]
650async fn do_load_test_user_task(
651 prefix: String,
652 client: ClientHandleArc,
653 oob_notes: Vec<OOBNotes>,
654 generated_invoices_per_user: u16,
655 ln_payment_sleep: Duration,
656 invoice_amount: Amount,
657 additional_invoices: Vec<Bolt11Invoice>,
658 generate_invoice_with: Option<LnInvoiceGeneration>,
659 event_sender: mpsc::UnboundedSender<MetricEvent>,
660 gateway_id: Option<String>,
661) -> anyhow::Result<()> {
662 let ln_gateway = get_lightning_gateway(&client, gateway_id).await;
663 for oob_note in oob_notes {
664 let amount = oob_note.total_amount();
665 reissue_notes(&client, oob_note, &event_sender)
666 .await
667 .map_err(|e| anyhow!("while reissuing initial {amount}: {e}"))?;
668 }
669 let mut generated_invoices_per_user_iterator = (0..generated_invoices_per_user).peekable();
670 while let Some(_) = generated_invoices_per_user_iterator.next() {
671 let total_amount = get_note_summary(&client).await?.total_amount();
672 if invoice_amount > total_amount {
673 warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
674 } else {
675 match generate_invoice_with {
676 Some(LnInvoiceGeneration::LdkLightningCli) => {
677 let invoice = ldk_create_invoice(invoice_amount).await?;
678 gateway_pay_invoice(
679 &prefix,
680 "LND",
681 &client,
682 invoice.clone(),
683 &event_sender,
684 ln_gateway.clone(),
685 )
686 .await?;
687 ldk_wait_invoice_payment(&invoice).await?;
688 }
689 None if additional_invoices.is_empty() => {
690 debug!(
691 "No method given to generate an invoice and no invoices on file, will not test the gateway"
692 );
693 break;
694 }
695 None => {
696 break;
697 }
698 }
699 if generated_invoices_per_user_iterator.peek().is_some() {
700 fedimint_core::task::sleep(ln_payment_sleep).await;
702 }
703 }
704 }
705 let mut additional_invoices = additional_invoices.into_iter().peekable();
706 while let Some(invoice) = additional_invoices.next() {
707 let total_amount = get_note_summary(&client).await?.total_amount();
708 let invoice_amount =
709 Amount::from_msats(invoice.amount_milli_satoshis().unwrap_or_default());
710 if invoice_amount > total_amount {
711 warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
712 } else if invoice_amount == Amount::ZERO {
713 warn!("Can't pay invoice {invoice}, amount is zero");
714 } else {
715 gateway_pay_invoice(
716 &prefix,
717 "unknown",
718 &client,
719 invoice,
720 &event_sender,
721 ln_gateway.clone(),
722 )
723 .await?;
724 if additional_invoices.peek().is_some() {
725 fedimint_core::task::sleep(ln_payment_sleep).await;
727 }
728 }
729 }
730 Ok(())
731}
732
733#[allow(clippy::too_many_arguments)]
734async fn run_ln_circular_load_test(
735 archive_dir: Option<PathBuf>,
736 users: u16,
737 invite_code: Option<InviteCode>,
738 initial_notes: Option<OOBNotes>,
739 test_duration: Duration,
740 ln_payment_sleep: Duration,
741 notes_per_user: u16,
742 note_denomination: Amount,
743 invoice_amount: Amount,
744 strategy: LnCircularStrategy,
745 event_sender: mpsc::UnboundedSender<MetricEvent>,
746) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
747 let db_path = get_db_path(&archive_dir);
748 let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
749 let minimum_notes = notes_per_user * users;
750 let minimum_amount_required = note_denomination * u64::from(minimum_notes);
751
752 reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
753 get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
754
755 info!(
756 "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
757 );
758 remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
759
760 print_coordinator_notes(&coordinator).await?;
761
762 let users_clients = get_users_clients(users, db_path, invite_code.clone()).await?;
763
764 let mut users_notes =
765 get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
766
767 info!("Starting user tasks");
768 let futures = users_clients
769 .into_iter()
770 .enumerate()
771 .map(|(u, client)| {
772 let u = u as u16;
773 let oob_notes = users_notes.remove(&u).unwrap();
774 let event_sender = event_sender.clone();
775 let f: BoxFuture<_> = Box::pin(do_ln_circular_test_user_task(
776 format!("User {u}:"),
777 client,
778 invite_code.clone(),
779 oob_notes,
780 test_duration,
781 ln_payment_sleep,
782 invoice_amount,
783 strategy,
784 event_sender,
785 ));
786 f
787 })
788 .collect::<Vec<_>>();
789
790 Ok(futures)
791}
792
793#[allow(clippy::too_many_arguments)]
794async fn do_ln_circular_test_user_task(
795 prefix: String,
796 client: ClientHandleArc,
797 invite_code: Option<InviteCode>,
798 oob_notes: Vec<OOBNotes>,
799 test_duration: Duration,
800 ln_payment_sleep: Duration,
801 invoice_amount: Amount,
802 strategy: LnCircularStrategy,
803 event_sender: mpsc::UnboundedSender<MetricEvent>,
804) -> anyhow::Result<()> {
805 for oob_note in oob_notes {
806 let amount = oob_note.total_amount();
807 reissue_notes(&client, oob_note, &event_sender)
808 .await
809 .map_err(|e| anyhow!("while reissuing initial {amount}: {e}"))?;
810 }
811 let initial_time = fedimint_core::time::now();
812 let still_ontime = || async {
813 fedimint_core::time::now()
814 .duration_since(initial_time)
815 .expect("time to work")
816 <= test_duration
817 };
818 let sleep_a_bit = || async {
819 if still_ontime().await {
820 fedimint_core::task::sleep(ln_payment_sleep).await;
821 }
822 };
823 match strategy {
824 LnCircularStrategy::TwoGateways => {
825 let invoice_generation = LnInvoiceGeneration::LdkLightningCli;
826 while still_ontime().await {
827 let gateway_id = get_gateway_id(invoice_generation).await?;
828 let ln_gateway = get_lightning_gateway(&client, Some(gateway_id)).await;
829 run_two_gateways_strategy(
830 &prefix,
831 &invoice_generation,
832 &invoice_amount,
833 &event_sender,
834 &client,
835 ln_gateway,
836 )
837 .await?;
838 sleep_a_bit().await;
839 }
840 }
841 LnCircularStrategy::SelfPayment => {
842 while still_ontime().await {
843 do_self_payment(&prefix, &client, invoice_amount, &event_sender).await?;
844 sleep_a_bit().await;
845 }
846 }
847 LnCircularStrategy::PartnerPingPong => {
848 let (partner, _) = build_client(invite_code, None).await?;
849 while still_ontime().await {
850 do_partner_ping_pong(&prefix, &client, &partner, invoice_amount, &event_sender)
851 .await?;
852 sleep_a_bit().await;
853 }
854 }
855 }
856 Ok(())
857}
858
859const GATEWAY_CREATE_INVOICE: &str = "gateway_create_invoice";
860
861async fn run_two_gateways_strategy(
862 prefix: &str,
863 invoice_generation: &LnInvoiceGeneration,
864 invoice_amount: &Amount,
865 event_sender: &mpsc::UnboundedSender<MetricEvent>,
866 client: &ClientHandleArc,
867 ln_gateway: Option<LightningGateway>,
868) -> Result<(), anyhow::Error> {
869 let create_invoice_time = fedimint_core::time::now();
870 match *invoice_generation {
871 LnInvoiceGeneration::LdkLightningCli => {
872 let invoice = ldk_create_invoice(*invoice_amount).await?;
873 let elapsed = create_invoice_time.elapsed()?;
874 info!("Created invoice using LDK in {elapsed:?}");
875 event_sender.send(MetricEvent {
876 name: GATEWAY_CREATE_INVOICE.into(),
877 duration: elapsed,
878 })?;
879 gateway_pay_invoice(
880 prefix,
881 "LND",
882 client,
883 invoice.clone(),
884 event_sender,
885 ln_gateway.clone(),
886 )
887 .await?;
888 ldk_wait_invoice_payment(&invoice).await?;
889 let (operation_id, invoice) =
890 client_create_invoice(client, *invoice_amount, event_sender, ln_gateway).await?;
891 let pay_invoice_time = fedimint_core::time::now();
892 ldk_pay_invoice(invoice).await?;
893 wait_invoice_payment(
894 prefix,
895 "LND",
896 client,
897 operation_id,
898 event_sender,
899 pay_invoice_time,
900 )
901 .await?;
902 }
903 }
904 Ok(())
905}
906
907async fn do_self_payment(
908 prefix: &str,
909 client: &ClientHandleArc,
910 invoice_amount: Amount,
911 event_sender: &mpsc::UnboundedSender<MetricEvent>,
912) -> anyhow::Result<()> {
913 let (operation_id, invoice) =
914 client_create_invoice(client, invoice_amount, event_sender, None).await?;
915 let pay_invoice_time = fedimint_core::time::now();
916 let lightning_module = client.get_first_module::<LightningClientModule>()?;
917 lightning_module
919 .pay_bolt11_invoice(None, invoice, ())
920 .await?;
921 wait_invoice_payment(
922 prefix,
923 "gateway",
924 client,
925 operation_id,
926 event_sender,
927 pay_invoice_time,
928 )
929 .await?;
930 Ok(())
931}
932
933async fn do_partner_ping_pong(
934 prefix: &str,
935 client: &ClientHandleArc,
936 partner: &ClientHandleArc,
937 invoice_amount: Amount,
938 event_sender: &mpsc::UnboundedSender<MetricEvent>,
939) -> anyhow::Result<()> {
940 let (operation_id, invoice) =
942 client_create_invoice(partner, invoice_amount, event_sender, None).await?;
943 let pay_invoice_time = fedimint_core::time::now();
944 let lightning_module = client.get_first_module::<LightningClientModule>()?;
945 lightning_module
948 .pay_bolt11_invoice(None, invoice, ())
949 .await?;
950 wait_invoice_payment(
951 prefix,
952 "gateway",
953 partner,
954 operation_id,
955 event_sender,
956 pay_invoice_time,
957 )
958 .await?;
959 let (operation_id, invoice) =
961 client_create_invoice(client, invoice_amount, event_sender, None).await?;
962 let pay_invoice_time = fedimint_core::time::now();
963 let partner_lightning_module = partner.get_first_module::<LightningClientModule>()?;
964 partner_lightning_module
967 .pay_bolt11_invoice(None, invoice, ())
968 .await?;
969 wait_invoice_payment(
970 prefix,
971 "gateway",
972 client,
973 operation_id,
974 event_sender,
975 pay_invoice_time,
976 )
977 .await?;
978 Ok(())
979}
980
981async fn wait_invoice_payment(
982 prefix: &str,
983 gateway_name: &str,
984 client: &ClientHandleArc,
985 operation_id: fedimint_core::core::OperationId,
986 event_sender: &mpsc::UnboundedSender<MetricEvent>,
987 pay_invoice_time: std::time::SystemTime,
988) -> anyhow::Result<()> {
989 let elapsed = pay_invoice_time.elapsed()?;
990 info!("{prefix} Invoice payment receive started using {gateway_name} in {elapsed:?}");
991 event_sender.send(MetricEvent {
992 name: format!("gateway_{gateway_name}_payment_received_started"),
993 duration: elapsed,
994 })?;
995 let lightning_module = client.get_first_module::<LightningClientModule>()?;
996 let mut updates = lightning_module
997 .subscribe_ln_receive(operation_id)
998 .await?
999 .into_stream();
1000 while let Some(update) = updates.next().await {
1001 debug!(%prefix, ?update, "Invoice payment update");
1002 match update {
1003 LnReceiveState::Claimed => {
1004 let elapsed: Duration = pay_invoice_time.elapsed()?;
1005 info!("{prefix} Invoice payment received on {gateway_name} in {elapsed:?}");
1006 event_sender.send(MetricEvent {
1007 name: "gateway_payment_received_success".into(),
1008 duration: elapsed,
1009 })?;
1010 event_sender.send(MetricEvent {
1011 name: format!("gateway_{gateway_name}_payment_received_success"),
1012 duration: elapsed,
1013 })?;
1014 break;
1015 }
1016 LnReceiveState::Canceled { reason } => {
1017 let elapsed: Duration = pay_invoice_time.elapsed()?;
1018 info!(
1019 "{prefix} Invoice payment receive was canceled on {gateway_name}: {reason} in {elapsed:?}"
1020 );
1021 event_sender.send(MetricEvent {
1022 name: "gateway_payment_received_canceled".into(),
1023 duration: elapsed,
1024 })?;
1025 break;
1026 }
1027 _ => {}
1028 }
1029 }
1030 Ok(())
1031}
1032
1033async fn client_create_invoice(
1034 client: &ClientHandleArc,
1035 invoice_amount: Amount,
1036 event_sender: &mpsc::UnboundedSender<MetricEvent>,
1037 ln_gateway: Option<LightningGateway>,
1038) -> anyhow::Result<(fedimint_core::core::OperationId, Bolt11Invoice)> {
1039 let create_invoice_time = fedimint_core::time::now();
1040 let lightning_module = client.get_first_module::<LightningClientModule>()?;
1041 let desc = Description::new("test".to_string())?;
1042 let (operation_id, invoice, _) = lightning_module
1043 .create_bolt11_invoice(
1044 invoice_amount,
1045 Bolt11InvoiceDescription::Direct(desc),
1046 None,
1047 (),
1048 ln_gateway,
1049 )
1050 .await?;
1051 let elapsed = create_invoice_time.elapsed()?;
1052 info!("Created invoice using gateway in {elapsed:?}");
1053 event_sender.send(MetricEvent {
1054 name: GATEWAY_CREATE_INVOICE.into(),
1055 duration: elapsed,
1056 })?;
1057 Ok((operation_id, invoice))
1058}
1059
1060fn test_download_config(
1061 invite_code: &InviteCode,
1062 users: u16,
1063 event_sender: &mpsc::UnboundedSender<MetricEvent>,
1064) -> Vec<BoxFuture<'static, anyhow::Result<()>>> {
1065 (0..users)
1066 .map(|_| {
1067 let invite_code = invite_code.clone();
1068 let event_sender = event_sender.clone();
1069 let f: BoxFuture<_> = Box::pin(async move {
1070 let m = fedimint_core::time::now();
1071 let _ = fedimint_api_client::api::net::ConnectorType::default()
1072 .download_from_invite_code(&invite_code, false, false)
1073 .await?;
1074 event_sender.send(MetricEvent {
1075 name: "download_client_config".into(),
1076 duration: m.elapsed()?,
1077 })?;
1078 Ok(())
1079 });
1080 f
1081 })
1082 .collect()
1083}
1084
1085async fn test_connect_raw_client(
1086 invite_code: InviteCode,
1087 users: u16,
1088 duration: Duration,
1089 timeout: Duration,
1090 limit_endpoints: Option<usize>,
1091 event_sender: mpsc::UnboundedSender<MetricEvent>,
1092) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
1093 use jsonrpsee_core::client::ClientT;
1094 use jsonrpsee_ws_client::WsClientBuilder;
1095
1096 let (mut cfg, _) = fedimint_api_client::api::net::ConnectorType::default()
1097 .download_from_invite_code(&invite_code, false, false)
1098 .await?;
1099
1100 if let Some(limit_endpoints) = limit_endpoints {
1101 cfg.global.api_endpoints = cfg
1102 .global
1103 .api_endpoints
1104 .into_iter()
1105 .take(limit_endpoints)
1106 .collect();
1107 info!("Limiting endpoints to {:?}", cfg.global.api_endpoints);
1108 }
1109
1110 info!("Connecting to {users} clients");
1111 let clients = (0..users)
1112 .flat_map(|_| {
1113 cfg.global.api_endpoints.values().map(|url| async {
1114 let ws_client = WsClientBuilder::default()
1115 .request_timeout(timeout)
1116 .connection_timeout(timeout)
1117 .build(url_to_string_with_default_port(&url.url))
1118 .await?;
1119 Ok::<_, anyhow::Error>(ws_client)
1120 })
1121 })
1122 .collect::<Vec<_>>();
1123 let clients = futures::future::try_join_all(clients).await?;
1124 info!("Keeping {users} clients connected for {duration:?}");
1125 Ok(clients
1126 .into_iter()
1127 .map(|client| {
1128 let event_sender = event_sender.clone();
1129 let f: BoxFuture<_> = Box::pin(async move {
1130 let initial_time = fedimint_core::time::now();
1131 while initial_time.elapsed()? < duration {
1132 let m = fedimint_core::time::now();
1133 let _epoch: u64 = client
1134 .request::<_, _>(SESSION_COUNT_ENDPOINT, vec![ApiRequestErased::default()])
1135 .await?;
1136 event_sender.send(MetricEvent {
1137 name: SESSION_COUNT_ENDPOINT.into(),
1138 duration: m.elapsed()?,
1139 })?;
1140 fedimint_core::task::sleep(Duration::from_secs(1)).await;
1141 }
1142 Ok(())
1143 });
1144 f
1145 })
1146 .collect())
1147}
1148
1149fn url_to_string_with_default_port(url: &SafeUrl) -> String {
1150 format!(
1151 "{}://{}:{}{}",
1152 url.scheme(),
1153 url.host().expect("Asserted on construction"),
1154 url.port_or_known_default()
1155 .expect("Asserted on construction"),
1156 url.path()
1157 )
1158}
1159
1160async fn handle_metrics_summary(
1161 opts: Opts,
1162 mut event_receiver: mpsc::UnboundedReceiver<MetricEvent>,
1163) -> anyhow::Result<()> {
1164 let timestamp_seconds = fedimint_core::time::duration_since_epoch().as_secs();
1165 let mut metrics_json_output_files = vec![];
1166 let mut previous_metrics = vec![];
1167 let mut comparison_output = None;
1168 if let Some(archive_dir) = opts.archive_dir {
1169 let mut archive_metrics = archive_dir.join("metrics");
1170 archive_metrics.push(opts.users.to_string());
1171 tokio::fs::create_dir_all(&archive_metrics).await?;
1172 let mut archive_comparisons = archive_dir.join("comparisons");
1173 archive_comparisons.push(opts.users.to_string());
1174 tokio::fs::create_dir_all(&archive_comparisons).await?;
1175
1176 let latest_metrics_file = std::fs::read_dir(&archive_metrics)?
1177 .map(|entry| {
1178 let entry = entry.unwrap();
1179 let metadata = entry.metadata().unwrap();
1180 let created = metadata
1181 .created()
1182 .unwrap_or_else(|_| metadata.modified().unwrap());
1183 (entry, created)
1184 })
1185 .max_by_key(|(_entry, created)| created.to_owned())
1186 .map(|(entry, _)| entry.path());
1187 if let Some(latest_metrics_file) = latest_metrics_file {
1188 let display_path = latest_metrics_file.display();
1189 let latest_metrics_file = tokio::fs::File::open(&latest_metrics_file)
1190 .await
1191 .with_context(|| format!("Failed to open {display_path}"))?;
1192 let mut lines = tokio::io::BufReader::new(latest_metrics_file).lines();
1193 while let Some(line) = lines.next_line().await? {
1194 match serde_json::from_str::<EventMetricSummary>(&line) {
1195 Ok(metric) => {
1196 previous_metrics.push(metric);
1197 }
1198 Err(e) => {
1199 warn!("Failed to parse previous metric: {e:?}");
1200 }
1201 }
1202 }
1203 }
1204 let new_metric_output = archive_metrics.join(format!("{timestamp_seconds}.json",));
1205 let new_metric_output = BufWriter::new(
1206 OpenOptions::new()
1207 .write(true)
1208 .create(true)
1209 .truncate(true)
1210 .open(new_metric_output)
1211 .await?,
1212 );
1213 metrics_json_output_files.push(new_metric_output);
1214 if !previous_metrics.is_empty() {
1215 let new_comparison_output =
1216 archive_comparisons.join(format!("{timestamp_seconds}.json",));
1217 comparison_output = Some(BufWriter::new(
1218 OpenOptions::new()
1219 .write(true)
1220 .create(true)
1221 .truncate(true)
1222 .open(new_comparison_output)
1223 .await?,
1224 ));
1225 }
1226 }
1227 if let Some(metrics_json_output) = opts.metrics_json_output {
1228 metrics_json_output_files.push(BufWriter::new(
1229 tokio::fs::OpenOptions::new()
1230 .write(true)
1231 .create(true)
1232 .truncate(true)
1233 .open(metrics_json_output)
1234 .await?,
1235 ));
1236 }
1237 let mut results = BTreeMap::new();
1238 while let Some(event) = event_receiver.recv().await {
1239 let entry = results.entry(event.name).or_insert_with(Vec::new);
1240 entry.push(event.duration);
1241 }
1242 let mut previous_metrics = previous_metrics
1243 .into_iter()
1244 .map(|metric| (metric.name.clone(), metric))
1245 .collect::<HashMap<_, _>>();
1246 for (k, mut v) in results {
1247 v.sort();
1248 let n = v.len();
1249 let max = v.iter().last().unwrap();
1250 let min = v.first().unwrap();
1251 let median = v[n / 2];
1252 let sum: Duration = v.iter().sum();
1253 let avg = sum / n as u32;
1254 let metric_summary = EventMetricSummary {
1255 name: k.clone(),
1256 users: u64::from(opts.users),
1257 n: n as u64,
1258 avg_ms: avg.as_millis(),
1259 median_ms: median.as_millis(),
1260 max_ms: max.as_millis(),
1261 min_ms: min.as_millis(),
1262 timestamp_seconds,
1263 };
1264 let comparison = if let Some(previous_metric) = previous_metrics.remove(&k) {
1265 if previous_metric.n == metric_summary.n {
1266 fn calculate_gain(current: u128, previous: u128) -> f64 {
1267 current as f64 / previous as f64
1268 }
1269 let comparison = EventMetricComparison {
1270 avg_ms_gain: calculate_gain(metric_summary.avg_ms, previous_metric.avg_ms),
1271 median_ms_gain: calculate_gain(
1272 metric_summary.median_ms,
1273 previous_metric.median_ms,
1274 ),
1275 max_ms_gain: calculate_gain(metric_summary.max_ms, previous_metric.max_ms),
1276 min_ms_gain: calculate_gain(metric_summary.min_ms, previous_metric.min_ms),
1277 current: metric_summary.clone(),
1278 previous: previous_metric,
1279 };
1280 if let Some(comparison_output) = &mut comparison_output {
1281 let comparison_json =
1282 serde_json::to_string(&comparison).expect("to be serializable");
1283 comparison_output
1284 .write_all(format!("{comparison_json}\n").as_bytes())
1285 .await
1286 .expect("to write on file");
1287 }
1288 Some(comparison)
1289 } else {
1290 info!(
1291 "Skipping comparison for {k} because previous metric has different n ({} vs {})",
1292 previous_metric.n, metric_summary.n
1293 );
1294 None
1295 }
1296 } else {
1297 None
1298 };
1299 if let Some(comparison) = comparison {
1300 println!(
1301 "{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?} (compared to previous: {comparison})"
1302 );
1303 } else {
1304 println!("{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?}");
1305 }
1306 let metric_summary_json =
1307 serde_json::to_string(&metric_summary).expect("to be serializable");
1308 for metrics_json_output_file in &mut metrics_json_output_files {
1309 metrics_json_output_file
1310 .write_all(format!("{metric_summary_json}\n").as_bytes())
1311 .await
1312 .expect("to write on file");
1313 }
1314 }
1315 for mut output in metrics_json_output_files {
1316 output.flush().await?;
1317 }
1318 if let Some(mut output) = comparison_output {
1319 output.flush().await?;
1320 }
1321 Ok(())
1322}
1323
1324async fn get_gateway_id(generate_invoice_with: LnInvoiceGeneration) -> anyhow::Result<String> {
1325 let gateway_json = match generate_invoice_with {
1326 LnInvoiceGeneration::LdkLightningCli => {
1327 cmd!(GatewayLndCli, "info").out_json().await
1329 }
1330 }?;
1331 let gateway_id = gateway_json["gateway_id"]
1332 .as_str()
1333 .context("Missing gateway_id field")?;
1334
1335 Ok(gateway_id.into())
1336}