1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_precision_loss)]
4#![allow(clippy::missing_errors_doc)]
5#![allow(clippy::ref_option)]
6#![allow(clippy::too_many_lines)]
7#![allow(clippy::large_futures)]
8
9use std::collections::{BTreeMap, HashMap};
10use std::path::PathBuf;
11use std::str::FromStr;
12use std::time::Duration;
13use std::vec;
14
15use anyhow::{Context, anyhow, bail};
16use clap::{Args, Parser, Subcommand, ValueEnum};
17use common::{
18 gateway_pay_invoice, get_note_summary, ldk_create_invoice, ldk_pay_invoice,
19 ldk_wait_invoice_payment, parse_gateway_id, reissue_notes,
20};
21use devimint::cmd;
22use devimint::util::GatewayLndCli;
23use devimint::version_constants::VERSION_0_10_0_ALPHA;
24use fedimint_api_client::download_from_invite_code;
25use fedimint_client::ClientHandleArc;
26use fedimint_connectors::ConnectorRegistry;
27use fedimint_core::Amount;
28use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
29use fedimint_core::invite_code::InviteCode;
30use fedimint_core::module::ApiRequestErased;
31use fedimint_core::runtime::spawn;
32use fedimint_core::util::{BoxFuture, SafeUrl};
33use fedimint_ln_client::{LightningClientModule, LnReceiveState};
34use fedimint_ln_common::LightningGateway;
35use fedimint_mint_client::OOBNotes;
36use futures::StreamExt;
37use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description};
38use serde::{Deserialize, Serialize};
39use tokio::fs::OpenOptions;
40use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
41use tokio::sync::mpsc;
42use tracing::{debug, info, warn};
43
44use crate::common::{
45 build_client, do_spend_notes, get_invite_code_cli, remint_denomination, try_get_notes_cli,
46};
47pub mod common;
48
49#[derive(Parser, Clone)]
50#[command(version)]
51struct Opts {
52 #[arg(
53 long,
54 default_value = "10",
55 help = "Number of users. Each user will work in parallel"
56 )]
57 users: u16,
58
59 #[arg(long, help = "Output with the metrics results in JSON format")]
60 metrics_json_output: Option<PathBuf>,
61
62 #[arg(
63 long,
64 help = "If given, will be used to store and retrieve past metrics for comparison purposes"
65 )]
66 archive_dir: Option<PathBuf>,
67
68 #[clap(subcommand)]
69 command: Command,
70}
71
// Selects how the invoices paid through the gateway are generated.
//
// NOTE: plain `//` comments on purpose; with `ValueEnum`, `///` doc comments would become
// help text for the possible values.
#[derive(Debug, Clone, Copy, ValueEnum)]
enum LnInvoiceGeneration {
    // Invoices are created with the `ldk_create_invoice` helper.
    LdkLightningCli,
}
76
77#[derive(Subcommand, Clone)]
78enum Command {
79 #[command(about = "Keep many websocket connections to a federation for a duration of time")]
80 TestConnect {
81 #[arg(long, help = "Federation invite code")]
82 invite_code: String,
83 #[arg(
84 long,
85 default_value = "60",
86 help = "How much time to keep the connections open, in seconds"
87 )]
88 duration_secs: u64,
89 #[arg(
90 long,
91 default_value = "120",
92 help = "Timeout for connection attempt and for each request, in secnods"
93 )]
94 timeout_secs: u64,
95 #[arg(
96 long,
97 help = "If given, will limit the number of endpoints (guardians) to connect to"
98 )]
99 limit_endpoints: Option<usize>,
100 },
101 #[command(about = "Try to download the client config many times.")]
102 TestDownload {
103 #[arg(long, help = "Federation invite code")]
104 invite_code: String,
105 },
106 #[command(
107 about = "Run a load test where many users in parallel will try to reissue notes and pay invoices through the gateway"
108 )]
109 LoadTest(LoadTestArgs),
110 #[command()]
115 LnCircularLoadTest(LnCircularLoadTestArgs),
116}
117
118#[derive(Args, Clone)]
119struct LoadTestArgs {
120 #[arg(
121 long,
122 help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
123 )]
124 invite_code: Option<InviteCode>,
125
126 #[arg(
127 long,
128 help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
129 )]
130 initial_notes: Option<OOBNotes>,
131
132 #[arg(
133 long,
134 help = "Gateway Id. If none, retrieve one according to --generate-invoice-with"
135 )]
136 gateway_id: Option<String>,
137
138 #[arg(
139 long,
140 help = "The method used to generate invoices to be paid through the gateway. If none and no --invoices-file provided, no gateway/LN tests will be run. Note that you can't generate an invoice using the same lightning node used by the gateway (i.e self payment is forbidden)"
141 )]
142 generate_invoice_with: Option<LnInvoiceGeneration>,
143
144 #[arg(
145 long,
146 default_value = "1",
147 help = "How many invoices will be created for each user. Only applicable if --generate-invoice-with is provided"
148 )]
149 invoices_per_user: u16,
150
151 #[arg(
152 long,
153 default_value = "0",
154 help = "How many seconds to sleep between LN payments"
155 )]
156 ln_payment_sleep_secs: u64,
157
158 #[arg(
159 long,
160 help = "A text file with one invoice per line. If --generate-invoice-with is provided, these will be additional invoices to be paid"
161 )]
162 invoices_file: Option<PathBuf>,
163
164 #[arg(
165 long,
166 help = "How many notes to distribute to each user",
167 default_value = "2"
168 )]
169 notes_per_user: u16,
170
171 #[arg(
172 long,
173 help = "Note denomination to use for the test",
174 default_value = "1024"
175 )]
176 note_denomination: Amount,
177
178 #[arg(
179 long,
180 help = "Invoice amount when generating one",
181 default_value = "1000"
182 )]
183 invoice_amount: Amount,
184}
185
186#[derive(Args, Clone)]
187struct LnCircularLoadTestArgs {
188 #[arg(
189 long,
190 help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
191 )]
192 invite_code: Option<InviteCode>,
193
194 #[arg(
195 long,
196 help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
197 )]
198 initial_notes: Option<OOBNotes>,
199
200 #[arg(
201 long,
202 default_value = "60",
203 help = "For how many seconds to run the test"
204 )]
205 test_duration_secs: u64,
206
207 #[arg(
208 long,
209 default_value = "0",
210 help = "How many seconds to sleep between LN payments"
211 )]
212 ln_payment_sleep_secs: u64,
213
214 #[arg(
215 long,
216 help = "How many notes to distribute to each user",
217 default_value = "1"
218 )]
219 notes_per_user: u16,
220
221 #[arg(
222 long,
223 help = "Note denomination to use for the test",
224 default_value = "1024"
225 )]
226 note_denomination: Amount,
227
228 #[arg(
229 long,
230 help = "Invoice amount when generating one",
231 default_value = "1000"
232 )]
233 invoice_amount: Amount,
234
235 #[arg(long)]
236 strategy: LnCircularStrategy,
237}
238
// Payment pattern followed by each user of the circular LN load test.
//
// NOTE: plain `//` comments on purpose; with `ValueEnum`, `///` doc comments would become
// help text for the possible values.
#[derive(Debug, Clone, Copy, ValueEnum)]
enum LnCircularStrategy {
    // The user's client creates an invoice and then pays it itself.
    SelfPayment,
    // The client pays an LDK-created invoice through the gateway, then LDK pays an
    // invoice created by the client.
    TwoGateways,
    // The user's client and a second "partner" client take turns paying each other's
    // invoices.
    PartnerPingPong,
}
249
/// A single timed measurement produced by a test task.
///
/// Events are pushed into an unbounded `mpsc` channel whose receiving end is handed to
/// the metrics summary task spawned in `main`.
#[derive(Debug, Clone)]
pub struct MetricEvent {
    /// Name of the measured operation, e.g. `"gateway_create_invoice"`.
    name: String,
    /// How long the measured operation took.
    duration: Duration,
}
255
/// Aggregated statistics for one metric name, (de)serializable with serde.
///
/// NOTE(review): the code that fills this struct is outside the visible part of the file;
/// the field descriptions below are inferred from the field names and should be confirmed.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct EventMetricSummary {
    /// Metric name; presumably the `name` of the summarized `MetricEvent`s.
    name: String,
    /// Presumably the number of users the test ran with.
    users: u64,
    /// Presumably the number of events that were aggregated.
    n: u64,
    /// Average duration, in milliseconds.
    avg_ms: u128,
    /// Median duration, in milliseconds.
    median_ms: u128,
    /// Longest duration, in milliseconds.
    max_ms: u128,
    /// Shortest duration, in milliseconds.
    min_ms: u128,
    /// Presumably when the summary was taken, in seconds since the Unix epoch (TODO confirm).
    timestamp_seconds: u64,
}
267
/// Comparison between the current summary of a metric and a previous one.
///
/// Each `*_gain` field is a ratio: the `Display` implementation renders values `>= 1.0`
/// as a positive percentage and values below `1.0` as a negative percentage.
/// NOTE(review): whether the ratio is previous/current or current/previous is decided
/// where this struct is built, which is outside the visible part of the file.
#[derive(Debug, Serialize, Deserialize)]
struct EventMetricComparison {
    /// Gain ratio of the average duration.
    avg_ms_gain: f64,
    /// Gain ratio of the median duration.
    median_ms_gain: f64,
    /// Gain ratio of the maximum duration.
    max_ms_gain: f64,
    /// Gain ratio of the minimum duration.
    min_ms_gain: f64,
    /// Summary of the current run.
    current: EventMetricSummary,
    /// Summary of the run being compared against.
    previous: EventMetricSummary,
}
277
278impl std::fmt::Display for EventMetricComparison {
279 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280 fn to_percent(gain: f64) -> String {
281 if gain >= 1.0 {
282 format!("+{:.2}%", (gain - 1.0) * 100.0)
283 } else {
284 format!("-{:.2}%", (1.0 - gain) * 100.0)
285 }
286 }
287 f.write_str(&format!(
288 "avg: {}, median: {}, max: {}, min: {}",
289 to_percent(self.avg_ms_gain),
290 to_percent(self.median_ms_gain),
291 to_percent(self.max_ms_gain),
292 to_percent(self.min_ms_gain),
293 ))
294 }
295}
296
297#[tokio::main]
298async fn main() -> anyhow::Result<()> {
299 fedimint_logging::TracingSetup::default().init()?;
300 let opts = Opts::parse();
301 let (event_sender, event_receiver) = tokio::sync::mpsc::unbounded_channel();
302 let summary_handle = spawn("handle metrics summary", {
303 let opts = opts.clone();
304 async { handle_metrics_summary(opts, event_receiver).await }
305 });
306 let futures = match opts.command.clone() {
307 Command::TestConnect {
308 invite_code,
309 duration_secs,
310 timeout_secs,
311 limit_endpoints,
312 } => {
313 let connectors = ConnectorRegistry::build_from_client_env()?.bind().await?;
314 let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
315 test_connect_raw_client(
316 &connectors,
317 invite_code,
318 opts.users,
319 Duration::from_secs(duration_secs),
320 Duration::from_secs(timeout_secs),
321 limit_endpoints,
322 event_sender.clone(),
323 )
324 .await?
325 }
326 Command::TestDownload { invite_code } => {
327 let connectors = ConnectorRegistry::build_from_client_env()?.bind().await?;
328 let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
329 test_download_config(&connectors, &invite_code, opts.users, &event_sender.clone())
330 }
331 Command::LoadTest(args) => {
332 let invite_code = invite_code_or_fallback(args.invite_code).await;
333
334 let gateway_id = if let Some(gateway_id) = args.gateway_id {
335 Some(gateway_id)
336 } else if let Some(generate_invoice_with) = args.generate_invoice_with {
337 Some(get_gateway_id(generate_invoice_with).await?)
338 } else {
339 None
340 };
341 let invoices = if let Some(invoices_file) = args.invoices_file {
342 let display_path = invoices_file.display();
343 let invoices_file = tokio::fs::File::open(&invoices_file)
344 .await
345 .with_context(|| format!("Failed to open {display_path}"))?;
346 let mut lines = tokio::io::BufReader::new(invoices_file).lines();
347 let mut invoices = vec![];
348 while let Some(line) = lines.next_line().await? {
349 let invoice = Bolt11Invoice::from_str(&line)?;
350 invoices.push(invoice);
351 }
352 invoices
353 } else {
354 vec![]
355 };
356 if args.generate_invoice_with.is_none() && invoices.is_empty() {
357 info!(
358 "No --generate-invoice-with given no invoices on --invoices-file, not LN/gateway tests will be run"
359 );
360 }
361 run_load_test(
362 opts.archive_dir,
363 opts.users,
364 invite_code,
365 args.initial_notes,
366 args.generate_invoice_with,
367 args.invoices_per_user,
368 Duration::from_secs(args.ln_payment_sleep_secs),
369 invoices,
370 gateway_id,
371 args.notes_per_user,
372 args.note_denomination,
373 args.invoice_amount,
374 event_sender.clone(),
375 )
376 .await?
377 }
378 Command::LnCircularLoadTest(args) => {
379 let invite_code = invite_code_or_fallback(args.invite_code).await;
380 run_ln_circular_load_test(
381 opts.archive_dir,
382 opts.users,
383 invite_code,
384 args.initial_notes,
385 Duration::from_secs(args.test_duration_secs),
386 Duration::from_secs(args.ln_payment_sleep_secs),
387 args.notes_per_user,
388 args.note_denomination,
389 args.invoice_amount,
390 args.strategy,
391 event_sender.clone(),
392 )
393 .await?
394 }
395 };
396
397 let result = futures::future::join_all(futures).await;
398 drop(event_sender);
399 summary_handle.await??;
400 let len_failures = result.iter().filter(|r| r.is_err()).count();
401 eprintln!("{} results, {len_failures} failures", result.len());
402 for r in result {
403 if let Err(e) = r {
404 warn!("Task failed: {:?}", e);
405 }
406 }
407 if len_failures > 0 {
408 bail!("Finished with failures");
409 }
410 info!("Finished successfully");
411 Ok(())
412}
413
414async fn invite_code_or_fallback(invite_code: Option<InviteCode>) -> Option<InviteCode> {
415 if let Some(invite_code) = invite_code {
416 Some(invite_code)
417 } else {
418 match get_invite_code_cli(0.into()).await {
420 Ok(invite_code) => Some(invite_code),
421 Err(e) => {
422 info!(
423 "No invite code provided and failed to get one with '{e}' error, will try to proceed without one..."
424 );
425 None
426 }
427 }
428 }
429}
430
431#[allow(clippy::too_many_arguments)]
432async fn run_load_test(
433 archive_dir: Option<PathBuf>,
434 users: u16,
435 invite_code: Option<InviteCode>,
436 initial_notes: Option<OOBNotes>,
437 generate_invoice_with: Option<LnInvoiceGeneration>,
438 generated_invoices_per_user: u16,
439 ln_payment_sleep: Duration,
440 invoices_from_file: Vec<Bolt11Invoice>,
441 gateway_id: Option<String>,
442 notes_per_user: u16,
443 note_denomination: Amount,
444 invoice_amount: Amount,
445 event_sender: mpsc::UnboundedSender<MetricEvent>,
446) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
447 let db_path = get_db_path(&archive_dir);
448 let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
449 let minimum_notes = notes_per_user * users;
450 let minimum_amount_required = note_denomination * u64::from(minimum_notes);
451
452 reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
453 get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
454 print_coordinator_notes(&coordinator).await?;
455 info!(
456 "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
457 );
458 remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
459 print_coordinator_notes(&coordinator).await?;
460
461 let users_clients = get_users_clients(users, db_path, invite_code).await?;
462
463 let mut users_notes =
464 get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
465 let mut users_invoices = HashMap::new();
466 let mut user = 0;
467 for invoice in invoices_from_file {
469 users_invoices
470 .entry(user)
471 .or_insert_with(Vec::new)
472 .push(invoice);
473 user = (user + 1) % users;
474 }
475
476 info!("Starting user tasks");
477 let futures = users_clients
478 .into_iter()
479 .enumerate()
480 .map(|(u, client)| {
481 let u = u as u16;
482 let oob_notes = users_notes.remove(&u).unwrap();
483 let invoices = users_invoices.remove(&u).unwrap_or_default();
484 let event_sender = event_sender.clone();
485 let f: BoxFuture<_> = Box::pin(do_load_test_user_task(
486 format!("User {u}:"),
487 client,
488 oob_notes,
489 generated_invoices_per_user,
490 ln_payment_sleep,
491 invoice_amount,
492 invoices,
493 generate_invoice_with,
494 event_sender,
495 gateway_id.clone(),
496 ));
497 f
498 })
499 .collect::<Vec<_>>();
500
501 Ok(futures)
502}
503
504async fn get_notes_for_users(
505 users: u16,
506 notes_per_user: u16,
507 coordinator: ClientHandleArc,
508 note_denomination: Amount,
509) -> anyhow::Result<HashMap<u16, Vec<OOBNotes>>> {
510 let mut users_notes = HashMap::new();
511 for u in 0..users {
512 users_notes.insert(u, Vec::with_capacity(notes_per_user.into()));
513 for _ in 0..notes_per_user {
514 let (_, oob_notes) = do_spend_notes(&coordinator, note_denomination).await?;
515 let user_amount = oob_notes.total_amount();
516 info!("Giving {user_amount} to user {u}");
517 users_notes.get_mut(&u).unwrap().push(oob_notes);
518 }
519 }
520 Ok(users_notes)
521}
522
523async fn get_users_clients(
524 n: u16,
525 db_path: Option<PathBuf>,
526 invite_code: Option<InviteCode>,
527) -> anyhow::Result<Vec<ClientHandleArc>> {
528 let mut users_clients = Vec::with_capacity(n.into());
529 for u in 0..n {
530 let (client, _) = get_user_client(u, &db_path, &invite_code).await?;
531 users_clients.push(client);
532 }
533 Ok(users_clients)
534}
535
536async fn get_user_client(
537 user_index: u16,
538 db_path: &Option<PathBuf>,
539 invite_code: &Option<InviteCode>,
540) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
541 let user_db = db_path
542 .as_ref()
543 .map(|db_path| db_path.join(format!("user_{user_index}.db")));
544 let user_invite_code = if user_db.as_ref().is_some_and(|db| db.exists()) {
545 None
546 } else {
547 invite_code.clone()
548 };
549 let (client, invite_code) = build_client(user_invite_code, user_db.as_ref()).await?;
550 if let Ok(ln_client) = client.get_first_module::<LightningClientModule>() {
552 let _ = ln_client.update_gateway_cache().await;
553 }
554 Ok((client, invite_code))
555}
556
557async fn print_coordinator_notes(coordinator: &ClientHandleArc) -> anyhow::Result<()> {
558 info!("Note summary:");
559 let summary = get_note_summary(coordinator).await?;
560 for (k, v) in summary.iter() {
561 info!("{k}: {v}");
562 }
563 Ok(())
564}
565
566async fn get_required_notes(
567 coordinator: &ClientHandleArc,
568 minimum_amount_required: Amount,
569 event_sender: &mpsc::UnboundedSender<MetricEvent>,
570) -> anyhow::Result<()> {
571 let current_balance = coordinator.get_balance_for_btc().await?;
572
573 if current_balance < minimum_amount_required {
574 let diff = minimum_amount_required.saturating_sub(current_balance);
575 info!(
576 "Current balance {current_balance} on coordinator not enough, trying to get {diff} more through fedimint-cli"
577 );
578 match try_get_notes_cli(&diff, 5).await {
579 Ok(notes) => {
580 info!("Got {} more notes, reissuing them", notes.total_amount());
581 reissue_notes(coordinator, notes, event_sender).await?;
582 }
583 Err(e) => {
584 info!("Unable to get more notes: '{e}', will try to proceed without them");
585 }
586 }
587 } else {
588 info!(
589 "Current balance of {current_balance} already covers the minimum required of {minimum_amount_required}"
590 );
591 }
592 Ok(())
593}
594
595async fn reissue_initial_notes(
596 initial_notes: Option<OOBNotes>,
597 coordinator: &ClientHandleArc,
598 event_sender: &mpsc::UnboundedSender<MetricEvent>,
599) -> anyhow::Result<()> {
600 if let Some(notes) = initial_notes {
601 let amount = notes.total_amount();
602 info!("Reissuing initial notes, got {amount}");
603 reissue_notes(coordinator, notes, event_sender).await?;
604 }
605 Ok(())
606}
607
608async fn get_coordinator_client(
609 db_path: &Option<PathBuf>,
610 invite_code: &Option<InviteCode>,
611) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
612 let (client, invite_code) = if let Some(db_path) = db_path {
613 let coordinator_db = db_path.join("coordinator.db");
614 if coordinator_db.exists() {
615 build_client(invite_code.clone(), Some(&coordinator_db)).await?
616 } else {
617 tokio::fs::create_dir_all(db_path).await?;
618 build_client(
619 Some(invite_code.clone().context(
620 "Running on this archive dir for the first time, an invite code is required",
621 )?),
622 Some(&coordinator_db),
623 )
624 .await?
625 }
626 } else {
627 build_client(
628 Some(
629 invite_code
630 .clone()
631 .context("No archive dir given, an invite code is strictly required")?,
632 ),
633 None,
634 )
635 .await?
636 };
637 Ok((client, invite_code))
638}
639
/// Returns the `db` subdirectory of the archive directory, or `None` when no archive
/// directory was given.
fn get_db_path(archive_dir: &Option<PathBuf>) -> Option<PathBuf> {
    let base_dir = archive_dir.as_ref()?;
    Some(base_dir.join("db"))
}
643
644async fn get_lightning_gateway(
645 client: &ClientHandleArc,
646 gateway_id: Option<String>,
647) -> Option<LightningGateway> {
648 let gateway_id = parse_gateway_id(gateway_id.or(None)?.as_str()).expect("Invalid gateway id");
649 let ln_module = client
650 .get_first_module::<LightningClientModule>()
651 .expect("Must have ln client module");
652 ln_module.select_gateway(&gateway_id).await
653}
654
655#[allow(clippy::too_many_arguments)]
656async fn do_load_test_user_task(
657 prefix: String,
658 client: ClientHandleArc,
659 oob_notes: Vec<OOBNotes>,
660 generated_invoices_per_user: u16,
661 ln_payment_sleep: Duration,
662 invoice_amount: Amount,
663 additional_invoices: Vec<Bolt11Invoice>,
664 generate_invoice_with: Option<LnInvoiceGeneration>,
665 event_sender: mpsc::UnboundedSender<MetricEvent>,
666 gateway_id: Option<String>,
667) -> anyhow::Result<()> {
668 let ln_gateway = get_lightning_gateway(&client, gateway_id).await;
669 for oob_note in oob_notes {
670 let amount = oob_note.total_amount();
671 reissue_notes(&client, oob_note, &event_sender)
672 .await
673 .map_err(|e| anyhow!("while reissuing initial {amount}: {e}"))?;
674 }
675 let mut generated_invoices_per_user_iterator = (0..generated_invoices_per_user).peekable();
676 while let Some(_) = generated_invoices_per_user_iterator.next() {
677 let total_amount = get_note_summary(&client).await?.total_amount();
678 if invoice_amount > total_amount {
679 warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
680 } else {
681 match generate_invoice_with {
682 Some(LnInvoiceGeneration::LdkLightningCli) => {
683 let invoice = ldk_create_invoice(invoice_amount).await?;
684 gateway_pay_invoice(
685 &prefix,
686 "LND",
687 &client,
688 invoice.clone(),
689 &event_sender,
690 ln_gateway.clone(),
691 )
692 .await?;
693 ldk_wait_invoice_payment(&invoice).await?;
694 }
695 None if additional_invoices.is_empty() => {
696 debug!(
697 "No method given to generate an invoice and no invoices on file, will not test the gateway"
698 );
699 break;
700 }
701 None => {
702 break;
703 }
704 }
705 if generated_invoices_per_user_iterator.peek().is_some() {
706 fedimint_core::task::sleep(ln_payment_sleep).await;
708 }
709 }
710 }
711 let mut additional_invoices = additional_invoices.into_iter().peekable();
712 while let Some(invoice) = additional_invoices.next() {
713 let total_amount = get_note_summary(&client).await?.total_amount();
714 let invoice_amount =
715 Amount::from_msats(invoice.amount_milli_satoshis().unwrap_or_default());
716 if invoice_amount > total_amount {
717 warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
718 } else if invoice_amount == Amount::ZERO {
719 warn!("Can't pay invoice {invoice}, amount is zero");
720 } else {
721 gateway_pay_invoice(
722 &prefix,
723 "unknown",
724 &client,
725 invoice,
726 &event_sender,
727 ln_gateway.clone(),
728 )
729 .await?;
730 if additional_invoices.peek().is_some() {
731 fedimint_core::task::sleep(ln_payment_sleep).await;
733 }
734 }
735 }
736 Ok(())
737}
738
739#[allow(clippy::too_many_arguments)]
740async fn run_ln_circular_load_test(
741 archive_dir: Option<PathBuf>,
742 users: u16,
743 invite_code: Option<InviteCode>,
744 initial_notes: Option<OOBNotes>,
745 test_duration: Duration,
746 ln_payment_sleep: Duration,
747 notes_per_user: u16,
748 note_denomination: Amount,
749 invoice_amount: Amount,
750 strategy: LnCircularStrategy,
751 event_sender: mpsc::UnboundedSender<MetricEvent>,
752) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
753 let db_path = get_db_path(&archive_dir);
754 let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
755 let minimum_notes = notes_per_user * users;
756 let minimum_amount_required = note_denomination * u64::from(minimum_notes);
757
758 reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
759 get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
760
761 info!(
762 "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
763 );
764 remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
765
766 print_coordinator_notes(&coordinator).await?;
767
768 let users_clients = get_users_clients(users, db_path, invite_code.clone()).await?;
769
770 let mut users_notes =
771 get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
772
773 info!("Starting user tasks");
774 let futures = users_clients
775 .into_iter()
776 .enumerate()
777 .map(|(u, client)| {
778 let u = u as u16;
779 let oob_notes = users_notes.remove(&u).unwrap();
780 let event_sender = event_sender.clone();
781 let f: BoxFuture<_> = Box::pin(do_ln_circular_test_user_task(
782 format!("User {u}:"),
783 client,
784 invite_code.clone(),
785 oob_notes,
786 test_duration,
787 ln_payment_sleep,
788 invoice_amount,
789 strategy,
790 event_sender,
791 ));
792 f
793 })
794 .collect::<Vec<_>>();
795
796 Ok(futures)
797}
798
799#[allow(clippy::too_many_arguments)]
800async fn do_ln_circular_test_user_task(
801 prefix: String,
802 client: ClientHandleArc,
803 invite_code: Option<InviteCode>,
804 oob_notes: Vec<OOBNotes>,
805 test_duration: Duration,
806 ln_payment_sleep: Duration,
807 invoice_amount: Amount,
808 strategy: LnCircularStrategy,
809 event_sender: mpsc::UnboundedSender<MetricEvent>,
810) -> anyhow::Result<()> {
811 for oob_note in oob_notes {
812 let amount = oob_note.total_amount();
813 reissue_notes(&client, oob_note, &event_sender)
814 .await
815 .map_err(|e| anyhow!("while reissuing initial {amount}: {e}"))?;
816 }
817 let initial_time = fedimint_core::time::now();
818 let still_ontime = || async {
819 fedimint_core::time::now()
820 .duration_since(initial_time)
821 .expect("time to work")
822 <= test_duration
823 };
824 let sleep_a_bit = || async {
825 if still_ontime().await {
826 fedimint_core::task::sleep(ln_payment_sleep).await;
827 }
828 };
829 match strategy {
830 LnCircularStrategy::TwoGateways => {
831 let invoice_generation = LnInvoiceGeneration::LdkLightningCli;
832 while still_ontime().await {
833 let gateway_id = get_gateway_id(invoice_generation).await?;
834 let ln_gateway = get_lightning_gateway(&client, Some(gateway_id)).await;
835 run_two_gateways_strategy(
836 &prefix,
837 &invoice_generation,
838 &invoice_amount,
839 &event_sender,
840 &client,
841 ln_gateway,
842 )
843 .await?;
844 sleep_a_bit().await;
845 }
846 }
847 LnCircularStrategy::SelfPayment => {
848 while still_ontime().await {
849 do_self_payment(&prefix, &client, invoice_amount, &event_sender).await?;
850 sleep_a_bit().await;
851 }
852 }
853 LnCircularStrategy::PartnerPingPong => {
854 let (partner, _) = build_client(invite_code, None).await?;
855 while still_ontime().await {
856 do_partner_ping_pong(&prefix, &client, &partner, invoice_amount, &event_sender)
857 .await?;
858 sleep_a_bit().await;
859 }
860 }
861 }
862 Ok(())
863}
864
/// Metric name under which invoice-creation timings are reported (both for invoices
/// created with LDK and for invoices created by the client's lightning module).
const GATEWAY_CREATE_INVOICE: &str = "gateway_create_invoice";
866
867async fn run_two_gateways_strategy(
868 prefix: &str,
869 invoice_generation: &LnInvoiceGeneration,
870 invoice_amount: &Amount,
871 event_sender: &mpsc::UnboundedSender<MetricEvent>,
872 client: &ClientHandleArc,
873 ln_gateway: Option<LightningGateway>,
874) -> Result<(), anyhow::Error> {
875 let create_invoice_time = fedimint_core::time::now();
876 match *invoice_generation {
877 LnInvoiceGeneration::LdkLightningCli => {
878 let invoice = ldk_create_invoice(*invoice_amount).await?;
879 let elapsed = create_invoice_time.elapsed()?;
880 info!("Created invoice using LDK in {elapsed:?}");
881 event_sender.send(MetricEvent {
882 name: GATEWAY_CREATE_INVOICE.into(),
883 duration: elapsed,
884 })?;
885 gateway_pay_invoice(
886 prefix,
887 "LND",
888 client,
889 invoice.clone(),
890 event_sender,
891 ln_gateway.clone(),
892 )
893 .await?;
894 ldk_wait_invoice_payment(&invoice).await?;
895 let (operation_id, invoice) =
896 client_create_invoice(client, *invoice_amount, event_sender, ln_gateway).await?;
897 let pay_invoice_time = fedimint_core::time::now();
898 ldk_pay_invoice(invoice).await?;
899 wait_invoice_payment(
900 prefix,
901 "LND",
902 client,
903 operation_id,
904 event_sender,
905 pay_invoice_time,
906 )
907 .await?;
908 }
909 }
910 Ok(())
911}
912
913async fn do_self_payment(
914 prefix: &str,
915 client: &ClientHandleArc,
916 invoice_amount: Amount,
917 event_sender: &mpsc::UnboundedSender<MetricEvent>,
918) -> anyhow::Result<()> {
919 let (operation_id, invoice) =
920 client_create_invoice(client, invoice_amount, event_sender, None).await?;
921 let pay_invoice_time = fedimint_core::time::now();
922 let lightning_module = client.get_first_module::<LightningClientModule>()?;
923 lightning_module
925 .pay_bolt11_invoice(None, invoice, ())
926 .await?;
927 wait_invoice_payment(
928 prefix,
929 "gateway",
930 client,
931 operation_id,
932 event_sender,
933 pay_invoice_time,
934 )
935 .await?;
936 Ok(())
937}
938
939async fn do_partner_ping_pong(
940 prefix: &str,
941 client: &ClientHandleArc,
942 partner: &ClientHandleArc,
943 invoice_amount: Amount,
944 event_sender: &mpsc::UnboundedSender<MetricEvent>,
945) -> anyhow::Result<()> {
946 let (operation_id, invoice) =
948 client_create_invoice(partner, invoice_amount, event_sender, None).await?;
949 let pay_invoice_time = fedimint_core::time::now();
950 let lightning_module = client.get_first_module::<LightningClientModule>()?;
951 lightning_module
954 .pay_bolt11_invoice(None, invoice, ())
955 .await?;
956 wait_invoice_payment(
957 prefix,
958 "gateway",
959 partner,
960 operation_id,
961 event_sender,
962 pay_invoice_time,
963 )
964 .await?;
965 let (operation_id, invoice) =
967 client_create_invoice(client, invoice_amount, event_sender, None).await?;
968 let pay_invoice_time = fedimint_core::time::now();
969 let partner_lightning_module = partner.get_first_module::<LightningClientModule>()?;
970 partner_lightning_module
973 .pay_bolt11_invoice(None, invoice, ())
974 .await?;
975 wait_invoice_payment(
976 prefix,
977 "gateway",
978 client,
979 operation_id,
980 event_sender,
981 pay_invoice_time,
982 )
983 .await?;
984 Ok(())
985}
986
987async fn wait_invoice_payment(
988 prefix: &str,
989 gateway_name: &str,
990 client: &ClientHandleArc,
991 operation_id: fedimint_core::core::OperationId,
992 event_sender: &mpsc::UnboundedSender<MetricEvent>,
993 pay_invoice_time: std::time::SystemTime,
994) -> anyhow::Result<()> {
995 let elapsed = pay_invoice_time.elapsed()?;
996 info!("{prefix} Invoice payment receive started using {gateway_name} in {elapsed:?}");
997 event_sender.send(MetricEvent {
998 name: format!("gateway_{gateway_name}_payment_received_started"),
999 duration: elapsed,
1000 })?;
1001 let lightning_module = client.get_first_module::<LightningClientModule>()?;
1002 let mut updates = lightning_module
1003 .subscribe_ln_receive(operation_id)
1004 .await?
1005 .into_stream();
1006 while let Some(update) = updates.next().await {
1007 debug!(%prefix, ?update, "Invoice payment update");
1008 match update {
1009 LnReceiveState::Claimed => {
1010 let elapsed: Duration = pay_invoice_time.elapsed()?;
1011 info!("{prefix} Invoice payment received on {gateway_name} in {elapsed:?}");
1012 event_sender.send(MetricEvent {
1013 name: "gateway_payment_received_success".into(),
1014 duration: elapsed,
1015 })?;
1016 event_sender.send(MetricEvent {
1017 name: format!("gateway_{gateway_name}_payment_received_success"),
1018 duration: elapsed,
1019 })?;
1020 break;
1021 }
1022 LnReceiveState::Canceled { reason } => {
1023 let elapsed: Duration = pay_invoice_time.elapsed()?;
1024 info!(
1025 "{prefix} Invoice payment receive was canceled on {gateway_name}: {reason} in {elapsed:?}"
1026 );
1027 event_sender.send(MetricEvent {
1028 name: "gateway_payment_received_canceled".into(),
1029 duration: elapsed,
1030 })?;
1031 break;
1032 }
1033 _ => {}
1034 }
1035 }
1036 Ok(())
1037}
1038
1039async fn client_create_invoice(
1040 client: &ClientHandleArc,
1041 invoice_amount: Amount,
1042 event_sender: &mpsc::UnboundedSender<MetricEvent>,
1043 ln_gateway: Option<LightningGateway>,
1044) -> anyhow::Result<(fedimint_core::core::OperationId, Bolt11Invoice)> {
1045 let create_invoice_time = fedimint_core::time::now();
1046 let lightning_module = client.get_first_module::<LightningClientModule>()?;
1047 let desc = Description::new("test".to_string())?;
1048 let (operation_id, invoice, _) = lightning_module
1049 .create_bolt11_invoice(
1050 invoice_amount,
1051 Bolt11InvoiceDescription::Direct(desc),
1052 None,
1053 (),
1054 ln_gateway,
1055 )
1056 .await?;
1057 let elapsed = create_invoice_time.elapsed()?;
1058 info!("Created invoice using gateway in {elapsed:?}");
1059 event_sender.send(MetricEvent {
1060 name: GATEWAY_CREATE_INVOICE.into(),
1061 duration: elapsed,
1062 })?;
1063 Ok((operation_id, invoice))
1064}
1065
1066fn test_download_config(
1067 endpoints: &ConnectorRegistry,
1068 invite_code: &InviteCode,
1069 users: u16,
1070 event_sender: &mpsc::UnboundedSender<MetricEvent>,
1071) -> Vec<BoxFuture<'static, anyhow::Result<()>>> {
1072 (0..users)
1073 .map(move |_| {
1074 let invite_code = invite_code.clone();
1075 let event_sender = event_sender.clone();
1076 let endpoints = endpoints.clone();
1077 let f: BoxFuture<_> = Box::pin(async move {
1078 let m = fedimint_core::time::now();
1079 let _ = download_from_invite_code(&endpoints, &invite_code).await?;
1080 event_sender.send(MetricEvent {
1081 name: "download_client_config".into(),
1082 duration: m.elapsed()?,
1083 })?;
1084 Ok(())
1085 });
1086 f
1087 })
1088 .collect()
1089}
1090
1091async fn test_connect_raw_client(
1092 endpoints: &ConnectorRegistry,
1093 invite_code: InviteCode,
1094 users: u16,
1095 duration: Duration,
1096 timeout: Duration,
1097 limit_endpoints: Option<usize>,
1098 event_sender: mpsc::UnboundedSender<MetricEvent>,
1099) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
1100 use jsonrpsee_core::client::ClientT;
1101 use jsonrpsee_ws_client::WsClientBuilder;
1102
1103 let (mut cfg, _) = download_from_invite_code(endpoints, &invite_code).await?;
1104
1105 if let Some(limit_endpoints) = limit_endpoints {
1106 cfg.global.api_endpoints = cfg
1107 .global
1108 .api_endpoints
1109 .into_iter()
1110 .take(limit_endpoints)
1111 .collect();
1112 info!("Limiting endpoints to {:?}", cfg.global.api_endpoints);
1113 }
1114
1115 info!("Connecting to {users} clients");
1116 let clients = (0..users)
1117 .flat_map(|_| {
1118 cfg.global.api_endpoints.values().map(|url| async {
1119 let ws_client = WsClientBuilder::default()
1120 .request_timeout(timeout)
1121 .connection_timeout(timeout)
1122 .build(url_to_string_with_default_port(&url.url))
1123 .await?;
1124 Ok::<_, anyhow::Error>(ws_client)
1125 })
1126 })
1127 .collect::<Vec<_>>();
1128 let clients = futures::future::try_join_all(clients).await?;
1129 info!("Keeping {users} clients connected for {duration:?}");
1130 Ok(clients
1131 .into_iter()
1132 .map(|client| {
1133 let event_sender = event_sender.clone();
1134 let f: BoxFuture<_> = Box::pin(async move {
1135 let initial_time = fedimint_core::time::now();
1136 while initial_time.elapsed()? < duration {
1137 let m = fedimint_core::time::now();
1138 let _epoch: u64 = client
1139 .request::<_, _>(SESSION_COUNT_ENDPOINT, vec![ApiRequestErased::default()])
1140 .await?;
1141 event_sender.send(MetricEvent {
1142 name: SESSION_COUNT_ENDPOINT.into(),
1143 duration: m.elapsed()?,
1144 })?;
1145 fedimint_core::task::sleep(Duration::from_secs(1)).await;
1146 }
1147 Ok(())
1148 });
1149 f
1150 })
1151 .collect())
1152}
1153
1154fn url_to_string_with_default_port(url: &SafeUrl) -> String {
1155 format!(
1156 "{}://{}:{}{}",
1157 url.scheme(),
1158 url.host().expect("Asserted on construction"),
1159 url.port_or_known_default()
1160 .expect("Asserted on construction"),
1161 url.path()
1162 )
1163}
1164
1165async fn handle_metrics_summary(
1166 opts: Opts,
1167 mut event_receiver: mpsc::UnboundedReceiver<MetricEvent>,
1168) -> anyhow::Result<()> {
1169 let timestamp_seconds = fedimint_core::time::duration_since_epoch().as_secs();
1170 let mut metrics_json_output_files = vec![];
1171 let mut previous_metrics = vec![];
1172 let mut comparison_output = None;
1173 if let Some(archive_dir) = opts.archive_dir {
1174 let mut archive_metrics = archive_dir.join("metrics");
1175 archive_metrics.push(opts.users.to_string());
1176 tokio::fs::create_dir_all(&archive_metrics).await?;
1177 let mut archive_comparisons = archive_dir.join("comparisons");
1178 archive_comparisons.push(opts.users.to_string());
1179 tokio::fs::create_dir_all(&archive_comparisons).await?;
1180
1181 let latest_metrics_file = std::fs::read_dir(&archive_metrics)?
1182 .map(|entry| {
1183 let entry = entry.unwrap();
1184 let metadata = entry.metadata().unwrap();
1185 let created = metadata
1186 .created()
1187 .unwrap_or_else(|_| metadata.modified().unwrap());
1188 (entry, created)
1189 })
1190 .max_by_key(|(_entry, created)| created.to_owned())
1191 .map(|(entry, _)| entry.path());
1192 if let Some(latest_metrics_file) = latest_metrics_file {
1193 let display_path = latest_metrics_file.display();
1194 let latest_metrics_file = tokio::fs::File::open(&latest_metrics_file)
1195 .await
1196 .with_context(|| format!("Failed to open {display_path}"))?;
1197 let mut lines = tokio::io::BufReader::new(latest_metrics_file).lines();
1198 while let Some(line) = lines.next_line().await? {
1199 match serde_json::from_str::<EventMetricSummary>(&line) {
1200 Ok(metric) => {
1201 previous_metrics.push(metric);
1202 }
1203 Err(e) => {
1204 warn!("Failed to parse previous metric: {e:?}");
1205 }
1206 }
1207 }
1208 }
1209 let new_metric_output = archive_metrics.join(format!("{timestamp_seconds}.json",));
1210 let new_metric_output = BufWriter::new(
1211 OpenOptions::new()
1212 .write(true)
1213 .create(true)
1214 .truncate(true)
1215 .open(new_metric_output)
1216 .await?,
1217 );
1218 metrics_json_output_files.push(new_metric_output);
1219 if !previous_metrics.is_empty() {
1220 let new_comparison_output =
1221 archive_comparisons.join(format!("{timestamp_seconds}.json",));
1222 comparison_output = Some(BufWriter::new(
1223 OpenOptions::new()
1224 .write(true)
1225 .create(true)
1226 .truncate(true)
1227 .open(new_comparison_output)
1228 .await?,
1229 ));
1230 }
1231 }
1232 if let Some(metrics_json_output) = opts.metrics_json_output {
1233 metrics_json_output_files.push(BufWriter::new(
1234 tokio::fs::OpenOptions::new()
1235 .write(true)
1236 .create(true)
1237 .truncate(true)
1238 .open(metrics_json_output)
1239 .await?,
1240 ));
1241 }
1242 let mut results = BTreeMap::new();
1243 while let Some(event) = event_receiver.recv().await {
1244 let entry = results.entry(event.name).or_insert_with(Vec::new);
1245 entry.push(event.duration);
1246 }
1247 let mut previous_metrics = previous_metrics
1248 .into_iter()
1249 .map(|metric| (metric.name.clone(), metric))
1250 .collect::<HashMap<_, _>>();
1251 for (k, mut v) in results {
1252 v.sort();
1253 let n = v.len();
1254 let max = v.iter().last().unwrap();
1255 let min = v.first().unwrap();
1256 let median = v[n / 2];
1257 let sum: Duration = v.iter().sum();
1258 let avg = sum / n as u32;
1259 let metric_summary = EventMetricSummary {
1260 name: k.clone(),
1261 users: u64::from(opts.users),
1262 n: n as u64,
1263 avg_ms: avg.as_millis(),
1264 median_ms: median.as_millis(),
1265 max_ms: max.as_millis(),
1266 min_ms: min.as_millis(),
1267 timestamp_seconds,
1268 };
1269 let comparison = if let Some(previous_metric) = previous_metrics.remove(&k) {
1270 if previous_metric.n == metric_summary.n {
1271 fn calculate_gain(current: u128, previous: u128) -> f64 {
1272 current as f64 / previous as f64
1273 }
1274 let comparison = EventMetricComparison {
1275 avg_ms_gain: calculate_gain(metric_summary.avg_ms, previous_metric.avg_ms),
1276 median_ms_gain: calculate_gain(
1277 metric_summary.median_ms,
1278 previous_metric.median_ms,
1279 ),
1280 max_ms_gain: calculate_gain(metric_summary.max_ms, previous_metric.max_ms),
1281 min_ms_gain: calculate_gain(metric_summary.min_ms, previous_metric.min_ms),
1282 current: metric_summary.clone(),
1283 previous: previous_metric,
1284 };
1285 if let Some(comparison_output) = &mut comparison_output {
1286 let comparison_json =
1287 serde_json::to_string(&comparison).expect("to be serializable");
1288 comparison_output
1289 .write_all(format!("{comparison_json}\n").as_bytes())
1290 .await
1291 .expect("to write on file");
1292 }
1293 Some(comparison)
1294 } else {
1295 info!(
1296 "Skipping comparison for {k} because previous metric has different n ({} vs {})",
1297 previous_metric.n, metric_summary.n
1298 );
1299 None
1300 }
1301 } else {
1302 None
1303 };
1304 if let Some(comparison) = comparison {
1305 println!(
1306 "{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?} (compared to previous: {comparison})"
1307 );
1308 } else {
1309 println!("{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?}");
1310 }
1311 let metric_summary_json =
1312 serde_json::to_string(&metric_summary).expect("to be serializable");
1313 for metrics_json_output_file in &mut metrics_json_output_files {
1314 metrics_json_output_file
1315 .write_all(format!("{metric_summary_json}\n").as_bytes())
1316 .await
1317 .expect("to write on file");
1318 }
1319 }
1320 for mut output in metrics_json_output_files {
1321 output.flush().await?;
1322 }
1323 if let Some(mut output) = comparison_output {
1324 output.flush().await?;
1325 }
1326 Ok(())
1327}
1328
1329async fn get_gateway_id(generate_invoice_with: LnInvoiceGeneration) -> anyhow::Result<String> {
1330 let gateway_json = match generate_invoice_with {
1331 LnInvoiceGeneration::LdkLightningCli => {
1332 cmd!(GatewayLndCli, "info").out_json().await
1334 }
1335 }?;
1336
1337 let gatewayd_version = devimint::util::Gatewayd::version_or_default().await;
1338 let gateway_id = if gatewayd_version < *VERSION_0_10_0_ALPHA {
1339 gateway_json["gateway_id"]
1340 .as_str()
1341 .context("gateway_id must be a string")?
1342 .to_owned()
1343 } else {
1344 gateway_json["registrations"]["http"][1]
1345 .as_str()
1346 .context("gateway id must be a string")?
1347 .to_owned()
1348 };
1349
1350 Ok(gateway_id)
1351}