1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_precision_loss)]
4#![allow(clippy::missing_errors_doc)]
5#![allow(clippy::ref_option)]
6#![allow(clippy::too_many_lines)]
7#![allow(clippy::large_futures)]
8
9use std::collections::{BTreeMap, HashMap};
10use std::path::PathBuf;
11use std::str::FromStr;
12use std::time::Duration;
13use std::vec;
14
15use anyhow::{Context, anyhow, bail};
16use clap::{Args, Parser, Subcommand, ValueEnum};
17use common::{
18 gateway_pay_invoice, get_note_summary, ldk_create_invoice, ldk_pay_invoice,
19 ldk_wait_invoice_payment, parse_gateway_id, reissue_notes,
20};
21use devimint::cmd;
22use devimint::util::GatewayLndCli;
23use devimint::version_constants::VERSION_0_10_0_ALPHA;
24use fedimint_client::ClientHandleArc;
25use fedimint_connectors::ConnectorRegistry;
26use fedimint_core::Amount;
27use fedimint_core::endpoint_constants::SESSION_COUNT_ENDPOINT;
28use fedimint_core::invite_code::InviteCode;
29use fedimint_core::module::ApiRequestErased;
30use fedimint_core::runtime::spawn;
31use fedimint_core::util::{BoxFuture, SafeUrl};
32use fedimint_ln_client::{LightningClientModule, LnReceiveState};
33use fedimint_ln_common::LightningGateway;
34use fedimint_mint_client::OOBNotes;
35use futures::StreamExt;
36use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description};
37use serde::{Deserialize, Serialize};
38use tokio::fs::OpenOptions;
39use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufWriter};
40use tokio::sync::mpsc;
41use tracing::{debug, info, warn};
42
43use crate::common::{
44 build_client, do_spend_notes, get_invite_code_cli, remint_denomination, try_get_notes_cli,
45};
46pub mod common;
47
48#[derive(Parser, Clone)]
49#[command(version)]
50struct Opts {
51 #[arg(
52 long,
53 default_value = "10",
54 help = "Number of users. Each user will work in parallel"
55 )]
56 users: u16,
57
58 #[arg(long, help = "Output with the metrics results in JSON format")]
59 metrics_json_output: Option<PathBuf>,
60
61 #[arg(
62 long,
63 help = "If given, will be used to store and retrieve past metrics for comparison purposes"
64 )]
65 archive_dir: Option<PathBuf>,
66
67 #[clap(subcommand)]
68 command: Command,
69}
70
71#[derive(Debug, Clone, Copy, ValueEnum)]
72enum LnInvoiceGeneration {
73 LdkLightningCli,
74}
75
76#[derive(Subcommand, Clone)]
77enum Command {
78 #[command(about = "Keep many websocket connections to a federation for a duration of time")]
79 TestConnect {
80 #[arg(long, help = "Federation invite code")]
81 invite_code: String,
82 #[arg(
83 long,
84 default_value = "60",
85 help = "How much time to keep the connections open, in seconds"
86 )]
87 duration_secs: u64,
88 #[arg(
89 long,
90 default_value = "120",
91 help = "Timeout for connection attempt and for each request, in secnods"
92 )]
93 timeout_secs: u64,
94 #[arg(
95 long,
96 help = "If given, will limit the number of endpoints (guardians) to connect to"
97 )]
98 limit_endpoints: Option<usize>,
99 },
100 #[command(about = "Try to download the client config many times.")]
101 TestDownload {
102 #[arg(long, help = "Federation invite code")]
103 invite_code: String,
104 },
105 #[command(
106 about = "Run a load test where many users in parallel will try to reissue notes and pay invoices through the gateway"
107 )]
108 LoadTest(LoadTestArgs),
109 #[command()]
114 LnCircularLoadTest(LnCircularLoadTestArgs),
115}
116
117#[derive(Args, Clone)]
118struct LoadTestArgs {
119 #[arg(
120 long,
121 help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
122 )]
123 invite_code: Option<InviteCode>,
124
125 #[arg(
126 long,
127 help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
128 )]
129 initial_notes: Option<OOBNotes>,
130
131 #[arg(
132 long,
133 help = "Gateway Id. If none, retrieve one according to --generate-invoice-with"
134 )]
135 gateway_id: Option<String>,
136
137 #[arg(
138 long,
139 help = "The method used to generate invoices to be paid through the gateway. If none and no --invoices-file provided, no gateway/LN tests will be run. Note that you can't generate an invoice using the same lightning node used by the gateway (i.e self payment is forbidden)"
140 )]
141 generate_invoice_with: Option<LnInvoiceGeneration>,
142
143 #[arg(
144 long,
145 default_value = "1",
146 help = "How many invoices will be created for each user. Only applicable if --generate-invoice-with is provided"
147 )]
148 invoices_per_user: u16,
149
150 #[arg(
151 long,
152 default_value = "0",
153 help = "How many seconds to sleep between LN payments"
154 )]
155 ln_payment_sleep_secs: u64,
156
157 #[arg(
158 long,
159 help = "A text file with one invoice per line. If --generate-invoice-with is provided, these will be additional invoices to be paid"
160 )]
161 invoices_file: Option<PathBuf>,
162
163 #[arg(
164 long,
165 help = "How many notes to distribute to each user",
166 default_value = "2"
167 )]
168 notes_per_user: u16,
169
170 #[arg(
171 long,
172 help = "Note denomination to use for the test",
173 default_value = "1024"
174 )]
175 note_denomination: Amount,
176
177 #[arg(
178 long,
179 help = "Invoice amount when generating one",
180 default_value = "1000"
181 )]
182 invoice_amount: Amount,
183}
184
185#[derive(Args, Clone)]
186struct LnCircularLoadTestArgs {
187 #[arg(
188 long,
189 help = "Federation invite code. If none given, we assume the client already has a config downloaded in DB"
190 )]
191 invite_code: Option<InviteCode>,
192
193 #[arg(
194 long,
195 help = "Notes for the test. If none and no funds on archive, will call fedimint-cli spend"
196 )]
197 initial_notes: Option<OOBNotes>,
198
199 #[arg(
200 long,
201 default_value = "60",
202 help = "For how many seconds to run the test"
203 )]
204 test_duration_secs: u64,
205
206 #[arg(
207 long,
208 default_value = "0",
209 help = "How many seconds to sleep between LN payments"
210 )]
211 ln_payment_sleep_secs: u64,
212
213 #[arg(
214 long,
215 help = "How many notes to distribute to each user",
216 default_value = "1"
217 )]
218 notes_per_user: u16,
219
220 #[arg(
221 long,
222 help = "Note denomination to use for the test",
223 default_value = "1024"
224 )]
225 note_denomination: Amount,
226
227 #[arg(
228 long,
229 help = "Invoice amount when generating one",
230 default_value = "1000"
231 )]
232 invoice_amount: Amount,
233
234 #[arg(long)]
235 strategy: LnCircularStrategy,
236}
237
238#[derive(Debug, Clone, Copy, ValueEnum)]
239enum LnCircularStrategy {
240 SelfPayment,
242 TwoGateways,
245 PartnerPingPong,
247}
248
249#[derive(Debug, Clone)]
250pub struct MetricEvent {
251 name: String,
252 duration: Duration,
253}
254
255#[derive(Debug, Clone, Serialize, Deserialize)]
256struct EventMetricSummary {
257 name: String,
258 users: u64,
259 n: u64,
260 avg_ms: u128,
261 median_ms: u128,
262 max_ms: u128,
263 min_ms: u128,
264 timestamp_seconds: u64,
265}
266
267#[derive(Debug, Serialize, Deserialize)]
268struct EventMetricComparison {
269 avg_ms_gain: f64,
270 median_ms_gain: f64,
271 max_ms_gain: f64,
272 min_ms_gain: f64,
273 current: EventMetricSummary,
274 previous: EventMetricSummary,
275}
276
277impl std::fmt::Display for EventMetricComparison {
278 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
279 fn to_percent(gain: f64) -> String {
280 if gain >= 1.0 {
281 format!("+{:.2}%", (gain - 1.0) * 100.0)
282 } else {
283 format!("-{:.2}%", (1.0 - gain) * 100.0)
284 }
285 }
286 f.write_str(&format!(
287 "avg: {}, median: {}, max: {}, min: {}",
288 to_percent(self.avg_ms_gain),
289 to_percent(self.median_ms_gain),
290 to_percent(self.max_ms_gain),
291 to_percent(self.min_ms_gain),
292 ))
293 }
294}
295
296#[tokio::main]
297async fn main() -> anyhow::Result<()> {
298 fedimint_logging::TracingSetup::default().init()?;
299 let opts = Opts::parse();
300 let (event_sender, event_receiver) = tokio::sync::mpsc::unbounded_channel();
301 let summary_handle = spawn("handle metrics summary", {
302 let opts = opts.clone();
303 async { handle_metrics_summary(opts, event_receiver).await }
304 });
305 let futures = match opts.command.clone() {
306 Command::TestConnect {
307 invite_code,
308 duration_secs,
309 timeout_secs,
310 limit_endpoints,
311 } => {
312 let connectors = ConnectorRegistry::build_from_client_env()?.bind().await?;
313 let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
314 test_connect_raw_client(
315 &connectors,
316 invite_code,
317 opts.users,
318 Duration::from_secs(duration_secs),
319 Duration::from_secs(timeout_secs),
320 limit_endpoints,
321 event_sender.clone(),
322 )
323 .await?
324 }
325 Command::TestDownload { invite_code } => {
326 let connectors = ConnectorRegistry::build_from_client_env()?.bind().await?;
327 let invite_code = InviteCode::from_str(&invite_code).context("invalid invite code")?;
328 test_download_config(&connectors, &invite_code, opts.users, &event_sender.clone())
329 }
330 Command::LoadTest(args) => {
331 let invite_code = invite_code_or_fallback(args.invite_code).await;
332
333 let gateway_id = if let Some(gateway_id) = args.gateway_id {
334 Some(gateway_id)
335 } else if let Some(generate_invoice_with) = args.generate_invoice_with {
336 Some(get_gateway_id(generate_invoice_with).await?)
337 } else {
338 None
339 };
340 let invoices = if let Some(invoices_file) = args.invoices_file {
341 let display_path = invoices_file.display();
342 let invoices_file = tokio::fs::File::open(&invoices_file)
343 .await
344 .with_context(|| format!("Failed to open {display_path}"))?;
345 let mut lines = tokio::io::BufReader::new(invoices_file).lines();
346 let mut invoices = vec![];
347 while let Some(line) = lines.next_line().await? {
348 let invoice = Bolt11Invoice::from_str(&line)?;
349 invoices.push(invoice);
350 }
351 invoices
352 } else {
353 vec![]
354 };
355 if args.generate_invoice_with.is_none() && invoices.is_empty() {
356 info!(
357 "No --generate-invoice-with given no invoices on --invoices-file, not LN/gateway tests will be run"
358 );
359 }
360 run_load_test(
361 opts.archive_dir,
362 opts.users,
363 invite_code,
364 args.initial_notes,
365 args.generate_invoice_with,
366 args.invoices_per_user,
367 Duration::from_secs(args.ln_payment_sleep_secs),
368 invoices,
369 gateway_id,
370 args.notes_per_user,
371 args.note_denomination,
372 args.invoice_amount,
373 event_sender.clone(),
374 )
375 .await?
376 }
377 Command::LnCircularLoadTest(args) => {
378 let invite_code = invite_code_or_fallback(args.invite_code).await;
379 run_ln_circular_load_test(
380 opts.archive_dir,
381 opts.users,
382 invite_code,
383 args.initial_notes,
384 Duration::from_secs(args.test_duration_secs),
385 Duration::from_secs(args.ln_payment_sleep_secs),
386 args.notes_per_user,
387 args.note_denomination,
388 args.invoice_amount,
389 args.strategy,
390 event_sender.clone(),
391 )
392 .await?
393 }
394 };
395
396 let result = futures::future::join_all(futures).await;
397 drop(event_sender);
398 summary_handle.await??;
399 let len_failures = result.iter().filter(|r| r.is_err()).count();
400 eprintln!("{} results, {len_failures} failures", result.len());
401 for r in result {
402 if let Err(e) = r {
403 warn!("Task failed: {:?}", e);
404 }
405 }
406 if len_failures > 0 {
407 bail!("Finished with failures");
408 }
409 info!("Finished successfully");
410 Ok(())
411}
412
413async fn invite_code_or_fallback(invite_code: Option<InviteCode>) -> Option<InviteCode> {
414 if let Some(invite_code) = invite_code {
415 Some(invite_code)
416 } else {
417 match get_invite_code_cli(0.into()).await {
419 Ok(invite_code) => Some(invite_code),
420 Err(e) => {
421 info!(
422 "No invite code provided and failed to get one with '{e}' error, will try to proceed without one..."
423 );
424 None
425 }
426 }
427 }
428}
429
430#[allow(clippy::too_many_arguments)]
431async fn run_load_test(
432 archive_dir: Option<PathBuf>,
433 users: u16,
434 invite_code: Option<InviteCode>,
435 initial_notes: Option<OOBNotes>,
436 generate_invoice_with: Option<LnInvoiceGeneration>,
437 generated_invoices_per_user: u16,
438 ln_payment_sleep: Duration,
439 invoices_from_file: Vec<Bolt11Invoice>,
440 gateway_id: Option<String>,
441 notes_per_user: u16,
442 note_denomination: Amount,
443 invoice_amount: Amount,
444 event_sender: mpsc::UnboundedSender<MetricEvent>,
445) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
446 let db_path = get_db_path(&archive_dir);
447 let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
448 let minimum_notes = notes_per_user * users;
449 let minimum_amount_required = note_denomination * u64::from(minimum_notes);
450
451 reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
452 get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
453 print_coordinator_notes(&coordinator).await?;
454 info!(
455 "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
456 );
457 remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
458 print_coordinator_notes(&coordinator).await?;
459
460 let users_clients = get_users_clients(users, db_path, invite_code).await?;
461
462 let mut users_notes =
463 get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
464 let mut users_invoices = HashMap::new();
465 let mut user = 0;
466 for invoice in invoices_from_file {
468 users_invoices
469 .entry(user)
470 .or_insert_with(Vec::new)
471 .push(invoice);
472 user = (user + 1) % users;
473 }
474
475 info!("Starting user tasks");
476 let futures = users_clients
477 .into_iter()
478 .enumerate()
479 .map(|(u, client)| {
480 let u = u as u16;
481 let oob_notes = users_notes.remove(&u).unwrap();
482 let invoices = users_invoices.remove(&u).unwrap_or_default();
483 let event_sender = event_sender.clone();
484 let f: BoxFuture<_> = Box::pin(do_load_test_user_task(
485 format!("User {u}:"),
486 client,
487 oob_notes,
488 generated_invoices_per_user,
489 ln_payment_sleep,
490 invoice_amount,
491 invoices,
492 generate_invoice_with,
493 event_sender,
494 gateway_id.clone(),
495 ));
496 f
497 })
498 .collect::<Vec<_>>();
499
500 Ok(futures)
501}
502
503async fn get_notes_for_users(
504 users: u16,
505 notes_per_user: u16,
506 coordinator: ClientHandleArc,
507 note_denomination: Amount,
508) -> anyhow::Result<HashMap<u16, Vec<OOBNotes>>> {
509 let mut users_notes = HashMap::new();
510 for u in 0..users {
511 users_notes.insert(u, Vec::with_capacity(notes_per_user.into()));
512 for _ in 0..notes_per_user {
513 let (_, oob_notes) = do_spend_notes(&coordinator, note_denomination).await?;
514 let user_amount = oob_notes.total_amount();
515 info!("Giving {user_amount} to user {u}");
516 users_notes.get_mut(&u).unwrap().push(oob_notes);
517 }
518 }
519 Ok(users_notes)
520}
521
522async fn get_users_clients(
523 n: u16,
524 db_path: Option<PathBuf>,
525 invite_code: Option<InviteCode>,
526) -> anyhow::Result<Vec<ClientHandleArc>> {
527 let mut users_clients = Vec::with_capacity(n.into());
528 for u in 0..n {
529 let (client, _) = get_user_client(u, &db_path, &invite_code).await?;
530 users_clients.push(client);
531 }
532 Ok(users_clients)
533}
534
535async fn get_user_client(
536 user_index: u16,
537 db_path: &Option<PathBuf>,
538 invite_code: &Option<InviteCode>,
539) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
540 let user_db = db_path
541 .as_ref()
542 .map(|db_path| db_path.join(format!("user_{user_index}.db")));
543 let user_invite_code = if user_db.as_ref().is_some_and(|db| db.exists()) {
544 None
545 } else {
546 invite_code.clone()
547 };
548 let (client, invite_code) = build_client(user_invite_code, user_db.as_ref()).await?;
549 if let Ok(ln_client) = client.get_first_module::<LightningClientModule>() {
551 let _ = ln_client.update_gateway_cache().await;
552 }
553 Ok((client, invite_code))
554}
555
556async fn print_coordinator_notes(coordinator: &ClientHandleArc) -> anyhow::Result<()> {
557 info!("Note summary:");
558 let summary = get_note_summary(coordinator).await?;
559 for (k, v) in summary.iter() {
560 info!("{k}: {v}");
561 }
562 Ok(())
563}
564
565async fn get_required_notes(
566 coordinator: &ClientHandleArc,
567 minimum_amount_required: Amount,
568 event_sender: &mpsc::UnboundedSender<MetricEvent>,
569) -> anyhow::Result<()> {
570 let current_balance = coordinator.get_balance_for_btc().await?;
571
572 if current_balance < minimum_amount_required {
573 let diff = minimum_amount_required.saturating_sub(current_balance);
574 info!(
575 "Current balance {current_balance} on coordinator not enough, trying to get {diff} more through fedimint-cli"
576 );
577 match try_get_notes_cli(&diff, 5).await {
578 Ok(notes) => {
579 info!("Got {} more notes, reissuing them", notes.total_amount());
580 reissue_notes(coordinator, notes, event_sender).await?;
581 }
582 Err(e) => {
583 info!("Unable to get more notes: '{e}', will try to proceed without them");
584 }
585 }
586 } else {
587 info!(
588 "Current balance of {current_balance} already covers the minimum required of {minimum_amount_required}"
589 );
590 }
591 Ok(())
592}
593
594async fn reissue_initial_notes(
595 initial_notes: Option<OOBNotes>,
596 coordinator: &ClientHandleArc,
597 event_sender: &mpsc::UnboundedSender<MetricEvent>,
598) -> anyhow::Result<()> {
599 if let Some(notes) = initial_notes {
600 let amount = notes.total_amount();
601 info!("Reissuing initial notes, got {amount}");
602 reissue_notes(coordinator, notes, event_sender).await?;
603 }
604 Ok(())
605}
606
607async fn get_coordinator_client(
608 db_path: &Option<PathBuf>,
609 invite_code: &Option<InviteCode>,
610) -> anyhow::Result<(ClientHandleArc, Option<InviteCode>)> {
611 let (client, invite_code) = if let Some(db_path) = db_path {
612 let coordinator_db = db_path.join("coordinator.db");
613 if coordinator_db.exists() {
614 build_client(invite_code.clone(), Some(&coordinator_db)).await?
615 } else {
616 tokio::fs::create_dir_all(db_path).await?;
617 build_client(
618 Some(invite_code.clone().context(
619 "Running on this archive dir for the first time, an invite code is required",
620 )?),
621 Some(&coordinator_db),
622 )
623 .await?
624 }
625 } else {
626 build_client(
627 Some(
628 invite_code
629 .clone()
630 .context("No archive dir given, an invite code is strictly required")?,
631 ),
632 None,
633 )
634 .await?
635 };
636 Ok((client, invite_code))
637}
638
639fn get_db_path(archive_dir: &Option<PathBuf>) -> Option<PathBuf> {
640 archive_dir.as_ref().map(|p| p.join("db"))
641}
642
643async fn get_lightning_gateway(
644 client: &ClientHandleArc,
645 gateway_id: Option<String>,
646) -> Option<LightningGateway> {
647 let gateway_id = parse_gateway_id(gateway_id.or(None)?.as_str()).expect("Invalid gateway id");
648 let ln_module = client
649 .get_first_module::<LightningClientModule>()
650 .expect("Must have ln client module");
651 ln_module.select_gateway(&gateway_id).await
652}
653
654#[allow(clippy::too_many_arguments)]
655async fn do_load_test_user_task(
656 prefix: String,
657 client: ClientHandleArc,
658 oob_notes: Vec<OOBNotes>,
659 generated_invoices_per_user: u16,
660 ln_payment_sleep: Duration,
661 invoice_amount: Amount,
662 additional_invoices: Vec<Bolt11Invoice>,
663 generate_invoice_with: Option<LnInvoiceGeneration>,
664 event_sender: mpsc::UnboundedSender<MetricEvent>,
665 gateway_id: Option<String>,
666) -> anyhow::Result<()> {
667 let ln_gateway = get_lightning_gateway(&client, gateway_id).await;
668 for oob_note in oob_notes {
669 let amount = oob_note.total_amount();
670 reissue_notes(&client, oob_note, &event_sender)
671 .await
672 .map_err(|e| anyhow!("while reissuing initial {amount}: {e}"))?;
673 }
674 let mut generated_invoices_per_user_iterator = (0..generated_invoices_per_user).peekable();
675 while let Some(_) = generated_invoices_per_user_iterator.next() {
676 let total_amount = get_note_summary(&client).await?.total_amount();
677 if invoice_amount > total_amount {
678 warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
679 } else {
680 match generate_invoice_with {
681 Some(LnInvoiceGeneration::LdkLightningCli) => {
682 let invoice = ldk_create_invoice(invoice_amount).await?;
683 gateway_pay_invoice(
684 &prefix,
685 "LND",
686 &client,
687 invoice.clone(),
688 &event_sender,
689 ln_gateway.clone(),
690 )
691 .await?;
692 ldk_wait_invoice_payment(&invoice).await?;
693 }
694 None if additional_invoices.is_empty() => {
695 debug!(
696 "No method given to generate an invoice and no invoices on file, will not test the gateway"
697 );
698 break;
699 }
700 None => {
701 break;
702 }
703 }
704 if generated_invoices_per_user_iterator.peek().is_some() {
705 fedimint_core::task::sleep(ln_payment_sleep).await;
707 }
708 }
709 }
710 let mut additional_invoices = additional_invoices.into_iter().peekable();
711 while let Some(invoice) = additional_invoices.next() {
712 let total_amount = get_note_summary(&client).await?.total_amount();
713 let invoice_amount =
714 Amount::from_msats(invoice.amount_milli_satoshis().unwrap_or_default());
715 if invoice_amount > total_amount {
716 warn!("Can't pay invoice, not enough funds: {invoice_amount} > {total_amount}");
717 } else if invoice_amount == Amount::ZERO {
718 warn!("Can't pay invoice {invoice}, amount is zero");
719 } else {
720 gateway_pay_invoice(
721 &prefix,
722 "unknown",
723 &client,
724 invoice,
725 &event_sender,
726 ln_gateway.clone(),
727 )
728 .await?;
729 if additional_invoices.peek().is_some() {
730 fedimint_core::task::sleep(ln_payment_sleep).await;
732 }
733 }
734 }
735 Ok(())
736}
737
738#[allow(clippy::too_many_arguments)]
739async fn run_ln_circular_load_test(
740 archive_dir: Option<PathBuf>,
741 users: u16,
742 invite_code: Option<InviteCode>,
743 initial_notes: Option<OOBNotes>,
744 test_duration: Duration,
745 ln_payment_sleep: Duration,
746 notes_per_user: u16,
747 note_denomination: Amount,
748 invoice_amount: Amount,
749 strategy: LnCircularStrategy,
750 event_sender: mpsc::UnboundedSender<MetricEvent>,
751) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
752 let db_path = get_db_path(&archive_dir);
753 let (coordinator, invite_code) = get_coordinator_client(&db_path, &invite_code).await?;
754 let minimum_notes = notes_per_user * users;
755 let minimum_amount_required = note_denomination * u64::from(minimum_notes);
756
757 reissue_initial_notes(initial_notes, &coordinator, &event_sender).await?;
758 get_required_notes(&coordinator, minimum_amount_required, &event_sender).await?;
759
760 info!(
761 "Reminting {minimum_notes} notes of denomination {note_denomination} for {users} users, {notes_per_user} notes per user (this may take a while if the number of users/notes is high)"
762 );
763 remint_denomination(&coordinator, note_denomination, minimum_notes).await?;
764
765 print_coordinator_notes(&coordinator).await?;
766
767 let users_clients = get_users_clients(users, db_path, invite_code.clone()).await?;
768
769 let mut users_notes =
770 get_notes_for_users(users, notes_per_user, coordinator, note_denomination).await?;
771
772 info!("Starting user tasks");
773 let futures = users_clients
774 .into_iter()
775 .enumerate()
776 .map(|(u, client)| {
777 let u = u as u16;
778 let oob_notes = users_notes.remove(&u).unwrap();
779 let event_sender = event_sender.clone();
780 let f: BoxFuture<_> = Box::pin(do_ln_circular_test_user_task(
781 format!("User {u}:"),
782 client,
783 invite_code.clone(),
784 oob_notes,
785 test_duration,
786 ln_payment_sleep,
787 invoice_amount,
788 strategy,
789 event_sender,
790 ));
791 f
792 })
793 .collect::<Vec<_>>();
794
795 Ok(futures)
796}
797
798#[allow(clippy::too_many_arguments)]
799async fn do_ln_circular_test_user_task(
800 prefix: String,
801 client: ClientHandleArc,
802 invite_code: Option<InviteCode>,
803 oob_notes: Vec<OOBNotes>,
804 test_duration: Duration,
805 ln_payment_sleep: Duration,
806 invoice_amount: Amount,
807 strategy: LnCircularStrategy,
808 event_sender: mpsc::UnboundedSender<MetricEvent>,
809) -> anyhow::Result<()> {
810 for oob_note in oob_notes {
811 let amount = oob_note.total_amount();
812 reissue_notes(&client, oob_note, &event_sender)
813 .await
814 .map_err(|e| anyhow!("while reissuing initial {amount}: {e}"))?;
815 }
816 let initial_time = fedimint_core::time::now();
817 let still_ontime = || async {
818 fedimint_core::time::now()
819 .duration_since(initial_time)
820 .expect("time to work")
821 <= test_duration
822 };
823 let sleep_a_bit = || async {
824 if still_ontime().await {
825 fedimint_core::task::sleep(ln_payment_sleep).await;
826 }
827 };
828 match strategy {
829 LnCircularStrategy::TwoGateways => {
830 let invoice_generation = LnInvoiceGeneration::LdkLightningCli;
831 while still_ontime().await {
832 let gateway_id = get_gateway_id(invoice_generation).await?;
833 let ln_gateway = get_lightning_gateway(&client, Some(gateway_id)).await;
834 run_two_gateways_strategy(
835 &prefix,
836 &invoice_generation,
837 &invoice_amount,
838 &event_sender,
839 &client,
840 ln_gateway,
841 )
842 .await?;
843 sleep_a_bit().await;
844 }
845 }
846 LnCircularStrategy::SelfPayment => {
847 while still_ontime().await {
848 do_self_payment(&prefix, &client, invoice_amount, &event_sender).await?;
849 sleep_a_bit().await;
850 }
851 }
852 LnCircularStrategy::PartnerPingPong => {
853 let (partner, _) = build_client(invite_code, None).await?;
854 while still_ontime().await {
855 do_partner_ping_pong(&prefix, &client, &partner, invoice_amount, &event_sender)
856 .await?;
857 sleep_a_bit().await;
858 }
859 }
860 }
861 Ok(())
862}
863
864const GATEWAY_CREATE_INVOICE: &str = "gateway_create_invoice";
865
866async fn run_two_gateways_strategy(
867 prefix: &str,
868 invoice_generation: &LnInvoiceGeneration,
869 invoice_amount: &Amount,
870 event_sender: &mpsc::UnboundedSender<MetricEvent>,
871 client: &ClientHandleArc,
872 ln_gateway: Option<LightningGateway>,
873) -> Result<(), anyhow::Error> {
874 let create_invoice_time = fedimint_core::time::now();
875 match *invoice_generation {
876 LnInvoiceGeneration::LdkLightningCli => {
877 let invoice = ldk_create_invoice(*invoice_amount).await?;
878 let elapsed = create_invoice_time.elapsed()?;
879 info!("Created invoice using LDK in {elapsed:?}");
880 event_sender.send(MetricEvent {
881 name: GATEWAY_CREATE_INVOICE.into(),
882 duration: elapsed,
883 })?;
884 gateway_pay_invoice(
885 prefix,
886 "LND",
887 client,
888 invoice.clone(),
889 event_sender,
890 ln_gateway.clone(),
891 )
892 .await?;
893 ldk_wait_invoice_payment(&invoice).await?;
894 let (operation_id, invoice) =
895 client_create_invoice(client, *invoice_amount, event_sender, ln_gateway).await?;
896 let pay_invoice_time = fedimint_core::time::now();
897 ldk_pay_invoice(invoice).await?;
898 wait_invoice_payment(
899 prefix,
900 "LND",
901 client,
902 operation_id,
903 event_sender,
904 pay_invoice_time,
905 )
906 .await?;
907 }
908 }
909 Ok(())
910}
911
912async fn do_self_payment(
913 prefix: &str,
914 client: &ClientHandleArc,
915 invoice_amount: Amount,
916 event_sender: &mpsc::UnboundedSender<MetricEvent>,
917) -> anyhow::Result<()> {
918 let (operation_id, invoice) =
919 client_create_invoice(client, invoice_amount, event_sender, None).await?;
920 let pay_invoice_time = fedimint_core::time::now();
921 let lightning_module = client.get_first_module::<LightningClientModule>()?;
922 lightning_module
924 .pay_bolt11_invoice(None, invoice, ())
925 .await?;
926 wait_invoice_payment(
927 prefix,
928 "gateway",
929 client,
930 operation_id,
931 event_sender,
932 pay_invoice_time,
933 )
934 .await?;
935 Ok(())
936}
937
938async fn do_partner_ping_pong(
939 prefix: &str,
940 client: &ClientHandleArc,
941 partner: &ClientHandleArc,
942 invoice_amount: Amount,
943 event_sender: &mpsc::UnboundedSender<MetricEvent>,
944) -> anyhow::Result<()> {
945 let (operation_id, invoice) =
947 client_create_invoice(partner, invoice_amount, event_sender, None).await?;
948 let pay_invoice_time = fedimint_core::time::now();
949 let lightning_module = client.get_first_module::<LightningClientModule>()?;
950 lightning_module
953 .pay_bolt11_invoice(None, invoice, ())
954 .await?;
955 wait_invoice_payment(
956 prefix,
957 "gateway",
958 partner,
959 operation_id,
960 event_sender,
961 pay_invoice_time,
962 )
963 .await?;
964 let (operation_id, invoice) =
966 client_create_invoice(client, invoice_amount, event_sender, None).await?;
967 let pay_invoice_time = fedimint_core::time::now();
968 let partner_lightning_module = partner.get_first_module::<LightningClientModule>()?;
969 partner_lightning_module
972 .pay_bolt11_invoice(None, invoice, ())
973 .await?;
974 wait_invoice_payment(
975 prefix,
976 "gateway",
977 client,
978 operation_id,
979 event_sender,
980 pay_invoice_time,
981 )
982 .await?;
983 Ok(())
984}
985
986async fn wait_invoice_payment(
987 prefix: &str,
988 gateway_name: &str,
989 client: &ClientHandleArc,
990 operation_id: fedimint_core::core::OperationId,
991 event_sender: &mpsc::UnboundedSender<MetricEvent>,
992 pay_invoice_time: std::time::SystemTime,
993) -> anyhow::Result<()> {
994 let elapsed = pay_invoice_time.elapsed()?;
995 info!("{prefix} Invoice payment receive started using {gateway_name} in {elapsed:?}");
996 event_sender.send(MetricEvent {
997 name: format!("gateway_{gateway_name}_payment_received_started"),
998 duration: elapsed,
999 })?;
1000 let lightning_module = client.get_first_module::<LightningClientModule>()?;
1001 let mut updates = lightning_module
1002 .subscribe_ln_receive(operation_id)
1003 .await?
1004 .into_stream();
1005 while let Some(update) = updates.next().await {
1006 debug!(%prefix, ?update, "Invoice payment update");
1007 match update {
1008 LnReceiveState::Claimed => {
1009 let elapsed: Duration = pay_invoice_time.elapsed()?;
1010 info!("{prefix} Invoice payment received on {gateway_name} in {elapsed:?}");
1011 event_sender.send(MetricEvent {
1012 name: "gateway_payment_received_success".into(),
1013 duration: elapsed,
1014 })?;
1015 event_sender.send(MetricEvent {
1016 name: format!("gateway_{gateway_name}_payment_received_success"),
1017 duration: elapsed,
1018 })?;
1019 break;
1020 }
1021 LnReceiveState::Canceled { reason } => {
1022 let elapsed: Duration = pay_invoice_time.elapsed()?;
1023 info!(
1024 "{prefix} Invoice payment receive was canceled on {gateway_name}: {reason} in {elapsed:?}"
1025 );
1026 event_sender.send(MetricEvent {
1027 name: "gateway_payment_received_canceled".into(),
1028 duration: elapsed,
1029 })?;
1030 break;
1031 }
1032 _ => {}
1033 }
1034 }
1035 Ok(())
1036}
1037
1038async fn client_create_invoice(
1039 client: &ClientHandleArc,
1040 invoice_amount: Amount,
1041 event_sender: &mpsc::UnboundedSender<MetricEvent>,
1042 ln_gateway: Option<LightningGateway>,
1043) -> anyhow::Result<(fedimint_core::core::OperationId, Bolt11Invoice)> {
1044 let create_invoice_time = fedimint_core::time::now();
1045 let lightning_module = client.get_first_module::<LightningClientModule>()?;
1046 let desc = Description::new("test".to_string())?;
1047 let (operation_id, invoice, _) = lightning_module
1048 .create_bolt11_invoice(
1049 invoice_amount,
1050 Bolt11InvoiceDescription::Direct(desc),
1051 None,
1052 (),
1053 ln_gateway,
1054 )
1055 .await?;
1056 let elapsed = create_invoice_time.elapsed()?;
1057 info!("Created invoice using gateway in {elapsed:?}");
1058 event_sender.send(MetricEvent {
1059 name: GATEWAY_CREATE_INVOICE.into(),
1060 duration: elapsed,
1061 })?;
1062 Ok((operation_id, invoice))
1063}
1064
1065fn test_download_config(
1066 endpoints: &ConnectorRegistry,
1067 invite_code: &InviteCode,
1068 users: u16,
1069 event_sender: &mpsc::UnboundedSender<MetricEvent>,
1070) -> Vec<BoxFuture<'static, anyhow::Result<()>>> {
1071 (0..users)
1072 .map(move |_| {
1073 let invite_code = invite_code.clone();
1074 let event_sender = event_sender.clone();
1075 let endpoints = endpoints.clone();
1076 let f: BoxFuture<_> = Box::pin(async move {
1077 let m = fedimint_core::time::now();
1078 let _ = fedimint_api_client::api::net::ConnectorType::default()
1079 .download_from_invite_code(&endpoints, &invite_code)
1080 .await?;
1081 event_sender.send(MetricEvent {
1082 name: "download_client_config".into(),
1083 duration: m.elapsed()?,
1084 })?;
1085 Ok(())
1086 });
1087 f
1088 })
1089 .collect()
1090}
1091
1092async fn test_connect_raw_client(
1093 endpoints: &ConnectorRegistry,
1094 invite_code: InviteCode,
1095 users: u16,
1096 duration: Duration,
1097 timeout: Duration,
1098 limit_endpoints: Option<usize>,
1099 event_sender: mpsc::UnboundedSender<MetricEvent>,
1100) -> anyhow::Result<Vec<BoxFuture<'static, anyhow::Result<()>>>> {
1101 use jsonrpsee_core::client::ClientT;
1102 use jsonrpsee_ws_client::WsClientBuilder;
1103
1104 let (mut cfg, _) = fedimint_api_client::api::net::ConnectorType::default()
1105 .download_from_invite_code(endpoints, &invite_code)
1106 .await?;
1107
1108 if let Some(limit_endpoints) = limit_endpoints {
1109 cfg.global.api_endpoints = cfg
1110 .global
1111 .api_endpoints
1112 .into_iter()
1113 .take(limit_endpoints)
1114 .collect();
1115 info!("Limiting endpoints to {:?}", cfg.global.api_endpoints);
1116 }
1117
1118 info!("Connecting to {users} clients");
1119 let clients = (0..users)
1120 .flat_map(|_| {
1121 cfg.global.api_endpoints.values().map(|url| async {
1122 let ws_client = WsClientBuilder::default()
1123 .request_timeout(timeout)
1124 .connection_timeout(timeout)
1125 .build(url_to_string_with_default_port(&url.url))
1126 .await?;
1127 Ok::<_, anyhow::Error>(ws_client)
1128 })
1129 })
1130 .collect::<Vec<_>>();
1131 let clients = futures::future::try_join_all(clients).await?;
1132 info!("Keeping {users} clients connected for {duration:?}");
1133 Ok(clients
1134 .into_iter()
1135 .map(|client| {
1136 let event_sender = event_sender.clone();
1137 let f: BoxFuture<_> = Box::pin(async move {
1138 let initial_time = fedimint_core::time::now();
1139 while initial_time.elapsed()? < duration {
1140 let m = fedimint_core::time::now();
1141 let _epoch: u64 = client
1142 .request::<_, _>(SESSION_COUNT_ENDPOINT, vec![ApiRequestErased::default()])
1143 .await?;
1144 event_sender.send(MetricEvent {
1145 name: SESSION_COUNT_ENDPOINT.into(),
1146 duration: m.elapsed()?,
1147 })?;
1148 fedimint_core::task::sleep(Duration::from_secs(1)).await;
1149 }
1150 Ok(())
1151 });
1152 f
1153 })
1154 .collect())
1155}
1156
1157fn url_to_string_with_default_port(url: &SafeUrl) -> String {
1158 format!(
1159 "{}://{}:{}{}",
1160 url.scheme(),
1161 url.host().expect("Asserted on construction"),
1162 url.port_or_known_default()
1163 .expect("Asserted on construction"),
1164 url.path()
1165 )
1166}
1167
1168async fn handle_metrics_summary(
1169 opts: Opts,
1170 mut event_receiver: mpsc::UnboundedReceiver<MetricEvent>,
1171) -> anyhow::Result<()> {
1172 let timestamp_seconds = fedimint_core::time::duration_since_epoch().as_secs();
1173 let mut metrics_json_output_files = vec![];
1174 let mut previous_metrics = vec![];
1175 let mut comparison_output = None;
1176 if let Some(archive_dir) = opts.archive_dir {
1177 let mut archive_metrics = archive_dir.join("metrics");
1178 archive_metrics.push(opts.users.to_string());
1179 tokio::fs::create_dir_all(&archive_metrics).await?;
1180 let mut archive_comparisons = archive_dir.join("comparisons");
1181 archive_comparisons.push(opts.users.to_string());
1182 tokio::fs::create_dir_all(&archive_comparisons).await?;
1183
1184 let latest_metrics_file = std::fs::read_dir(&archive_metrics)?
1185 .map(|entry| {
1186 let entry = entry.unwrap();
1187 let metadata = entry.metadata().unwrap();
1188 let created = metadata
1189 .created()
1190 .unwrap_or_else(|_| metadata.modified().unwrap());
1191 (entry, created)
1192 })
1193 .max_by_key(|(_entry, created)| created.to_owned())
1194 .map(|(entry, _)| entry.path());
1195 if let Some(latest_metrics_file) = latest_metrics_file {
1196 let display_path = latest_metrics_file.display();
1197 let latest_metrics_file = tokio::fs::File::open(&latest_metrics_file)
1198 .await
1199 .with_context(|| format!("Failed to open {display_path}"))?;
1200 let mut lines = tokio::io::BufReader::new(latest_metrics_file).lines();
1201 while let Some(line) = lines.next_line().await? {
1202 match serde_json::from_str::<EventMetricSummary>(&line) {
1203 Ok(metric) => {
1204 previous_metrics.push(metric);
1205 }
1206 Err(e) => {
1207 warn!("Failed to parse previous metric: {e:?}");
1208 }
1209 }
1210 }
1211 }
1212 let new_metric_output = archive_metrics.join(format!("{timestamp_seconds}.json",));
1213 let new_metric_output = BufWriter::new(
1214 OpenOptions::new()
1215 .write(true)
1216 .create(true)
1217 .truncate(true)
1218 .open(new_metric_output)
1219 .await?,
1220 );
1221 metrics_json_output_files.push(new_metric_output);
1222 if !previous_metrics.is_empty() {
1223 let new_comparison_output =
1224 archive_comparisons.join(format!("{timestamp_seconds}.json",));
1225 comparison_output = Some(BufWriter::new(
1226 OpenOptions::new()
1227 .write(true)
1228 .create(true)
1229 .truncate(true)
1230 .open(new_comparison_output)
1231 .await?,
1232 ));
1233 }
1234 }
1235 if let Some(metrics_json_output) = opts.metrics_json_output {
1236 metrics_json_output_files.push(BufWriter::new(
1237 tokio::fs::OpenOptions::new()
1238 .write(true)
1239 .create(true)
1240 .truncate(true)
1241 .open(metrics_json_output)
1242 .await?,
1243 ));
1244 }
1245 let mut results = BTreeMap::new();
1246 while let Some(event) = event_receiver.recv().await {
1247 let entry = results.entry(event.name).or_insert_with(Vec::new);
1248 entry.push(event.duration);
1249 }
1250 let mut previous_metrics = previous_metrics
1251 .into_iter()
1252 .map(|metric| (metric.name.clone(), metric))
1253 .collect::<HashMap<_, _>>();
1254 for (k, mut v) in results {
1255 v.sort();
1256 let n = v.len();
1257 let max = v.iter().last().unwrap();
1258 let min = v.first().unwrap();
1259 let median = v[n / 2];
1260 let sum: Duration = v.iter().sum();
1261 let avg = sum / n as u32;
1262 let metric_summary = EventMetricSummary {
1263 name: k.clone(),
1264 users: u64::from(opts.users),
1265 n: n as u64,
1266 avg_ms: avg.as_millis(),
1267 median_ms: median.as_millis(),
1268 max_ms: max.as_millis(),
1269 min_ms: min.as_millis(),
1270 timestamp_seconds,
1271 };
1272 let comparison = if let Some(previous_metric) = previous_metrics.remove(&k) {
1273 if previous_metric.n == metric_summary.n {
1274 fn calculate_gain(current: u128, previous: u128) -> f64 {
1275 current as f64 / previous as f64
1276 }
1277 let comparison = EventMetricComparison {
1278 avg_ms_gain: calculate_gain(metric_summary.avg_ms, previous_metric.avg_ms),
1279 median_ms_gain: calculate_gain(
1280 metric_summary.median_ms,
1281 previous_metric.median_ms,
1282 ),
1283 max_ms_gain: calculate_gain(metric_summary.max_ms, previous_metric.max_ms),
1284 min_ms_gain: calculate_gain(metric_summary.min_ms, previous_metric.min_ms),
1285 current: metric_summary.clone(),
1286 previous: previous_metric,
1287 };
1288 if let Some(comparison_output) = &mut comparison_output {
1289 let comparison_json =
1290 serde_json::to_string(&comparison).expect("to be serializable");
1291 comparison_output
1292 .write_all(format!("{comparison_json}\n").as_bytes())
1293 .await
1294 .expect("to write on file");
1295 }
1296 Some(comparison)
1297 } else {
1298 info!(
1299 "Skipping comparison for {k} because previous metric has different n ({} vs {})",
1300 previous_metric.n, metric_summary.n
1301 );
1302 None
1303 }
1304 } else {
1305 None
1306 };
1307 if let Some(comparison) = comparison {
1308 println!(
1309 "{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?} (compared to previous: {comparison})"
1310 );
1311 } else {
1312 println!("{n} {k}: avg {avg:?}, median {median:?}, max {max:?}, min {min:?}");
1313 }
1314 let metric_summary_json =
1315 serde_json::to_string(&metric_summary).expect("to be serializable");
1316 for metrics_json_output_file in &mut metrics_json_output_files {
1317 metrics_json_output_file
1318 .write_all(format!("{metric_summary_json}\n").as_bytes())
1319 .await
1320 .expect("to write on file");
1321 }
1322 }
1323 for mut output in metrics_json_output_files {
1324 output.flush().await?;
1325 }
1326 if let Some(mut output) = comparison_output {
1327 output.flush().await?;
1328 }
1329 Ok(())
1330}
1331
1332async fn get_gateway_id(generate_invoice_with: LnInvoiceGeneration) -> anyhow::Result<String> {
1333 let gateway_json = match generate_invoice_with {
1334 LnInvoiceGeneration::LdkLightningCli => {
1335 cmd!(GatewayLndCli, "info").out_json().await
1337 }
1338 }?;
1339
1340 let gatewayd_version = devimint::util::Gatewayd::version_or_default().await;
1341 let gateway_id = if gatewayd_version < *VERSION_0_10_0_ALPHA {
1342 gateway_json["gateway_id"]
1343 .as_str()
1344 .context("gateway_id must be a string")?
1345 .to_owned()
1346 } else {
1347 gateway_json["registrations"]["http"][1]
1348 .as_str()
1349 .context("gateway id must be a string")?
1350 .to_owned()
1351 };
1352
1353 Ok(gateway_id)
1354}