1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use async_stream::try_stream;
8use fedimint_bip39::{Bip39RootSecretStrategy, Mnemonic};
9use fedimint_client::module::ClientModule;
10use fedimint_client::secret::RootSecretStrategy;
11use fedimint_client::{ClientHandleArc, ClientPreview, RootSecret};
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::config::{FederationId, FederationIdPrefix};
14use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
15use fedimint_core::encoding::{Decodable, Encodable};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::task::{MaybeSend, MaybeSync};
18use fedimint_core::util::{BoxFuture, BoxStream};
19use fedimint_core::{Amount, TieredCounts, impl_db_record};
20use fedimint_derive_secret::{ChildId, DerivableSecret};
21use fedimint_ln_client::{LightningClientInit, LightningClientModule};
22use fedimint_meta_client::{MetaClientInit, MetaClientModule};
23use fedimint_mint_client::{MintClientInit, MintClientModule, OOBNotes};
24use fedimint_wallet_client::{WalletClientInit, WalletClientModule};
25use futures::StreamExt;
26use futures::future::{AbortHandle, Abortable};
27use lightning_invoice::Bolt11InvoiceDescriptionRef;
28use rand::thread_rng;
29use serde::{Deserialize, Serialize};
30use serde_json::json;
31use tokio::sync::Mutex;
32use tracing::info;
33
/// One-byte prefixes that partition the unified database.
///
/// `ClientDatabase` is the leading byte of every per-client prefix built by
/// `RpcGlobalState::client_db`; `Mnemonic` is the `db_prefix` of the
/// `MnemonicKey` record.
#[repr(u8)]
#[derive(Debug, Clone, Copy)]
pub enum DbKeyPrefix {
    /// Namespace under which each client's isolated database lives.
    ClientDatabase = 0x00,
    /// Namespace of the wallet mnemonic record.
    Mnemonic = 0x01,
}
41
42#[derive(Debug, Clone, Encodable, Decodable, Eq, PartialEq, Hash)]
43pub struct MnemonicKey;
44
45impl_db_record!(
46 key = MnemonicKey,
47 value = Vec<u8>,
48 db_prefix = DbKeyPrefix::Mnemonic,
49);
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct ParsedNoteDetails {
54 pub total_amount: Amount,
56 pub federation_id_prefix: FederationIdPrefix,
58 pub federation_id: Option<FederationId>,
60 pub invite_code: Option<InviteCode>,
62 pub note_counts: TieredCounts,
64}
65
66#[derive(Serialize, Deserialize)]
67#[serde(rename_all = "snake_case")]
68pub struct RpcRequest {
69 pub request_id: u64,
70 #[serde(flatten)]
71 pub kind: RpcRequestKind,
72}
73
74#[derive(Serialize, Deserialize)]
75#[serde(tag = "type", rename_all = "snake_case")]
76pub enum RpcRequestKind {
77 SetMnemonic {
78 words: Vec<String>,
79 },
80 GenerateMnemonic,
81 GetMnemonic,
82 HasMnemonicSet,
83 JoinFederation {
85 invite_code: String,
86 force_recover: bool,
87 client_name: String,
88 },
89 OpenClient {
90 client_name: String,
91 },
92 CloseClient {
93 client_name: String,
94 },
95 ClientRpc {
96 client_name: String,
97 module: String,
98 method: String,
99 payload: serde_json::Value,
100 },
101 CancelRpc {
102 cancel_request_id: u64,
103 },
104 ParseInviteCode {
105 invite_code: String,
106 },
107 ParseBolt11Invoice {
108 invoice: String,
109 },
110 PreviewFederation {
111 invite_code: String,
112 },
113 ParseOobNotes {
114 oob_notes: String,
115 },
116 ParseLightningAddress {
117 address: String,
118 },
119}
120
121#[derive(Serialize, Deserialize, Clone, Debug)]
122pub struct RpcResponse {
123 pub request_id: u64,
124 #[serde(flatten)]
125 pub kind: RpcResponseKind,
126}
127
128#[derive(Serialize, Deserialize, Clone, Debug)]
129#[serde(tag = "type", rename_all = "snake_case")]
130pub enum RpcResponseKind {
131 Data { data: serde_json::Value },
132 Error { error: String },
133 Aborted {},
134 End {},
135}
136
137pub trait RpcResponseHandler: MaybeSend + MaybeSync {
138 fn handle_response(&self, response: RpcResponse);
139}
140
141pub struct RpcGlobalState {
142 connectors: ConnectorRegistry,
144 clients: Mutex<HashMap<String, ClientHandleArc>>,
145 rpc_handles: std::sync::Mutex<HashMap<u64, AbortHandle>>,
146 unified_database: Database,
147 preview_cache: std::sync::Mutex<Option<ClientPreview>>,
148}
149
150pub struct HandledRpc<'a> {
151 pub task: Option<BoxFuture<'a, ()>>,
152}
153
154impl RpcGlobalState {
155 pub fn new(connectors: ConnectorRegistry, unified_database: Database) -> Self {
156 Self {
157 connectors,
158 clients: Mutex::new(HashMap::new()),
159 rpc_handles: std::sync::Mutex::new(HashMap::new()),
160 unified_database,
161 preview_cache: std::sync::Mutex::new(None),
162 }
163 }
164
165 async fn add_client(&self, client_name: String, client: ClientHandleArc) {
166 let mut clients = self.clients.lock().await;
167 clients.insert(client_name, client);
168 }
169
170 async fn get_client(&self, client_name: &str) -> Option<ClientHandleArc> {
171 let clients = self.clients.lock().await;
172 clients.get(client_name).cloned()
173 }
174
175 fn add_rpc_handle(&self, request_id: u64, handle: AbortHandle) {
176 let mut handles = self.rpc_handles.lock().unwrap();
177 if handles.insert(request_id, handle).is_some() {
178 tracing::error!("RPC CLIENT ERROR: request id reuse detected");
179 }
180 }
181
182 fn remove_rpc_handle(&self, request_id: u64) -> Option<AbortHandle> {
183 let mut handles = self.rpc_handles.lock().unwrap();
184 handles.remove(&request_id)
185 }
186
187 async fn client_builder() -> Result<fedimint_client::ClientBuilder, anyhow::Error> {
188 let mut builder = fedimint_client::Client::builder().await?;
189 builder.with_module(MintClientInit);
190 builder.with_module(LightningClientInit::default());
191 builder.with_module(WalletClientInit(None));
192 builder.with_module(MetaClientInit);
193 Ok(builder)
194 }
195
196 async fn client_db(&self, client_name: String) -> anyhow::Result<Database> {
198 assert_eq!(client_name.len(), 36);
199
200 let unified_db = &self.unified_database;
201 let mut client_prefix = vec![DbKeyPrefix::ClientDatabase as u8];
202 client_prefix.extend_from_slice(client_name.as_bytes());
203 Ok(unified_db.with_prefix(client_prefix))
204 }
205
206 async fn handle_join_federation(
208 &self,
209
210 invite_code: String,
211 client_name: String,
212 force_recover: bool,
213 ) -> anyhow::Result<()> {
214 let mnemonic = self
216 .get_mnemonic_from_db()
217 .await?
218 .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
219
220 let client_db = self.client_db(client_name.clone()).await?;
221
222 let invite_code = InviteCode::from_str(&invite_code)?;
223 let federation_id = invite_code.federation_id();
224
225 let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
227
228 let cached_preview = self.preview_cache.lock().unwrap().take();
230 let preview = match cached_preview {
231 Some(preview) if preview.config().calculate_federation_id() == federation_id => preview,
232 _ => {
233 let builder = Self::client_builder().await?;
234 builder
235 .preview(self.connectors.clone(), &invite_code)
236 .await?
237 }
238 };
239
240 #[allow(deprecated)]
242 let backup = preview
243 .download_backup_from_federation(RootSecret::StandardDoubleDerive(
244 federation_secret.clone(),
245 ))
246 .await?;
247
248 let client = if force_recover || backup.is_some() {
249 Arc::new(
250 preview
251 .recover(
252 client_db,
253 RootSecret::StandardDoubleDerive(federation_secret),
254 backup,
255 )
256 .await?,
257 )
258 } else {
259 Arc::new(
260 preview
261 .join(
262 client_db,
263 RootSecret::StandardDoubleDerive(federation_secret),
264 )
265 .await?,
266 )
267 };
268
269 self.add_client(client_name, client).await;
270 Ok(())
271 }
272
273 async fn handle_open_client(&self, client_name: String) -> anyhow::Result<()> {
274 let mnemonic = self
276 .get_mnemonic_from_db()
277 .await?
278 .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
279
280 let client_db = self.client_db(client_name.clone()).await?;
281
282 if !fedimint_client::Client::is_initialized(&client_db).await {
283 anyhow::bail!("client is not initialized for this database");
284 }
285
286 let client_config = fedimint_client::Client::get_config_from_db(&client_db)
288 .await
289 .context("Client config not found in database")?;
290
291 let federation_id = client_config.calculate_federation_id();
292
293 let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
295
296 let builder = Self::client_builder().await?;
297 let client = Arc::new(
298 builder
299 .open(
300 self.connectors.clone(),
301 client_db,
302 RootSecret::StandardDoubleDerive(federation_secret),
303 )
304 .await?,
305 );
306
307 self.add_client(client_name, client).await;
308 Ok(())
309 }
310
311 async fn handle_close_client(&self, client_name: String) -> anyhow::Result<()> {
312 let mut clients = self.clients.lock().await;
313 let mut client = clients.remove(&client_name).context("client not found")?;
314
315 for attempt in 0.. {
317 info!(attempt, "waiting for RPCs to drop the federation object");
318 match Arc::try_unwrap(client) {
319 Ok(client) => {
320 client.shutdown().await;
321 break;
322 }
323 Err(client_val) => client = client_val,
324 }
325 fedimint_core::task::sleep(Duration::from_millis(100)).await;
326 }
327 Ok(())
328 }
329
330 fn handle_client_rpc(
331 self: Arc<Self>,
332 client_name: String,
333 module: String,
334 method: String,
335 payload: serde_json::Value,
336 ) -> BoxStream<'static, anyhow::Result<serde_json::Value>> {
337 Box::pin(try_stream! {
338 let client = self
339 .get_client(&client_name)
340 .await
341 .with_context(|| format!("Client not found: {client_name}"))?;
342 match module.as_str() {
343 "" => {
344 let mut stream = client.handle_global_rpc(method, payload);
345 while let Some(item) = stream.next().await {
346 yield item?;
347 }
348 }
349 "ln" => {
350 let ln = client.get_first_module::<LightningClientModule>()?.inner();
351 let mut stream = ln.handle_rpc(method, payload).await;
352 while let Some(item) = stream.next().await {
353 yield item?;
354 }
355 }
356 "mint" => {
357 let mint = client.get_first_module::<MintClientModule>()?.inner();
358 let mut stream = mint.handle_rpc(method, payload).await;
359 while let Some(item) = stream.next().await {
360 yield item?;
361 }
362 }
363 "wallet" => {
364 let wallet = client
365 .get_first_module::<WalletClientModule>()?
366 .inner();
367 let mut stream = wallet.handle_rpc(method, payload).await;
368 while let Some(item) = stream.next().await {
369 yield item?;
370 }
371 }
372 "meta" => {
373 let meta = client.get_first_module::<MetaClientModule>()?.inner();
374 let mut stream = meta.handle_rpc(method, payload).await;
375 while let Some(item) = stream.next().await {
376 yield item?;
377 }
378 }
379 _ => {
380 Err(anyhow::format_err!("module not found: {module}"))?;
381 },
382 };
383 })
384 }
385
386 fn parse_invite_code(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
387 let invite_code = InviteCode::from_str(&invite_code)?;
388
389 Ok(json!({
390 "url": invite_code.url(),
391 "federation_id": invite_code.federation_id(),
392 }))
393 }
394
395 fn parse_bolt11_invoice(&self, invoice_str: String) -> anyhow::Result<serde_json::Value> {
396 let invoice = lightning_invoice::Bolt11Invoice::from_str(&invoice_str)
397 .map_err(|e| anyhow::anyhow!("Failed to parse Lightning invoice: {}", e))?;
398
399 let amount_msat = invoice.amount_milli_satoshis().unwrap_or(0);
400 let amount_sat = amount_msat as f64 / 1000.0;
401
402 let expiry_seconds = invoice.expiry_time().as_secs();
403
404 let description = match invoice.description() {
406 Bolt11InvoiceDescriptionRef::Direct(desc) => desc.to_string(),
407 Bolt11InvoiceDescriptionRef::Hash(_) => "Description hash only".to_string(),
408 };
409
410 Ok(json!({
411 "amount": amount_sat,
412 "expiry": expiry_seconds,
413 "memo": description,
414 }))
415 }
416
417 async fn preview_federation(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
418 let invite = InviteCode::from_str(&invite_code)?;
419 let federation_id = invite.federation_id();
420
421 let builder = Self::client_builder().await?;
422 let preview = builder.preview(self.connectors.clone(), &invite).await?;
423
424 let json_config = preview.config().to_json();
425 *self.preview_cache.lock().unwrap() = Some(preview);
427
428 Ok(json!({
429 "config": json_config,
430 "federation_id": federation_id.to_string(),
431 }))
432 }
433
434 fn handle_rpc_inner(
435 self: Arc<Self>,
436 request: RpcRequest,
437 ) -> Option<BoxStream<'static, anyhow::Result<serde_json::Value>>> {
438 match request.kind {
439 RpcRequestKind::SetMnemonic { words } => Some(Box::pin(try_stream! {
440 self.set_mnemonic(words).await?;
441 yield serde_json::json!({ "success": true });
442 })),
443 RpcRequestKind::GenerateMnemonic => Some(Box::pin(try_stream! {
444 let words = self.generate_mnemonic().await?;
445 yield serde_json::json!({ "mnemonic": words });
446 })),
447 RpcRequestKind::GetMnemonic => Some(Box::pin(try_stream! {
448 let words = self.get_mnemonic_words().await?;
449 yield serde_json::json!({ "mnemonic": words });
450 })),
451 RpcRequestKind::HasMnemonicSet => Some(Box::pin(try_stream! {
452 let is_set = self.has_mnemonic_set().await?;
453 yield serde_json::json!(is_set);
454 })),
455 RpcRequestKind::JoinFederation {
456 invite_code,
457 client_name,
458 force_recover,
459 } => Some(Box::pin(try_stream! {
460 self.handle_join_federation(invite_code, client_name, force_recover)
461 .await?;
462 yield serde_json::json!(null);
463 })),
464 RpcRequestKind::OpenClient { client_name } => Some(Box::pin(try_stream! {
465 self.handle_open_client(client_name).await?;
466 yield serde_json::json!(null);
467 })),
468 RpcRequestKind::CloseClient { client_name } => Some(Box::pin(try_stream! {
469 self.handle_close_client(client_name).await?;
470 yield serde_json::json!(null);
471 })),
472 RpcRequestKind::ClientRpc {
473 client_name,
474 module,
475 method,
476 payload,
477 } => Some(self.handle_client_rpc(client_name, module, method, payload)),
478 RpcRequestKind::ParseInviteCode { invite_code } => Some(Box::pin(try_stream! {
479 let result = self.parse_invite_code(invite_code)?;
480 yield result;
481 })),
482 RpcRequestKind::ParseBolt11Invoice { invoice } => Some(Box::pin(try_stream! {
483 let result = self.parse_bolt11_invoice(invoice)?;
484 yield result;
485 })),
486 RpcRequestKind::PreviewFederation { invite_code } => Some(Box::pin(try_stream! {
487 let result = self.preview_federation(invite_code).await?;
488 yield result;
489 })),
490 RpcRequestKind::ParseOobNotes { oob_notes } => Some(Box::pin(try_stream! {
491 let parsed = parse_oob_notes(&oob_notes)?;
492 yield serde_json::to_value(parsed)?;
493 })),
494 RpcRequestKind::ParseLightningAddress { address } => Some(Box::pin(try_stream! {
495 let url = fedimint_lnurl::parse_address(&address)
496 .context("Invalid Lightning Address")?;
497 let metadata = fedimint_lnurl::request(&url).await
498 .map_err(|e| anyhow::anyhow!(e))?;
499
500 yield serde_json::to_value(metadata)?;
501 })),
502 RpcRequestKind::CancelRpc { cancel_request_id } => {
503 if let Some(handle) = self.remove_rpc_handle(cancel_request_id) {
504 handle.abort();
505 }
506 None
507 }
508 }
509 }
510
511 pub fn handle_rpc(
512 self: Arc<Self>,
513 request: RpcRequest,
514 handler: impl RpcResponseHandler + 'static,
515 ) -> HandledRpc<'static> {
516 let request_id = request.request_id;
517
518 let Some(stream) = self.clone().handle_rpc_inner(request) else {
519 return HandledRpc { task: None };
520 };
521
522 let (abort_handle, abort_registration) = AbortHandle::new_pair();
523 self.add_rpc_handle(request_id, abort_handle);
524
525 let task = Box::pin(async move {
526 let mut stream = Abortable::new(stream, abort_registration);
527
528 while let Some(result) = stream.next().await {
529 let response = match result {
530 Ok(value) => RpcResponse {
531 request_id,
532 kind: RpcResponseKind::Data { data: value },
533 },
534 Err(e) => RpcResponse {
535 request_id,
536 kind: RpcResponseKind::Error {
537 error: e.to_string(),
538 },
539 },
540 };
541 handler.handle_response(response);
542 }
543
544 let _ = self.remove_rpc_handle(request_id);
546 handler.handle_response(RpcResponse {
547 request_id,
548 kind: if stream.is_aborted() {
549 RpcResponseKind::Aborted {}
550 } else {
551 RpcResponseKind::End {}
552 },
553 });
554 });
555
556 HandledRpc { task: Some(task) }
557 }
558
559 async fn get_mnemonic_words(&self) -> anyhow::Result<Option<Vec<String>>> {
563 let mnemonic = self.get_mnemonic_from_db().await?;
564
565 if let Some(mnemonic) = mnemonic {
566 let words = mnemonic.words().map(|w| w.to_string()).collect();
567 Ok(Some(words))
568 } else {
569 Ok(None)
570 }
571 }
572 async fn set_mnemonic(&self, words: Vec<String>) -> anyhow::Result<()> {
575 let all_words = words.join(" ");
576 let mnemonic =
577 Mnemonic::parse_in_normalized(fedimint_bip39::Language::English, &all_words)?;
578
579 let mut dbtx = self.unified_database.begin_transaction().await;
580
581 if dbtx.get_value(&MnemonicKey).await.is_some() {
582 anyhow::bail!(
583 "Wallet mnemonic already exists. Please clear existing data before setting a new mnemonic."
584 );
585 }
586
587 dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
588 .await;
589
590 dbtx.commit_tx().await;
591
592 Ok(())
593 }
594
595 async fn generate_mnemonic(&self) -> anyhow::Result<Vec<String>> {
598 let mnemonic = Bip39RootSecretStrategy::<12>::random(&mut thread_rng());
599 let words: Vec<String> = mnemonic.words().map(|w| w.to_string()).collect();
600
601 let mut dbtx = self.unified_database.begin_transaction().await;
602
603 if dbtx.get_value(&MnemonicKey).await.is_some() {
604 anyhow::bail!(
605 "Wallet mnemonic already exists. Please clear existing data before generating a new mnemonic."
606 );
607 }
608
609 dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
610 .await;
611
612 dbtx.commit_tx().await;
613
614 Ok(words)
615 }
616
617 fn derive_federation_secret(
619 &self,
620 mnemonic: &Mnemonic,
621 federation_id: &FederationId,
622 ) -> DerivableSecret {
623 let global_root_secret = Bip39RootSecretStrategy::<12>::to_root_secret(mnemonic);
624 let multi_federation_root_secret = global_root_secret.child_key(ChildId(0));
625 let federation_root_secret = multi_federation_root_secret.federation_key(federation_id);
626 let federation_wallet_root_secret = federation_root_secret.child_key(ChildId(0));
627 federation_wallet_root_secret.child_key(ChildId(0))
628 }
629
630 async fn get_mnemonic_from_db(&self) -> anyhow::Result<Option<Mnemonic>> {
632 let mut dbtx = self.unified_database.begin_transaction_nc().await;
633
634 if let Some(mnemonic_entropy) = dbtx.get_value(&MnemonicKey).await {
635 let mnemonic = Mnemonic::from_entropy(&mnemonic_entropy)?;
636 Ok(Some(mnemonic))
637 } else {
638 Ok(None)
639 }
640 }
641
642 async fn has_mnemonic_set(&self) -> anyhow::Result<bool> {
644 let mnemonic = self.get_mnemonic_from_db().await?;
645 Ok(mnemonic.is_some())
646 }
647}
648
649pub fn parse_oob_notes(oob_notes_str: &str) -> anyhow::Result<ParsedNoteDetails> {
650 let oob_notes =
651 OOBNotes::from_str(oob_notes_str).context("Failed to parse OOB notes string")?;
652
653 let total_amount = oob_notes.total_amount();
654 let federation_id_prefix = oob_notes.federation_id_prefix();
655 let invite_code = oob_notes.federation_invite();
656 let federation_id = invite_code.as_ref().map(|inv| inv.federation_id());
657
658 let notes = oob_notes.notes();
660 let mut note_counts = TieredCounts::default();
661 for (amount, _note) in notes.iter_items() {
662 note_counts.inc(amount, 1);
663 }
664
665 Ok(ParsedNoteDetails {
666 total_amount,
667 federation_id_prefix,
668 federation_id,
669 invite_code,
670 note_counts,
671 })
672}