1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_precision_loss)]
4#![allow(clippy::missing_errors_doc)]
5#![allow(clippy::ref_option)]
6#![allow(clippy::too_many_lines)]
7#![allow(clippy::large_futures)]
8
9use std::collections::{BTreeMap, HashMap};
10use std::path::PathBuf;
11use std::str::FromStr;
12use std::time::Duration;
13use std::vec;
14
15use anyhow::{Context, anyhow, bail};
16use clap::{Args, Parser, Subcommand, ValueEnum};
17use common::{
18 gateway_pay_invoice, get_note_summary, ldk_create_invoice, ldk_pay_invoice,
19 ldk_wait_invoice_payment, parse_gateway_id, reissue_notes,
20};
21use devimint::cmd;
22use devimint::util::GatewayLndCli;
23use fedimint_client::ClientHandleArc;
24use fedimint_connectors::ConnectorRegistry;
25use fedimint_core::Amount;
26use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
27use fedimint_core::invite_code::InviteCode;
28use fedimint_core::module::ApiRequestErased;
29use fedimint_core::runtime::spawn;
30use fedimint_core::util::{BoxFuture, SafeUrl};
31use fedimint_ln_client::{LightningClientModule, LnReceiveState};
32use fedimint_ln_common::LightningGateway;
33use fedimint_mint_client::OOBNotes;
34use futures::StreamExt;
35use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description};
36use serde::{Deserialize, Serialize};
37use tokio::fs::OpenOptions;
38use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
39use tokio::sync::mpsc;
40use tracing::{debug, info, warn};
41
42use crate::common::{
43 build_client, do_spend_notes, get_invite_code_cli, remint_denomination, try_get_notes_cli,
44};
45pub mod common;
46
// Top-level command-line options, shared by every subcommand.
//
// NOTE: plain `//` comments are used here on purpose; clap's derive macros turn
// `///` doc comments into help/about text, which would alter the CLI output.
#[derive(Parser, Clone)]
#[command(version)]
struct Opts {
    // How many users to simulate; `main` forwards this to whichever test is selected.
    #[arg(
        long,
        default_value = "10",
        help = "Number of users. Each user will work in parallel"
    )]
    users: u16,

    // Not read in this section; presumably consumed by `handle_metrics_summary`,
    // which receives a clone of the whole `Opts` — confirm there.
    #[arg(long, help = "Output with the metrics results in JSON format")]
    metrics_json_output: Option<PathBuf>,

    // Base directory for persisted state; the load tests derive their client
    // database directory from it via `get_db_path`.
    #[arg(
        long,
        help = "If given, will be used to store and retrieve past metrics for comparison purposes"
    )]
    archive_dir: Option<PathBuf>,

    // The test to run.
    #[clap(subcommand)]
    command: Command,
}
69
// How invoices that the gateway must pay are generated.
//
// `//` comments only: `///` on a `ValueEnum` would be picked up by clap as help text.
#[derive(Debug, Clone, Copy, ValueEnum)]
enum LnInvoiceGeneration {
    // Invoices are created with `ldk_create_invoice` and awaited with
    // `ldk_wait_invoice_payment`.
    LdkLightningCli,
}
74
75#[derive(Subcommand, Clone)]
76enum Command {
77 #[command(about = "Keep many websocket connections to a federation for a duration of time")]
78 TestConnect {
79 #[arg(long, help = "Federation invite code")]
80 invite_code: String,
81 #[arg(
82 long,
83 default_value = "60",
84 help = "How much time to keep the connections open, in seconds"
85 )]
86 duration_secs: u64,
87 #[arg(
88 long,
89 default_value = "120",
90 help = "Timeout for connection attempt and for each request, in secnods"
91 )]
92 timeout_secs: u64,
93 #[arg(
94 long,
95 help = "If given, will limit the number of endpoints (guardians) to connect to"
96 )]
97 limit_endpoints: Option<usize>,
98 },
99 #[command(about = "Try to download the client config many times.")]
100 TestDownload {
101 #[arg(long, help = "Federation invite code")]
102 invite_code: String,
103 },
104 #[command(
105 about = "Run a load test where many users in parallel will try to reissue notes and pay invoices through the gateway"
106 )]
107 LoadTest(LoadTestArgs),
108 #[command()]
113 LnCircularLoadTest(LnCircularLoadTestArgs),
114}
115
// Arguments of the `LoadTest` subcommand; `main` unpacks them into `run_load_test`.
//
// `//` comments only: clap's derive would turn `///` doc comments into help text.
#[derive(Args, Clone)]
struct LoadTestArgs {
    // When absent, `invite_code_or_fallback` tries to obtain one through the CLI helper.
    #[arg(
        long,
        help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
    )]
    invite_code: Option<InviteCode>,

    // Reissued into the coordinator client before the test starts.
    #[arg(
        long,
        help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
    )]
    initial_notes: Option<OOBNotes>,

    // When absent and `generate_invoice_with` is set, `main` resolves one via `get_gateway_id`.
    #[arg(
        long,
        help = "Gateway Id. If none, retrieve one according to --generate-invoice-with"
    )]
    gateway_id: Option<String>,

    #[arg(
        long,
        help = "The method used to generate invoices to be paid through the gateway. If none and no --invoices-file provided, no gateway/LN tests will be run. Note that you can't generate an invoice using the same lightning node used by the gateway (i.e self payment is forbidden)"
    )]
    generate_invoice_with: Option<LnInvoiceGeneration>,

    #[arg(
        long,
        default_value = "1",
        help = "How many invoices will be created for each user. Only applicable if --generate-invoice-with is provided"
    )]
    invoices_per_user: u16,

    // Converted to a `Duration` by `main`.
    #[arg(
        long,
        default_value = "0",
        help = "How many seconds to sleep between LN payments"
    )]
    ln_payment_sleep_secs: u64,

    // Parsed line by line into `Bolt11Invoice`s by `main`; the invoices are then
    // spread round-robin over the users in `run_load_test`.
    #[arg(
        long,
        help = "A text file with one invoice per line. If --generate-invoice-with is provided, these will be additional invoices to be paid"
    )]
    invoices_file: Option<PathBuf>,

    #[arg(
        long,
        help = "How many notes to distribute to each user",
        default_value = "2"
    )]
    notes_per_user: u16,

    #[arg(
        long,
        help = "Note denomination to use for the test",
        default_value = "1024"
    )]
    note_denomination: Amount,

    #[arg(
        long,
        help = "Invoice amount when generating one",
        default_value = "1000"
    )]
    invoice_amount: Amount,
}
183
// Arguments of the `LnCircularLoadTest` subcommand; `main` unpacks them into
// `run_ln_circular_load_test`.
//
// `//` comments only: clap's derive would turn `///` doc comments into help text.
#[derive(Args, Clone)]
struct LnCircularLoadTestArgs {
    // When absent, `invite_code_or_fallback` tries to obtain one through the CLI helper.
    #[arg(
        long,
        help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
    )]
    invite_code: Option<InviteCode>,

    // Reissued into the coordinator client before the test starts.
    #[arg(
        long,
        help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
    )]
    initial_notes: Option<OOBNotes>,

    // Each user task keeps looping over its strategy until this much time has elapsed.
    #[arg(
        long,
        default_value = "60",
        help = "For how many seconds to run the test"
    )]
    test_duration_secs: u64,

    // Converted to a `Duration` by `main`.
    #[arg(
        long,
        default_value = "0",
        help = "How many seconds to sleep between LN payments"
    )]
    ln_payment_sleep_secs: u64,

    #[arg(
        long,
        help = "How many notes to distribute to each user",
        default_value = "1"
    )]
    notes_per_user: u16,

    #[arg(
        long,
        help = "Note denomination to use for the test",
        default_value = "1024"
    )]
    note_denomination: Amount,

    #[arg(
        long,
        help = "Invoice amount when generating one",
        default_value = "1000"
    )]
    invoice_amount: Amount,

    // Required (no default): selects the branch taken in `do_ln_circular_test_user_task`.
    #[arg(long)]
    strategy: LnCircularStrategy,
}
236
// Payment pattern exercised by each user in the circular Lightning load test.
//
// `//` comments only: `///` on a `ValueEnum` would be picked up by clap as help text.
#[derive(Debug, Clone, Copy, ValueEnum)]
enum LnCircularStrategy {
    // The user's client creates an invoice and pays it itself (`do_self_payment`).
    SelfPayment,
    // The client pays an LDK-created invoice through the gateway, then LDK pays an
    // invoice created by the client (`run_two_gateways_strategy`).
    TwoGateways,
    // The user builds a second "partner" client and the two pay each other's
    // invoices in turn (`do_partner_ping_pong`).
    PartnerPingPong,
}
247
/// A single timing measurement, sent by the test tasks over the metrics channel.
#[derive(Debug, Clone)]
pub struct MetricEvent {
    /// Name of the measured operation (e.g. `"gateway_create_invoice"`).
    name: String,
    /// How long the operation took.
    duration: Duration,
}
253
/// Aggregated statistics for one metric name, (de)serializable with serde.
///
/// The values are computed outside this section; the notes below are inferred from
/// the field names only — confirm against the code that builds the summary.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct EventMetricSummary {
    /// Metric name; presumably matches `MetricEvent::name`.
    name: String,
    /// Presumably the number of users the run was executed with.
    users: u64,
    /// Presumably the number of events aggregated.
    n: u64,
    // The `*_ms` fields are presumably durations in milliseconds.
    avg_ms: u128,
    median_ms: u128,
    max_ms: u128,
    min_ms: u128,
    /// Presumably the time the summary was taken, in seconds — epoch not visible here.
    timestamp_seconds: u64,
}
265
/// Comparison of a current metric summary against a previous one.
///
/// The `*_gain` fields are ratios: the `Display` impl renders values `>= 1.0` as a
/// positive percentage and values below `1.0` as a negative one. How the ratios are
/// derived from `current` and `previous` is not visible in this section.
#[derive(Debug, Serialize, Deserialize)]
struct EventMetricComparison {
    avg_ms_gain: f64,
    median_ms_gain: f64,
    max_ms_gain: f64,
    min_ms_gain: f64,
    /// Summary of the current run.
    current: EventMetricSummary,
    /// Summary of the earlier run being compared against.
    previous: EventMetricSummary,
}
275
276impl std::fmt::Display for EventMetricComparison {
277 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
278 fn to_percent(gain: f64) -> String {
279 if gain >= 1.0 {
280 format!("+{:.2}%", (gain - 1.0) * 100.0)
281 } else {
282 format!("-{:.2}%", (1.0 - gain) * 100.0)
283 }
284 }
285 f.write_str(&format!(
286 "avg: {}, median: {}, max: {}, min: {}",
287 to_percent(self.avg_ms_gain),
288 to_percent(self.median_ms_gain),
289 to_percent(self.max_ms_gain),
290 to_percent(self.min_ms_gain),
291 ))
292 }
293}
294
295#[tokio::main]
296async fn main() -> anyhow::Result<()> {
297 fedimint_logging::TracingSetup::default().init()?;
298 let opts = Opts::parse();
299 let (event_sender, event_receiver) = tokio::sync::mpsc::unbounded_channel();
300 let summary_handle = spawn("handle metrics summary", {
301 let opts = opts.clone();
302 async { handle_metrics_summary(opts, event_receiver).await }
303 });
304 let futures = match opts.command.clone() {
305 Command::TestConnect {
306 invite_code,
307 duration_secs,
308 timeout_secs,
309 limit_endpoints,
310 } => {
311 let connectors = ConnectorRegistry::build_from_client_env()?.bind().await?;
312 let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
313 test_connect_raw_client(
314 &connectors,
315 invite_code,
316 opts.users,
317 Duration::from_secs(duration_secs),
318 Duration::from_secs(timeout_secs),
319 limit_endpoints,
320 event_sender.clone(),
321 )
322 .await?
323 }
324 Command::TestDownload { invite_code } => {
325 let connectors = ConnectorRegistry::build_from_client_env()?.bind().await?;
326 let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
327 test_download_config(&connectors, &invite_code, opts.users, &event_sender.clone())
328 }
329 Command::LoadTest(args) => {
330 let invite_code = invite_code_or_fallback(args.invite_code).await;
331
332 let gateway_id = if let Some(gateway_id) = args.gateway_id {
333 Some(gateway_id)
334 } else if let Some(generate_invoice_with) = args.generate_invoice_with {
335 Some(get_gateway_id(generate_invoice_with).await?)
336 } else {
337 None
338 };
339 let invoices = if let Some(invoices_file) = args.invoices_file {
340 let display_path = invoices_file.display();
341 let invoices_file = tokio::fs::File::open(&invoices_file)
342 .await
343 .with_context(|| format!("Failed to open {display_path}"))?;
344 let mut lines = tokio::io::BufReader::new(invoices_file).lines();
345 let mut invoices = vec![];
346 while let Some(line) = lines.next_line().await? {
347 let invoice = Bolt11Invoice::from_str(&line)?;
348 invoices.push(invoice);
349 }
350 invoices
351 } else {
352 vec![]
353 };
354 if args.generate_invoice_with.is_none() && invoices.is_empty() {
355 info!(
356 "No --generate-invoice-with given no invoices on --invoices-file, not LN/gateway tests will be run"
357 );
358 }
359 run_load_test(
360 opts.archive_dir,
361 opts.users,
362 invite_code,
363 args.initial_notes,
364 args.generate_invoice_with,
365 args.invoices_per_user,
366 Duration::from_secs(args.ln_payment_sleep_secs),
367 invoices,
368 gateway_id,
369 args.notes_per_user,
370 args.note_denomination,
371 args.invoice_amount,
372 event_sender.clone(),
373 )
374 .await?
375 }
376 Command::LnCircularLoadTest(args) => {
377 let invite_code = invite_code_or_fallback(args.invite_code).await;
378 run_ln_circular_load_test(
379 opts.archive_dir,
380 opts.users,
381 invite_code,
382 args.initial_notes,
383 Duration::from_secs(args.test_duration_secs),
384 Duration::from_secs(args.ln_payment_sleep_secs),
385 args.notes_per_user,
386 args.note_denomination,
387 args.invoice_amount,
388 args.strategy,
389 event_sender.clone(),
390 )
391 .await?
392 }
393 };
394
395 let result = futures::future::join_all(futures).await;
396 drop(event_sender);
397 summary_handle.await??;
398 let len_failures = result.iter().filter(|r| r.is_err()).count();
399 eprintln!("{} results, {len_failures} failures", result.len());
400 for r in result {
401 if let Err(e) = r {
402 warn!("Task failed: {:?}", e);
403 }
404 }
405 if len_failures > 0 {
406 bail!("Finished with failures");
407 }
408 info!("Finished successfully");
409 Ok(())
410}
411
412async fn invite_code_or_fallback(invite_code: Option<InviteCode>) -> Option<InviteCode> {
413 if let Some(invite_code) = invite_code {
414 Some(invite_code)
415 } else {
416 match get_invite_code_cli(0.into()).await {
418 Ok(invite_code) => Some(invite_code),
419 Err(e) => {
420 info!(
421 "No invite code provided and failed to get one with '{e}' error, will try to proceed without one..."
422 );
423 None
424 }
425 }
426 }
427}
428
429#[allow(clippy::too_many_arguments)]
430async fn run_load_test(
431 archive_dir: Option<PathBuf>,
432 users: u16,
433 invite_code: Option<InviteCode>,
434 initial_notes: Option<OOBNotes>,
435 generate_invoice_with: Option<LnInvoiceGeneration>,
436 generated_invoices_per_user: u16,
437 ln_payment_sleep: Duration,
438 invoices_from_file: Vec<Bolt11Invoice>,
439 gateway_id: Option<String>,
440 notes_per_user: u16,
441 note_denomination: Amount,
442 invoice_amount: Amount,
443 event_sender: mpsc::UnboundedSender<MetricEvent>,
444) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
445 let db_path = get_db_path(&archive_dir);
446 let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
447 let minimum_notes = notes_per_user * users;
448 let minimum_amount_required = note_denomination * u64::from(minimum_notes);
449
450 reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
451 get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
452 print_coordinator_notes(&coordinator).await?;
453 info!(
454 "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
455 );
456 remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
457 print_coordinator_notes(&coordinator).await?;
458
459 let users_clients = get_users_clients(users, db_path, invite_code).await?;
460
461 let mut users_notes =
462 get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
463 let mut users_invoices = HashMap::new();
464 let mut user = 0;
465 for invoice in invoices_from_file {
467 users_invoices
468 .entry(user)
469 .or_insert_with(Vec::new)
470 .push(invoice);
471 user = (user + 1) % users;
472 }
473
474 info!("Starting user tasks");
475 let futures = users_clients
476 .into_iter()
477 .enumerate()
478 .map(|(u, client)| {
479 let u = u as u16;
480 let oob_notes = users_notes.remove(&u).unwrap();
481 let invoices = users_invoices.remove(&u).unwrap_or_default();
482 let event_sender = event_sender.clone();
483 let f: BoxFuture<_> = Box::pin(do_load_test_user_task(
484 format!("User {u}:"),
485 client,
486 oob_notes,
487 generated_invoices_per_user,
488 ln_payment_sleep,
489 invoice_amount,
490 invoices,
491 generate_invoice_with,
492 event_sender,
493 gateway_id.clone(),
494 ));
495 f
496 })
497 .collect::<Vec<_>>();
498
499 Ok(futures)
500}
501
502async fn get_notes_for_users(
503 users: u16,
504 notes_per_user: u16,
505 coordinator: ClientHandleArc,
506 note_denomination: Amount,
507) -> anyhow::Result<HashMap<u16, Vec<OOBNotes>>> {
508 let mut users_notes = HashMap::new();
509 for u in 0..users {
510 users_notes.insert(u, Vec::with_capacity(notes_per_user.into()));
511 for _ in 0..notes_per_user {
512 let (_, oob_notes) = do_spend_notes(&coordinator, note_denomination).await?;
513 let user_amount = oob_notes.total_amount();
514 info!("Giving {user_amount} to user {u}");
515 users_notes.get_mut(&u).unwrap().push(oob_notes);
516 }
517 }
518 Ok(users_notes)
519}
520
521async fn get_users_clients(
522 n: u16,
523 db_path: Option<PathBuf>,
524 invite_code: Option<InviteCode>,
525) -> anyhow::Result<Vec<ClientHandleArc>> {
526 let mut users_clients = Vec::with_capacity(n.into());
527 for u in 0..n {
528 let (client, _) = get_user_client(u, &db_path, &invite_code).await?;
529 users_clients.push(client);
530 }
531 Ok(users_clients)
532}
533
534async fn get_user_client(
535 user_index: u16,
536 db_path: &Option<PathBuf>,
537 invite_code: &Option<InviteCode>,
538) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
539 let user_db = db_path
540 .as_ref()
541 .map(|db_path| db_path.join(format!("user_{user_index}.db")));
542 let user_invite_code = if user_db.as_ref().is_some_and(|db| db.exists()) {
543 None
544 } else {
545 invite_code.clone()
546 };
547 let (client, invite_code) = build_client(user_invite_code, user_db.as_ref()).await?;
548 if let Ok(ln_client) = client.get_first_module::<LightningClientModule>() {
550 let _ = ln_client.update_gateway_cache().await;
551 }
552 Ok((client, invite_code))
553}
554
555async fn print_coordinator_notes(coordinator: &ClientHandleArc) -> anyhow::Result<()> {
556 info!("Note summary:");
557 let summary = get_note_summary(coordinator).await?;
558 for (k, v) in summary.iter() {
559 info!("{k}: {v}");
560 }
561 Ok(())
562}
563
564async fn get_required_notes(
565 coordinator: &ClientHandleArc,
566 minimum_amount_required: Amount,
567 event_sender: &mpsc::UnboundedSender<MetricEvent>,
568) -> anyhow::Result<()> {
569 let current_balance = coordinator.get_balance_for_btc().await?;
570
571 if current_balance < minimum_amount_required {
572 let diff = minimum_amount_required.saturating_sub(current_balance);
573 info!(
574 "Current balance {current_balance} on coordinator not enough, trying to get {diff} more through fedimint-cli"
575 );
576 match try_get_notes_cli(&diff, 5).await {
577 Ok(notes) => {
578 info!("Got {} more notes, reissuing them", notes.total_amount());
579 reissue_notes(coordinator, notes, event_sender).await?;
580 }
581 Err(e) => {
582 info!("Unable to get more notes: '{e}', will try to proceed without them");
583 }
584 }
585 } else {
586 info!(
587 "Current balance of {current_balance} already covers the minimum required of {minimum_amount_required}"
588 );
589 }
590 Ok(())
591}
592
593async fn reissue_initial_notes(
594 initial_notes: Option<OOBNotes>,
595 coordinator: &ClientHandleArc,
596 event_sender: &mpsc::UnboundedSender<MetricEvent>,
597) -> anyhow::Result<()> {
598 if let Some(notes) = initial_notes {
599 let amount = notes.total_amount();
600 info!("Reissuing initial notes, got {amount}");
601 reissue_notes(coordinator, notes, event_sender).await?;
602 }
603 Ok(())
604}
605
606async fn get_coordinator_client(
607 db_path: &Option<PathBuf>,
608 invite_code: &Option<InviteCode>,
609) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
610 let (client, invite_code) = if let Some(db_path) = db_path {
611 let coordinator_db = db_path.join("coordinator.db");
612 if coordinator_db.exists() {
613 build_client(invite_code.clone(), Some(&coordinator_db)).await?
614 } else {
615 tokio::fs::create_dir_all(db_path).await?;
616 build_client(
617 Some(invite_code.clone().context(
618 "Running on this archive dir for the first time, an invite code is required",
619 )?),
620 Some(&coordinator_db),
621 )
622 .await?
623 }
624 } else {
625 build_client(
626 Some(
627 invite_code
628 .clone()
629 .context("No archive dir given, an invite code is strictly required")?,
630 ),
631 None,
632 )
633 .await?
634 };
635 Ok((client, invite_code))
636}
637
/// Returns the `db` subdirectory of the archive directory, or `None` without one.
fn get_db_path(archive_dir: &Option<PathBuf>) -> Option<PathBuf> {
    let archive_dir = archive_dir.as_ref()?;
    Some(archive_dir.join("db"))
}
641
642async fn get_lightning_gateway(
643 client: &ClientHandleArc,
644 gateway_id: Option<String>,
645) -> Option<LightningGateway> {
646 let gateway_id = parse_gateway_id(gateway_id.or(None)?.as_str()).expect("Invalid gateway id");
647 let ln_module = client
648 .get_first_module::<LightningClientModule>()
649 .expect("Must have ln client module");
650 ln_module.select_gateway(&gateway_id).await
651}
652
653#[allow(clippy::too_many_arguments)]
654async fn do_load_test_user_task(
655 prefix: String,
656 client: ClientHandleArc,
657 oob_notes: Vec<OOBNotes>,
658 generated_invoices_per_user: u16,
659 ln_payment_sleep: Duration,
660 invoice_amount: Amount,
661 additional_invoices: Vec<Bolt11Invoice>,
662 generate_invoice_with: Option<LnInvoiceGeneration>,
663 event_sender: mpsc::UnboundedSender<MetricEvent>,
664 gateway_id: Option<String>,
665) -> anyhow::Result<()> {
666 let ln_gateway = get_lightning_gateway(&client, gateway_id).await;
667 for oob_note in oob_notes {
668 let amount = oob_note.total_amount();
669 reissue_notes(&client, oob_note, &event_sender)
670 .await
671 .map_err(|e| anyhow!("while reissuing initial {amount}: {e}"))?;
672 }
673 let mut generated_invoices_per_user_iterator = (0..generated_invoices_per_user).peekable();
674 while let Some(_) = generated_invoices_per_user_iterator.next() {
675 let total_amount = get_note_summary(&client).await?.total_amount();
676 if invoice_amount > total_amount {
677 warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
678 } else {
679 match generate_invoice_with {
680 Some(LnInvoiceGeneration::LdkLightningCli) => {
681 let invoice = ldk_create_invoice(invoice_amount).await?;
682 gateway_pay_invoice(
683 &prefix,
684 "LND",
685 &client,
686 invoice.clone(),
687 &event_sender,
688 ln_gateway.clone(),
689 )
690 .await?;
691 ldk_wait_invoice_payment(&invoice).await?;
692 }
693 None if additional_invoices.is_empty() => {
694 debug!(
695 "No method given to generate an invoice and no invoices on file, will not test the gateway"
696 );
697 break;
698 }
699 None => {
700 break;
701 }
702 }
703 if generated_invoices_per_user_iterator.peek().is_some() {
704 fedimint_core::task::sleep(ln_payment_sleep).await;
706 }
707 }
708 }
709 let mut additional_invoices = additional_invoices.into_iter().peekable();
710 while let Some(invoice) = additional_invoices.next() {
711 let total_amount = get_note_summary(&client).await?.total_amount();
712 let invoice_amount =
713 Amount::from_msats(invoice.amount_milli_satoshis().unwrap_or_default());
714 if invoice_amount > total_amount {
715 warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
716 } else if invoice_amount == Amount::ZERO {
717 warn!("Can't pay invoice {invoice}, amount is zero");
718 } else {
719 gateway_pay_invoice(
720 &prefix,
721 "unknown",
722 &client,
723 invoice,
724 &event_sender,
725 ln_gateway.clone(),
726 )
727 .await?;
728 if additional_invoices.peek().is_some() {
729 fedimint_core::task::sleep(ln_payment_sleep).await;
731 }
732 }
733 }
734 Ok(())
735}
736
737#[allow(clippy::too_many_arguments)]
738async fn run_ln_circular_load_test(
739 archive_dir: Option<PathBuf>,
740 users: u16,
741 invite_code: Option<InviteCode>,
742 initial_notes: Option<OOBNotes>,
743 test_duration: Duration,
744 ln_payment_sleep: Duration,
745 notes_per_user: u16,
746 note_denomination: Amount,
747 invoice_amount: Amount,
748 strategy: LnCircularStrategy,
749 event_sender: mpsc::UnboundedSender<MetricEvent>,
750) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
751 let db_path = get_db_path(&archive_dir);
752 let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
753 let minimum_notes = notes_per_user * users;
754 let minimum_amount_required = note_denomination * u64::from(minimum_notes);
755
756 reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
757 get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
758
759 info!(
760 "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
761 );
762 remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
763
764 print_coordinator_notes(&coordinator).await?;
765
766 let users_clients = get_users_clients(users, db_path, invite_code.clone()).await?;
767
768 let mut users_notes =
769 get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
770
771 info!("Starting user tasks");
772 let futures = users_clients
773 .into_iter()
774 .enumerate()
775 .map(|(u, client)| {
776 let u = u as u16;
777 let oob_notes = users_notes.remove(&u).unwrap();
778 let event_sender = event_sender.clone();
779 let f: BoxFuture<_> = Box::pin(do_ln_circular_test_user_task(
780 format!("User {u}:"),
781 client,
782 invite_code.clone(),
783 oob_notes,
784 test_duration,
785 ln_payment_sleep,
786 invoice_amount,
787 strategy,
788 event_sender,
789 ));
790 f
791 })
792 .collect::<Vec<_>>();
793
794 Ok(futures)
795}
796
797#[allow(clippy::too_many_arguments)]
798async fn do_ln_circular_test_user_task(
799 prefix: String,
800 client: ClientHandleArc,
801 invite_code: Option<InviteCode>,
802 oob_notes: Vec<OOBNotes>,
803 test_duration: Duration,
804 ln_payment_sleep: Duration,
805 invoice_amount: Amount,
806 strategy: LnCircularStrategy,
807 event_sender: mpsc::UnboundedSender<MetricEvent>,
808) -> anyhow::Result<()> {
809 for oob_note in oob_notes {
810 let amount = oob_note.total_amount();
811 reissue_notes(&client, oob_note, &event_sender)
812 .await
813 .map_err(|e| anyhow!("while reissuing initial {amount}: {e}"))?;
814 }
815 let initial_time = fedimint_core::time::now();
816 let still_ontime = || async {
817 fedimint_core::time::now()
818 .duration_since(initial_time)
819 .expect("time to work")
820 <= test_duration
821 };
822 let sleep_a_bit = || async {
823 if still_ontime().await {
824 fedimint_core::task::sleep(ln_payment_sleep).await;
825 }
826 };
827 match strategy {
828 LnCircularStrategy::TwoGateways => {
829 let invoice_generation = LnInvoiceGeneration::LdkLightningCli;
830 while still_ontime().await {
831 let gateway_id = get_gateway_id(invoice_generation).await?;
832 let ln_gateway = get_lightning_gateway(&client, Some(gateway_id)).await;
833 run_two_gateways_strategy(
834 &prefix,
835 &invoice_generation,
836 &invoice_amount,
837 &event_sender,
838 &client,
839 ln_gateway,
840 )
841 .await?;
842 sleep_a_bit().await;
843 }
844 }
845 LnCircularStrategy::SelfPayment => {
846 while still_ontime().await {
847 do_self_payment(&prefix, &client, invoice_amount, &event_sender).await?;
848 sleep_a_bit().await;
849 }
850 }
851 LnCircularStrategy::PartnerPingPong => {
852 let (partner, _) = build_client(invite_code, None).await?;
853 while still_ontime().await {
854 do_partner_ping_pong(&prefix, &client, &partner, invoice_amount, &event_sender)
855 .await?;
856 sleep_a_bit().await;
857 }
858 }
859 }
860 Ok(())
861}
862
/// Metric name recorded for invoice-creation timings, both for invoices created
/// with LDK and for invoices created by the client.
const GATEWAY_CREATE_INVOICE: &str = "gateway_create_invoice";
864
865async fn run_two_gateways_strategy(
866 prefix: &str,
867 invoice_generation: &LnInvoiceGeneration,
868 invoice_amount: &Amount,
869 event_sender: &mpsc::UnboundedSender<MetricEvent>,
870 client: &ClientHandleArc,
871 ln_gateway: Option<LightningGateway>,
872) -> Result<(), anyhow::Error> {
873 let create_invoice_time = fedimint_core::time::now();
874 match *invoice_generation {
875 LnInvoiceGeneration::LdkLightningCli => {
876 let invoice = ldk_create_invoice(*invoice_amount).await?;
877 let elapsed = create_invoice_time.elapsed()?;
878 info!("Created invoice using LDK in {elapsed:?}");
879 event_sender.send(MetricEvent {
880 name: GATEWAY_CREATE_INVOICE.into(),
881 duration: elapsed,
882 })?;
883 gateway_pay_invoice(
884 prefix,
885 "LND",
886 client,
887 invoice.clone(),
888 event_sender,
889 ln_gateway.clone(),
890 )
891 .await?;
892 ldk_wait_invoice_payment(&invoice).await?;
893 let (operation_id, invoice) =
894 client_create_invoice(client, *invoice_amount, event_sender, ln_gateway).await?;
895 let pay_invoice_time = fedimint_core::time::now();
896 ldk_pay_invoice(invoice).await?;
897 wait_invoice_payment(
898 prefix,
899 "LND",
900 client,
901 operation_id,
902 event_sender,
903 pay_invoice_time,
904 )
905 .await?;
906 }
907 }
908 Ok(())
909}
910
911async fn do_self_payment(
912 prefix: &str,
913 client: &ClientHandleArc,
914 invoice_amount: Amount,
915 event_sender: &mpsc::UnboundedSender<MetricEvent>,
916) -> anyhow::Result<()> {
917 let (operation_id, invoice) =
918 client_create_invoice(client, invoice_amount, event_sender, None).await?;
919 let pay_invoice_time = fedimint_core::time::now();
920 let lightning_module = client.get_first_module::<LightningClientModule>()?;
921 lightning_module
923 .pay_bolt11_invoice(None, invoice, ())
924 .await?;
925 wait_invoice_payment(
926 prefix,
927 "gateway",
928 client,
929 operation_id,
930 event_sender,
931 pay_invoice_time,
932 )
933 .await?;
934 Ok(())
935}
936
937async fn do_partner_ping_pong(
938 prefix: &str,
939 client: &ClientHandleArc,
940 partner: &ClientHandleArc,
941 invoice_amount: Amount,
942 event_sender: &mpsc::UnboundedSender<MetricEvent>,
943) -> anyhow::Result<()> {
944 let (operation_id, invoice) =
946 client_create_invoice(partner, invoice_amount, event_sender, None).await?;
947 let pay_invoice_time = fedimint_core::time::now();
948 let lightning_module = client.get_first_module::<LightningClientModule>()?;
949 lightning_module
952 .pay_bolt11_invoice(None, invoice, ())
953 .await?;
954 wait_invoice_payment(
955 prefix,
956 "gateway",
957 partner,
958 operation_id,
959 event_sender,
960 pay_invoice_time,
961 )
962 .await?;
963 let (operation_id, invoice) =
965 client_create_invoice(client, invoice_amount, event_sender, None).await?;
966 let pay_invoice_time = fedimint_core::time::now();
967 let partner_lightning_module = partner.get_first_module::<LightningClientModule>()?;
968 partner_lightning_module
971 .pay_bolt11_invoice(None, invoice, ())
972 .await?;
973 wait_invoice_payment(
974 prefix,
975 "gateway",
976 client,
977 operation_id,
978 event_sender,
979 pay_invoice_time,
980 )
981 .await?;
982 Ok(())
983}
984
985async fn wait_invoice_payment(
986 prefix: &str,
987 gateway_name: &str,
988 client: &ClientHandleArc,
989 operation_id: fedimint_core::core::OperationId,
990 event_sender: &mpsc::UnboundedSender<MetricEvent>,
991 pay_invoice_time: std::time::SystemTime,
992) -> anyhow::Result<()> {
993 let elapsed = pay_invoice_time.elapsed()?;
994 info!("{prefix} Invoice payment receive started using {gateway_name} in {elapsed:?}");
995 event_sender.send(MetricEvent {
996 name: format!("gateway_{gateway_name}_payment_received_started"),
997 duration: elapsed,
998 })?;
999 let lightning_module = client.get_first_module::<LightningClientModule>()?;
1000 let mut updates = lightning_module
1001 .subscribe_ln_receive(operation_id)
1002 .await?
1003 .into_stream();
1004 while let Some(update) = updates.next().await {
1005 debug!(%prefix, ?update, "Invoice payment update");
1006 match update {
1007 LnReceiveState::Claimed => {
1008 let elapsed: Duration = pay_invoice_time.elapsed()?;
1009 info!("{prefix} Invoice payment received on {gateway_name} in {elapsed:?}");
1010 event_sender.send(MetricEvent {
1011 name: "gateway_payment_received_success".into(),
1012 duration: elapsed,
1013 })?;
1014 event_sender.send(MetricEvent {
1015 name: format!("gateway_{gateway_name}_payment_received_success"),
1016 duration: elapsed,
1017 })?;
1018 break;
1019 }
1020 LnReceiveState::Canceled { reason } => {
1021 let elapsed: Duration = pay_invoice_time.elapsed()?;
1022 info!(
1023 "{prefix} Invoice payment receive was canceled on {gateway_name}: {reason} in {elapsed:?}"
1024 );
1025 event_sender.send(MetricEvent {
1026 name: "gateway_payment_received_canceled".into(),
1027 duration: elapsed,
1028 })?;
1029 break;
1030 }
1031 _ => {}
1032 }
1033 }
1034 Ok(())
1035}
1036
1037async fn client_create_invoice(
1038 client: &ClientHandleArc,
1039 invoice_amount: Amount,
1040 event_sender: &mpsc::UnboundedSender<MetricEvent>,
1041 ln_gateway: Option<LightningGateway>,
1042) -> anyhow::Result<(fedimint_core::core::OperationId, Bolt11Invoice)> {
1043 let create_invoice_time = fedimint_core::time::now();
1044 let lightning_module = client.get_first_module::<LightningClientModule>()?;
1045 let desc = Description::new("test".to_string())?;
1046 let (operation_id, invoice, _) = lightning_module
1047 .create_bolt11_invoice(
1048 invoice_amount,
1049 Bolt11InvoiceDescription::Direct(desc),
1050 None,
1051 (),
1052 ln_gateway,
1053 )
1054 .await?;
1055 let elapsed = create_invoice_time.elapsed()?;
1056 info!("Created invoice using gateway in {elapsed:?}");
1057 event_sender.send(MetricEvent {
1058 name: GATEWAY_CREATE_INVOICE.into(),
1059 duration: elapsed,
1060 })?;
1061 Ok((operation_id, invoice))
1062}
1063
1064fn test_download_config(
1065 endpoints: &ConnectorRegistry,
1066 invite_code: &InviteCode,
1067 users: u16,
1068 event_sender: &mpsc::UnboundedSender<MetricEvent>,
1069) -> Vec<BoxFuture<'static, anyhow::Result<()>>> {
1070 (0..users)
1071 .map(move |_| {
1072 let invite_code = invite_code.clone();
1073 let event_sender = event_sender.clone();
1074 let endpoints = endpoints.clone();
1075 let f: BoxFuture<_> = Box::pin(async move {
1076 let m = fedimint_core::time::now();
1077 let _ = fedimint_api_client::api::net::ConnectorType::default()
1078 .download_from_invite_code(&endpoints, &invite_code)
1079 .await?;
1080 event_sender.send(MetricEvent {
1081 name: "download_client_config".into(),
1082 duration: m.elapsed()?,
1083 })?;
1084 Ok(())
1085 });
1086 f
1087 })
1088 .collect()
1089}
1090
1091async fn test_connect_raw_client(
1092 endpoints: &ConnectorRegistry,
1093 invite_code: InviteCode,
1094 users: u16,
1095 duration: Duration,
1096 timeout: Duration,
1097 limit_endpoints: Option<usize>,
1098 event_sender: mpsc::UnboundedSender<MetricEvent>,
1099) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
1100 use jsonrpsee_core::client::ClientT;
1101 use jsonrpsee_ws_client::WsClientBuilder;
1102
1103 let (mut cfg, _) = fedimint_api_client::api::net::ConnectorType::default()
1104 .download_from_invite_code(endpoints, &invite_code)
1105 .await?;
1106
1107 if let Some(limit_endpoints) = limit_endpoints {
1108 cfg.global.api_endpoints = cfg
1109 .global
1110 .api_endpoints
1111 .into_iter()
1112 .take(limit_endpoints)
1113 .collect();
1114 info!("Limiting endpoints to {:?}", cfg.global.api_endpoints);
1115 }
1116
1117 info!("Connecting to {users} clients");
1118 let clients = (0..users)
1119 .flat_map(|_| {
1120 cfg.global.api_endpoints.values().map(|url| async {
1121 let ws_client = WsClientBuilder::default()
1122 .request_timeout(timeout)
1123 .connection_timeout(timeout)
1124 .build(url_to_string_with_default_port(&url.url))
1125 .await?;
1126 Ok::<_, anyhow::Error>(ws_client)
1127 })
1128 })
1129 .collect::<Vec<_>>();
1130 let clients = futures::future::try_join_all(clients).await?;
1131 info!("Keeping {users} clients connected for {duration:?}");
1132 Ok(clients
1133 .into_iter()
1134 .map(|client| {
1135 let event_sender = event_sender.clone();
1136 let f: BoxFuture<_> = Box::pin(async move {
1137 let initial_time = fedimint_core::time::now();
1138 while initial_time.elapsed()? < duration {
1139 let m = fedimint_core::time::now();
1140 let _epoch: u64 = client
1141 .request::<_, _>(SESSION_COUNT_ENDPOINT, vec![ApiRequestErased::default()])
1142 .await?;
1143 event_sender.send(MetricEvent {
1144 name: SESSION_COUNT_ENDPOINT.into(),
1145 duration: m.elapsed()?,
1146 })?;
1147 fedimint_core::task::sleep(Duration::from_secs(1)).await;
1148 }
1149 Ok(())
1150 });
1151 f
1152 })
1153 .collect())
1154}
1155
1156fn url_to_string_with_default_port(url: &SafeUrl) -> String {
1157 format!(
1158 "{}://{}:{}{}",
1159 url.scheme(),
1160 url.host().expect("Asserted on construction"),
1161 url.port_or_known_default()
1162 .expect("Asserted on construction"),
1163 url.path()
1164 )
1165}
1166
1167async fn handle_metrics_summary(
1168 opts: Opts,
1169 mut event_receiver: mpsc::UnboundedReceiver<MetricEvent>,
1170) -> anyhow::Result<()> {
1171 let timestamp_seconds = fedimint_core::time::duration_since_epoch().as_secs();
1172 let mut metrics_json_output_files = vec![];
1173 let mut previous_metrics = vec![];
1174 let mut comparison_output = None;
1175 if let Some(archive_dir) = opts.archive_dir {
1176 let mut archive_metrics = archive_dir.join("metrics");
1177 archive_metrics.push(opts.users.to_string());
1178 tokio::fs::create_dir_all(&archive_metrics).await?;
1179 let mut archive_comparisons = archive_dir.join("comparisons");
1180 archive_comparisons.push(opts.users.to_string());
1181 tokio::fs::create_dir_all(&archive_comparisons).await?;
1182
1183 let latest_metrics_file = std::fs::read_dir(&archive_metrics)?
1184 .map(|entry| {
1185 let entry = entry.unwrap();
1186 let metadata = entry.metadata().unwrap();
1187 let created = metadata
1188 .created()
1189 .unwrap_or_else(|_| metadata.modified().unwrap());
1190 (entry, created)
1191 })
1192 .max_by_key(|(_entry, created)| created.to_owned())
1193 .map(|(entry, _)| entry.path());
1194 if let Some(latest_metrics_file) = latest_metrics_file {
1195 let display_path = latest_metrics_file.display();
1196 let latest_metrics_file = tokio::fs::File::open(&latest_metrics_file)
1197 .await
1198 .with_context(|| format!("Failed to open {display_path}"))?;
1199 let mut lines = tokio::io::BufReader::new(latest_metrics_file).lines();
1200 while let Some(line) = lines.next_line().await? {
1201 match serde_json::from_str::<EventMetricSummary>(&line) {
1202 Ok(metric) => {
1203 previous_metrics.push(metric);
1204 }
1205 Err(e) => {
1206 warn!("Failed to parse previous metric: {e:?}");
1207 }
1208 }
1209 }
1210 }
1211 let new_metric_output = archive_metrics.join(format!("{timestamp_seconds}.json",));
1212 let new_metric_output = BufWriter::new(
1213 OpenOptions::new()
1214 .write(true)
1215 .create(true)
1216 .truncate(true)
1217 .open(new_metric_output)
1218 .await?,
1219 );
1220 metrics_json_output_files.push(new_metric_output);
1221 if !previous_metrics.is_empty() {
1222 let new_comparison_output =
1223 archive_comparisons.join(format!("{timestamp_seconds}.json",));
1224 comparison_output = Some(BufWriter::new(
1225 OpenOptions::new()
1226 .write(true)
1227 .create(true)
1228 .truncate(true)
1229 .open(new_comparison_output)
1230 .await?,
1231 ));
1232 }
1233 }
1234 if let Some(metrics_json_output) = opts.metrics_json_output {
1235 metrics_json_output_files.push(BufWriter::new(
1236 tokio::fs::OpenOptions::new()
1237 .write(true)
1238 .create(true)
1239 .truncate(true)
1240 .open(metrics_json_output)
1241 .await?,
1242 ));
1243 }
1244 let mut results = BTreeMap::new();
1245 while let Some(event) = event_receiver.recv().await {
1246 let entry = results.entry(event.name).or_insert_with(Vec::new);
1247 entry.push(event.duration);
1248 }
1249 let mut previous_metrics = previous_metrics
1250 .into_iter()
1251 .map(|metric| (metric.name.clone(), metric))
1252 .collect::<HashMap<_, _>>();
1253 for (k, mut v) in results {
1254 v.sort();
1255 let n = v.len();
1256 let max = v.iter().last().unwrap();
1257 let min = v.first().unwrap();
1258 let median = v[n / 2];
1259 let sum: Duration = v.iter().sum();
1260 let avg = sum / n as u32;
1261 let metric_summary = EventMetricSummary {
1262 name: k.clone(),
1263 users: u64::from(opts.users),
1264 n: n as u64,
1265 avg_ms: avg.as_millis(),
1266 median_ms: median.as_millis(),
1267 max_ms: max.as_millis(),
1268 min_ms: min.as_millis(),
1269 timestamp_seconds,
1270 };
1271 let comparison = if let Some(previous_metric) = previous_metrics.remove(&k) {
1272 if previous_metric.n == metric_summary.n {
1273 fn calculate_gain(current: u128, previous: u128) -> f64 {
1274 current as f64 / previous as f64
1275 }
1276 let comparison = EventMetricComparison {
1277 avg_ms_gain: calculate_gain(metric_summary.avg_ms, previous_metric.avg_ms),
1278 median_ms_gain: calculate_gain(
1279 metric_summary.median_ms,
1280 previous_metric.median_ms,
1281 ),
1282 max_ms_gain: calculate_gain(metric_summary.max_ms, previous_metric.max_ms),
1283 min_ms_gain: calculate_gain(metric_summary.min_ms, previous_metric.min_ms),
1284 current: metric_summary.clone(),
1285 previous: previous_metric,
1286 };
1287 if let Some(comparison_output) = &mut comparison_output {
1288 let comparison_json =
1289 serde_json::to_string(&comparison).expect("to be serializable");
1290 comparison_output
1291 .write_all(format!("{comparison_json}\n").as_bytes())
1292 .await
1293 .expect("to write on file");
1294 }
1295 Some(comparison)
1296 } else {
1297 info!(
1298 "Skipping comparison for {k} because previous metric has different n ({} vs {})",
1299 previous_metric.n, metric_summary.n
1300 );
1301 None
1302 }
1303 } else {
1304 None
1305 };
1306 if let Some(comparison) = comparison {
1307 println!(
1308 "{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?} (compared to previous: {comparison})"
1309 );
1310 } else {
1311 println!("{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?}");
1312 }
1313 let metric_summary_json =
1314 serde_json::to_string(&metric_summary).expect("to be serializable");
1315 for metrics_json_output_file in &mut metrics_json_output_files {
1316 metrics_json_output_file
1317 .write_all(format!("{metric_summary_json}\n").as_bytes())
1318 .await
1319 .expect("to write on file");
1320 }
1321 }
1322 for mut output in metrics_json_output_files {
1323 output.flush().await?;
1324 }
1325 if let Some(mut output) = comparison_output {
1326 output.flush().await?;
1327 }
1328 Ok(())
1329}
1330
1331async fn get_gateway_id(generate_invoice_with: LnInvoiceGeneration) -> anyhow::Result<String> {
1332 let gateway_json = match generate_invoice_with {
1333 LnInvoiceGeneration::LdkLightningCli => {
1334 cmd!(GatewayLndCli, "info").out_json().await
1336 }
1337 }?;
1338 let gateway_id = gateway_json["gateway_id"]
1339 .as_str()
1340 .context("Missing gateway_id field")?;
1341
1342 Ok(gateway_id.into())
1343}