1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use async_stream::try_stream;
8use fedimint_bip39::{Bip39RootSecretStrategy, Mnemonic};
9use fedimint_client::module::ClientModule;
10use fedimint_client::secret::RootSecretStrategy;
11use fedimint_client::{ClientHandleArc, ClientPreview, RootSecret};
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::config::{FederationId, FederationIdPrefix};
14use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
15use fedimint_core::encoding::{Decodable, Encodable};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::task::{MaybeSend, MaybeSync};
18use fedimint_core::util::{BoxFuture, BoxStream};
19use fedimint_core::{Amount, TieredCounts, impl_db_record};
20use fedimint_derive_secret::{ChildId, DerivableSecret};
21use fedimint_ln_client::{LightningClientInit, LightningClientModule};
22use fedimint_meta_client::MetaClientInit;
23use fedimint_mint_client::{MintClientInit, MintClientModule, OOBNotes};
24use fedimint_wallet_client::{WalletClientInit, WalletClientModule};
25use futures::StreamExt;
26use futures::future::{AbortHandle, Abortable};
27use lightning_invoice::Bolt11InvoiceDescriptionRef;
28use rand::thread_rng;
29use serde::{Deserialize, Serialize};
30use serde_json::json;
31use tokio::sync::Mutex;
32use tracing::info;
33
/// Single-byte prefixes that partition the key space of the unified database.
///
/// The discriminants are written into database keys, so they must stay stable.
#[derive(Debug, Clone, Copy)]
#[repr(u8)]
pub enum DbKeyPrefix {
    /// First byte of every per-client database prefix.
    ClientDatabase = 0x00,
    /// Prefix of the record holding the wallet mnemonic.
    Mnemonic = 0x01,
}
41
42#[derive(Debug, Clone, Encodable, Decodable, Eq, PartialEq, Hash)]
43pub struct MnemonicKey;
44
45impl_db_record!(
46 key = MnemonicKey,
47 value = Vec<u8>,
48 db_prefix = DbKeyPrefix::Mnemonic,
49);
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct ParsedNoteDetails {
54 pub total_amount: Amount,
56 pub federation_id_prefix: FederationIdPrefix,
58 pub federation_id: Option<FederationId>,
60 pub invite_code: Option<InviteCode>,
62 pub note_counts: TieredCounts,
64}
65
66#[derive(Serialize, Deserialize)]
67#[serde(rename_all = "snake_case")]
68pub struct RpcRequest {
69 pub request_id: u64,
70 #[serde(flatten)]
71 pub kind: RpcRequestKind,
72}
73
74#[derive(Serialize, Deserialize)]
75#[serde(tag = "type", rename_all = "snake_case")]
76pub enum RpcRequestKind {
77 SetMnemonic {
78 words: Vec<String>,
79 },
80 GenerateMnemonic,
81 GetMnemonic,
82 JoinFederation {
84 invite_code: String,
85 force_recover: bool,
86 client_name: String,
87 },
88 OpenClient {
89 client_name: String,
90 },
91 CloseClient {
92 client_name: String,
93 },
94 ClientRpc {
95 client_name: String,
96 module: String,
97 method: String,
98 payload: serde_json::Value,
99 },
100 CancelRpc {
101 cancel_request_id: u64,
102 },
103 ParseInviteCode {
104 invite_code: String,
105 },
106 ParseBolt11Invoice {
107 invoice: String,
108 },
109 PreviewFederation {
110 invite_code: String,
111 },
112 ParseOobNotes {
113 oob_notes: String,
114 },
115}
116
117#[derive(Serialize, Deserialize, Clone, Debug)]
118pub struct RpcResponse {
119 pub request_id: u64,
120 #[serde(flatten)]
121 pub kind: RpcResponseKind,
122}
123
124#[derive(Serialize, Deserialize, Clone, Debug)]
125#[serde(tag = "type", rename_all = "snake_case")]
126pub enum RpcResponseKind {
127 Data { data: serde_json::Value },
128 Error { error: String },
129 Aborted {},
130 End {},
131}
132
133pub trait RpcResponseHandler: MaybeSend + MaybeSync {
134 fn handle_response(&self, response: RpcResponse);
135}
136
137pub struct RpcGlobalState {
138 connectors: ConnectorRegistry,
140 clients: Mutex<HashMap<String, ClientHandleArc>>,
141 rpc_handles: std::sync::Mutex<HashMap<u64, AbortHandle>>,
142 unified_database: Database,
143 preview_cache: std::sync::Mutex<Option<ClientPreview>>,
144}
145
146pub struct HandledRpc<'a> {
147 pub task: Option<BoxFuture<'a, ()>>,
148}
149
150impl RpcGlobalState {
151 pub fn new(connectors: ConnectorRegistry, unified_database: Database) -> Self {
152 Self {
153 connectors,
154 clients: Mutex::new(HashMap::new()),
155 rpc_handles: std::sync::Mutex::new(HashMap::new()),
156 unified_database,
157 preview_cache: std::sync::Mutex::new(None),
158 }
159 }
160
161 async fn add_client(&self, client_name: String, client: ClientHandleArc) {
162 let mut clients = self.clients.lock().await;
163 clients.insert(client_name, client);
164 }
165
166 async fn get_client(&self, client_name: &str) -> Option<ClientHandleArc> {
167 let clients = self.clients.lock().await;
168 clients.get(client_name).cloned()
169 }
170
171 fn add_rpc_handle(&self, request_id: u64, handle: AbortHandle) {
172 let mut handles = self.rpc_handles.lock().unwrap();
173 if handles.insert(request_id, handle).is_some() {
174 tracing::error!("RPC CLIENT ERROR: request id reuse detected");
175 }
176 }
177
178 fn remove_rpc_handle(&self, request_id: u64) -> Option<AbortHandle> {
179 let mut handles = self.rpc_handles.lock().unwrap();
180 handles.remove(&request_id)
181 }
182
183 async fn client_builder() -> Result<fedimint_client::ClientBuilder, anyhow::Error> {
184 let mut builder = fedimint_client::Client::builder().await?;
185 builder.with_module(MintClientInit);
186 builder.with_module(LightningClientInit::default());
187 builder.with_module(WalletClientInit(None));
188 builder.with_module(MetaClientInit);
189 Ok(builder)
190 }
191
192 async fn client_db(&self, client_name: String) -> anyhow::Result<Database> {
194 assert_eq!(client_name.len(), 36);
195
196 let unified_db = &self.unified_database;
197 let mut client_prefix = vec![DbKeyPrefix::ClientDatabase as u8];
198 client_prefix.extend_from_slice(client_name.as_bytes());
199 Ok(unified_db.with_prefix(client_prefix))
200 }
201
202 async fn handle_join_federation(
204 &self,
205
206 invite_code: String,
207 client_name: String,
208 force_recover: bool,
209 ) -> anyhow::Result<()> {
210 let mnemonic = self
212 .get_mnemonic_from_db()
213 .await?
214 .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
215
216 let client_db = self.client_db(client_name.clone()).await?;
217
218 let invite_code = InviteCode::from_str(&invite_code)?;
219 let federation_id = invite_code.federation_id();
220
221 let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
223
224 let cached_preview = self.preview_cache.lock().unwrap().take();
226 let preview = match cached_preview {
227 Some(preview) if preview.config().calculate_federation_id() == federation_id => preview,
228 _ => {
229 let builder = Self::client_builder().await?;
230 builder
231 .preview(self.connectors.clone(), &invite_code)
232 .await?
233 }
234 };
235
236 let backup = preview
238 .download_backup_from_federation(RootSecret::StandardDoubleDerive(
239 federation_secret.clone(),
240 ))
241 .await?;
242
243 let client = if force_recover || backup.is_some() {
244 Arc::new(
245 preview
246 .recover(
247 client_db,
248 RootSecret::StandardDoubleDerive(federation_secret),
249 backup,
250 )
251 .await?,
252 )
253 } else {
254 Arc::new(
255 preview
256 .join(
257 client_db,
258 RootSecret::StandardDoubleDerive(federation_secret),
259 )
260 .await?,
261 )
262 };
263
264 self.add_client(client_name, client).await;
265 Ok(())
266 }
267
268 async fn handle_open_client(&self, client_name: String) -> anyhow::Result<()> {
269 let mnemonic = self
271 .get_mnemonic_from_db()
272 .await?
273 .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
274
275 let client_db = self.client_db(client_name.clone()).await?;
276
277 if !fedimint_client::Client::is_initialized(&client_db).await {
278 anyhow::bail!("client is not initialized for this database");
279 }
280
281 let client_config = fedimint_client::Client::get_config_from_db(&client_db)
283 .await
284 .context("Client config not found in database")?;
285
286 let federation_id = client_config.calculate_federation_id();
287
288 let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
290
291 let builder = Self::client_builder().await?;
292 let client = Arc::new(
293 builder
294 .open(
295 self.connectors.clone(),
296 client_db,
297 RootSecret::StandardDoubleDerive(federation_secret),
298 )
299 .await?,
300 );
301
302 self.add_client(client_name, client).await;
303 Ok(())
304 }
305
306 async fn handle_close_client(&self, client_name: String) -> anyhow::Result<()> {
307 let mut clients = self.clients.lock().await;
308 let mut client = clients.remove(&client_name).context("client not found")?;
309
310 for attempt in 0.. {
312 info!(attempt, "waiting for RPCs to drop the federation object");
313 match Arc::try_unwrap(client) {
314 Ok(client) => {
315 client.shutdown().await;
316 break;
317 }
318 Err(client_val) => client = client_val,
319 }
320 fedimint_core::task::sleep(Duration::from_millis(100)).await;
321 }
322 Ok(())
323 }
324
325 fn handle_client_rpc(
326 self: Arc<Self>,
327 client_name: String,
328 module: String,
329 method: String,
330 payload: serde_json::Value,
331 ) -> BoxStream<'static, anyhow::Result<serde_json::Value>> {
332 Box::pin(try_stream! {
333 let client = self
334 .get_client(&client_name)
335 .await
336 .with_context(|| format!("Client not found: {client_name}"))?;
337 match module.as_str() {
338 "" => {
339 let mut stream = client.handle_global_rpc(method, payload);
340 while let Some(item) = stream.next().await {
341 yield item?;
342 }
343 }
344 "ln" => {
345 let ln = client.get_first_module::<LightningClientModule>()?.inner();
346 let mut stream = ln.handle_rpc(method, payload).await;
347 while let Some(item) = stream.next().await {
348 yield item?;
349 }
350 }
351 "mint" => {
352 let mint = client.get_first_module::<MintClientModule>()?.inner();
353 let mut stream = mint.handle_rpc(method, payload).await;
354 while let Some(item) = stream.next().await {
355 yield item?;
356 }
357 }
358 "wallet" => {
359 let wallet = client
360 .get_first_module::<WalletClientModule>()?
361 .inner();
362 let mut stream = wallet.handle_rpc(method, payload).await;
363 while let Some(item) = stream.next().await {
364 yield item?;
365 }
366 }
367 _ => {
368 Err(anyhow::format_err!("module not found: {module}"))?;
369 },
370 };
371 })
372 }
373
374 fn parse_invite_code(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
375 let invite_code = InviteCode::from_str(&invite_code)?;
376
377 Ok(json!({
378 "url": invite_code.url(),
379 "federation_id": invite_code.federation_id(),
380 }))
381 }
382
383 fn parse_bolt11_invoice(&self, invoice_str: String) -> anyhow::Result<serde_json::Value> {
384 let invoice = lightning_invoice::Bolt11Invoice::from_str(&invoice_str)
385 .map_err(|e| anyhow::anyhow!("Failed to parse Lightning invoice: {}", e))?;
386
387 let amount_msat = invoice.amount_milli_satoshis().unwrap_or(0);
388 let amount_sat = amount_msat as f64 / 1000.0;
389
390 let expiry_seconds = invoice.expiry_time().as_secs();
391
392 let description = match invoice.description() {
394 Bolt11InvoiceDescriptionRef::Direct(desc) => desc.to_string(),
395 Bolt11InvoiceDescriptionRef::Hash(_) => "Description hash only".to_string(),
396 };
397
398 Ok(json!({
399 "amount": amount_sat,
400 "expiry": expiry_seconds,
401 "memo": description,
402 }))
403 }
404
405 async fn preview_federation(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
406 let invite = InviteCode::from_str(&invite_code)?;
407 let federation_id = invite.federation_id();
408
409 let builder = Self::client_builder().await?;
410 let preview = builder.preview(self.connectors.clone(), &invite).await?;
411
412 let json_config = preview.config().to_json();
413 *self.preview_cache.lock().unwrap() = Some(preview);
415
416 Ok(json!({
417 "config": json_config,
418 "federation_id": federation_id.to_string(),
419 }))
420 }
421
422 fn handle_rpc_inner(
423 self: Arc<Self>,
424 request: RpcRequest,
425 ) -> Option<BoxStream<'static, anyhow::Result<serde_json::Value>>> {
426 match request.kind {
427 RpcRequestKind::SetMnemonic { words } => Some(Box::pin(try_stream! {
428 self.set_mnemonic(words).await?;
429 yield serde_json::json!({ "success": true });
430 })),
431 RpcRequestKind::GenerateMnemonic => Some(Box::pin(try_stream! {
432 let words = self.generate_mnemonic().await?;
433 yield serde_json::json!({ "mnemonic": words });
434 })),
435 RpcRequestKind::GetMnemonic => Some(Box::pin(try_stream! {
436 let words = self.get_mnemonic_words().await?;
437 yield serde_json::json!({ "mnemonic": words });
438 })),
439 RpcRequestKind::JoinFederation {
440 invite_code,
441 client_name,
442 force_recover,
443 } => Some(Box::pin(try_stream! {
444 self.handle_join_federation(invite_code, client_name, force_recover)
445 .await?;
446 yield serde_json::json!(null);
447 })),
448 RpcRequestKind::OpenClient { client_name } => Some(Box::pin(try_stream! {
449 self.handle_open_client(client_name).await?;
450 yield serde_json::json!(null);
451 })),
452 RpcRequestKind::CloseClient { client_name } => Some(Box::pin(try_stream! {
453 self.handle_close_client(client_name).await?;
454 yield serde_json::json!(null);
455 })),
456 RpcRequestKind::ClientRpc {
457 client_name,
458 module,
459 method,
460 payload,
461 } => Some(self.handle_client_rpc(client_name, module, method, payload)),
462 RpcRequestKind::ParseInviteCode { invite_code } => Some(Box::pin(try_stream! {
463 let result = self.parse_invite_code(invite_code)?;
464 yield result;
465 })),
466 RpcRequestKind::ParseBolt11Invoice { invoice } => Some(Box::pin(try_stream! {
467 let result = self.parse_bolt11_invoice(invoice)?;
468 yield result;
469 })),
470 RpcRequestKind::PreviewFederation { invite_code } => Some(Box::pin(try_stream! {
471 let result = self.preview_federation(invite_code).await?;
472 yield result;
473 })),
474 RpcRequestKind::ParseOobNotes { oob_notes } => Some(Box::pin(try_stream! {
475 let parsed = parse_oob_notes(&oob_notes)?;
476 yield serde_json::to_value(parsed)?;
477 })),
478 RpcRequestKind::CancelRpc { cancel_request_id } => {
479 if let Some(handle) = self.remove_rpc_handle(cancel_request_id) {
480 handle.abort();
481 }
482 None
483 }
484 }
485 }
486
487 pub fn handle_rpc(
488 self: Arc<Self>,
489 request: RpcRequest,
490 handler: impl RpcResponseHandler + 'static,
491 ) -> HandledRpc<'static> {
492 let request_id = request.request_id;
493
494 let Some(stream) = self.clone().handle_rpc_inner(request) else {
495 return HandledRpc { task: None };
496 };
497
498 let (abort_handle, abort_registration) = AbortHandle::new_pair();
499 self.add_rpc_handle(request_id, abort_handle);
500
501 let task = Box::pin(async move {
502 let mut stream = Abortable::new(stream, abort_registration);
503
504 while let Some(result) = stream.next().await {
505 let response = match result {
506 Ok(value) => RpcResponse {
507 request_id,
508 kind: RpcResponseKind::Data { data: value },
509 },
510 Err(e) => RpcResponse {
511 request_id,
512 kind: RpcResponseKind::Error {
513 error: e.to_string(),
514 },
515 },
516 };
517 handler.handle_response(response);
518 }
519
520 let _ = self.remove_rpc_handle(request_id);
522 handler.handle_response(RpcResponse {
523 request_id,
524 kind: if stream.is_aborted() {
525 RpcResponseKind::Aborted {}
526 } else {
527 RpcResponseKind::End {}
528 },
529 });
530 });
531
532 HandledRpc { task: Some(task) }
533 }
534
535 async fn get_mnemonic_words(&self) -> anyhow::Result<Option<Vec<String>>> {
539 let mnemonic = self.get_mnemonic_from_db().await?;
540
541 if let Some(mnemonic) = mnemonic {
542 let words = mnemonic.words().map(|w| w.to_string()).collect();
543 Ok(Some(words))
544 } else {
545 Ok(None)
546 }
547 }
548 async fn set_mnemonic(&self, words: Vec<String>) -> anyhow::Result<()> {
551 let all_words = words.join(" ");
552 let mnemonic =
553 Mnemonic::parse_in_normalized(fedimint_bip39::Language::English, &all_words)?;
554
555 let mut dbtx = self.unified_database.begin_transaction().await;
556
557 if dbtx.get_value(&MnemonicKey).await.is_some() {
558 anyhow::bail!(
559 "Wallet mnemonic already exists. Please clear existing data before setting a new mnemonic."
560 );
561 }
562
563 dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
564 .await;
565
566 dbtx.commit_tx().await;
567
568 Ok(())
569 }
570
571 async fn generate_mnemonic(&self) -> anyhow::Result<Vec<String>> {
574 let mnemonic = Bip39RootSecretStrategy::<12>::random(&mut thread_rng());
575 let words: Vec<String> = mnemonic.words().map(|w| w.to_string()).collect();
576
577 let mut dbtx = self.unified_database.begin_transaction().await;
578
579 if dbtx.get_value(&MnemonicKey).await.is_some() {
580 anyhow::bail!(
581 "Wallet mnemonic already exists. Please clear existing data before generating a new mnemonic."
582 );
583 }
584
585 dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
586 .await;
587
588 dbtx.commit_tx().await;
589
590 Ok(words)
591 }
592
593 fn derive_federation_secret(
595 &self,
596 mnemonic: &Mnemonic,
597 federation_id: &FederationId,
598 ) -> DerivableSecret {
599 let global_root_secret = Bip39RootSecretStrategy::<12>::to_root_secret(mnemonic);
600 let multi_federation_root_secret = global_root_secret.child_key(ChildId(0));
601 let federation_root_secret = multi_federation_root_secret.federation_key(federation_id);
602 let federation_wallet_root_secret = federation_root_secret.child_key(ChildId(0));
603 federation_wallet_root_secret.child_key(ChildId(0))
604 }
605
606 async fn get_mnemonic_from_db(&self) -> anyhow::Result<Option<Mnemonic>> {
608 let mut dbtx = self.unified_database.begin_transaction_nc().await;
609
610 if let Some(mnemonic_entropy) = dbtx.get_value(&MnemonicKey).await {
611 let mnemonic = Mnemonic::from_entropy(&mnemonic_entropy)?;
612 Ok(Some(mnemonic))
613 } else {
614 Ok(None)
615 }
616 }
617}
618
619pub fn parse_oob_notes(oob_notes_str: &str) -> anyhow::Result<ParsedNoteDetails> {
620 let oob_notes =
621 OOBNotes::from_str(oob_notes_str).context("Failed to parse OOB notes string")?;
622
623 let total_amount = oob_notes.total_amount();
624 let federation_id_prefix = oob_notes.federation_id_prefix();
625 let invite_code = oob_notes.federation_invite();
626 let federation_id = invite_code.as_ref().map(|inv| inv.federation_id());
627
628 let notes = oob_notes.notes();
630 let mut note_counts = TieredCounts::default();
631 for (amount, _note) in notes.iter_items() {
632 note_counts.inc(amount, 1);
633 }
634
635 Ok(ParsedNoteDetails {
636 total_amount,
637 federation_id_prefix,
638 federation_id,
639 invite_code,
640 note_counts,
641 })
642}