1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use async_stream::try_stream;
8use fedimint_bip39::{Bip39RootSecretStrategy, Mnemonic};
9use fedimint_client::module::ClientModule;
10use fedimint_client::secret::RootSecretStrategy;
11use fedimint_client::{ClientHandleArc, ClientPreview, RootSecret};
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::config::{FederationId, FederationIdPrefix};
14use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
15use fedimint_core::encoding::{Decodable, Encodable};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::task::{MaybeSend, MaybeSync};
18use fedimint_core::util::{BoxFuture, BoxStream};
19use fedimint_core::{Amount, TieredCounts, impl_db_record};
20use fedimint_derive_secret::{ChildId, DerivableSecret};
21use fedimint_ln_client::{LightningClientInit, LightningClientModule};
22use fedimint_meta_client::MetaClientInit;
23use fedimint_mint_client::{MintClientInit, MintClientModule, OOBNotes};
24use fedimint_wallet_client::{WalletClientInit, WalletClientModule};
25use futures::StreamExt;
26use futures::future::{AbortHandle, Abortable};
27use lightning_invoice::Bolt11InvoiceDescriptionRef;
28use rand::thread_rng;
29use serde::{Deserialize, Serialize};
30use serde_json::json;
31use tokio::sync::Mutex;
32use tracing::info;
33
34#[repr(u8)]
36#[derive(Clone, Copy, Debug)]
37pub enum DbKeyPrefix {
38 ClientDatabase = 0x00,
39 Mnemonic = 0x01,
40}
41
42#[derive(Debug, Clone, Encodable, Decodable, Eq, PartialEq, Hash)]
43pub struct MnemonicKey;
44
45impl_db_record!(
46 key = MnemonicKey,
47 value = Vec<u8>,
48 db_prefix = DbKeyPrefix::Mnemonic,
49);
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct ParsedNoteDetails {
54 pub total_amount: Amount,
56 pub federation_id_prefix: FederationIdPrefix,
58 pub federation_id: Option<FederationId>,
60 pub invite_code: Option<InviteCode>,
62 pub note_counts: TieredCounts,
64}
65
66#[derive(Serialize, Deserialize)]
67#[serde(rename_all = "snake_case")]
68pub struct RpcRequest {
69 pub request_id: u64,
70 #[serde(flatten)]
71 pub kind: RpcRequestKind,
72}
73
74#[derive(Serialize, Deserialize)]
75#[serde(tag = "type", rename_all = "snake_case")]
76pub enum RpcRequestKind {
77 SetMnemonic {
78 words: Vec<String>,
79 },
80 GenerateMnemonic,
81 GetMnemonic,
82 HasMnemonicSet,
83 JoinFederation {
85 invite_code: String,
86 force_recover: bool,
87 client_name: String,
88 },
89 OpenClient {
90 client_name: String,
91 },
92 CloseClient {
93 client_name: String,
94 },
95 ClientRpc {
96 client_name: String,
97 module: String,
98 method: String,
99 payload: serde_json::Value,
100 },
101 CancelRpc {
102 cancel_request_id: u64,
103 },
104 ParseInviteCode {
105 invite_code: String,
106 },
107 ParseBolt11Invoice {
108 invoice: String,
109 },
110 PreviewFederation {
111 invite_code: String,
112 },
113 ParseOobNotes {
114 oob_notes: String,
115 },
116}
117
118#[derive(Serialize, Deserialize, Clone, Debug)]
119pub struct RpcResponse {
120 pub request_id: u64,
121 #[serde(flatten)]
122 pub kind: RpcResponseKind,
123}
124
125#[derive(Serialize, Deserialize, Clone, Debug)]
126#[serde(tag = "type", rename_all = "snake_case")]
127pub enum RpcResponseKind {
128 Data { data: serde_json::Value },
129 Error { error: String },
130 Aborted {},
131 End {},
132}
133
134pub trait RpcResponseHandler: MaybeSend + MaybeSync {
135 fn handle_response(&self, response: RpcResponse);
136}
137
138pub struct RpcGlobalState {
139 connectors: ConnectorRegistry,
141 clients: Mutex<HashMap<String, ClientHandleArc>>,
142 rpc_handles: std::sync::Mutex<HashMap<u64, AbortHandle>>,
143 unified_database: Database,
144 preview_cache: std::sync::Mutex<Option<ClientPreview>>,
145}
146
147pub struct HandledRpc<'a> {
148 pub task: Option<BoxFuture<'a, ()>>,
149}
150
151impl RpcGlobalState {
152 pub fn new(connectors: ConnectorRegistry, unified_database: Database) -> Self {
153 Self {
154 connectors,
155 clients: Mutex::new(HashMap::new()),
156 rpc_handles: std::sync::Mutex::new(HashMap::new()),
157 unified_database,
158 preview_cache: std::sync::Mutex::new(None),
159 }
160 }
161
162 async fn add_client(&self, client_name: String, client: ClientHandleArc) {
163 let mut clients = self.clients.lock().await;
164 clients.insert(client_name, client);
165 }
166
167 async fn get_client(&self, client_name: &str) -> Option<ClientHandleArc> {
168 let clients = self.clients.lock().await;
169 clients.get(client_name).cloned()
170 }
171
172 fn add_rpc_handle(&self, request_id: u64, handle: AbortHandle) {
173 let mut handles = self.rpc_handles.lock().unwrap();
174 if handles.insert(request_id, handle).is_some() {
175 tracing::error!("RPC CLIENT ERROR: request id reuse detected");
176 }
177 }
178
179 fn remove_rpc_handle(&self, request_id: u64) -> Option<AbortHandle> {
180 let mut handles = self.rpc_handles.lock().unwrap();
181 handles.remove(&request_id)
182 }
183
184 async fn client_builder() -> Result<fedimint_client::ClientBuilder, anyhow::Error> {
185 let mut builder = fedimint_client::Client::builder().await?;
186 builder.with_module(MintClientInit);
187 builder.with_module(LightningClientInit::default());
188 builder.with_module(WalletClientInit(None));
189 builder.with_module(MetaClientInit);
190 Ok(builder)
191 }
192
193 async fn client_db(&self, client_name: String) -> anyhow::Result<Database> {
195 assert_eq!(client_name.len(), 36);
196
197 let unified_db = &self.unified_database;
198 let mut client_prefix = vec![DbKeyPrefix::ClientDatabase as u8];
199 client_prefix.extend_from_slice(client_name.as_bytes());
200 Ok(unified_db.with_prefix(client_prefix))
201 }
202
203 async fn handle_join_federation(
205 &self,
206
207 invite_code: String,
208 client_name: String,
209 force_recover: bool,
210 ) -> anyhow::Result<()> {
211 let mnemonic = self
213 .get_mnemonic_from_db()
214 .await?
215 .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
216
217 let client_db = self.client_db(client_name.clone()).await?;
218
219 let invite_code = InviteCode::from_str(&invite_code)?;
220 let federation_id = invite_code.federation_id();
221
222 let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
224
225 let cached_preview = self.preview_cache.lock().unwrap().take();
227 let preview = match cached_preview {
228 Some(preview) if preview.config().calculate_federation_id() == federation_id => preview,
229 _ => {
230 let builder = Self::client_builder().await?;
231 builder
232 .preview(self.connectors.clone(), &invite_code)
233 .await?
234 }
235 };
236
237 #[allow(deprecated)]
239 let backup = preview
240 .download_backup_from_federation(RootSecret::StandardDoubleDerive(
241 federation_secret.clone(),
242 ))
243 .await?;
244
245 let client = if force_recover || backup.is_some() {
246 Arc::new(
247 preview
248 .recover(
249 client_db,
250 RootSecret::StandardDoubleDerive(federation_secret),
251 backup,
252 )
253 .await?,
254 )
255 } else {
256 Arc::new(
257 preview
258 .join(
259 client_db,
260 RootSecret::StandardDoubleDerive(federation_secret),
261 )
262 .await?,
263 )
264 };
265
266 self.add_client(client_name, client).await;
267 Ok(())
268 }
269
270 async fn handle_open_client(&self, client_name: String) -> anyhow::Result<()> {
271 let mnemonic = self
273 .get_mnemonic_from_db()
274 .await?
275 .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
276
277 let client_db = self.client_db(client_name.clone()).await?;
278
279 if !fedimint_client::Client::is_initialized(&client_db).await {
280 anyhow::bail!("client is not initialized for this database");
281 }
282
283 let client_config = fedimint_client::Client::get_config_from_db(&client_db)
285 .await
286 .context("Client config not found in database")?;
287
288 let federation_id = client_config.calculate_federation_id();
289
290 let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
292
293 let builder = Self::client_builder().await?;
294 let client = Arc::new(
295 builder
296 .open(
297 self.connectors.clone(),
298 client_db,
299 RootSecret::StandardDoubleDerive(federation_secret),
300 )
301 .await?,
302 );
303
304 self.add_client(client_name, client).await;
305 Ok(())
306 }
307
308 async fn handle_close_client(&self, client_name: String) -> anyhow::Result<()> {
309 let mut clients = self.clients.lock().await;
310 let mut client = clients.remove(&client_name).context("client not found")?;
311
312 for attempt in 0.. {
314 info!(attempt, "waiting for RPCs to drop the federation object");
315 match Arc::try_unwrap(client) {
316 Ok(client) => {
317 client.shutdown().await;
318 break;
319 }
320 Err(client_val) => client = client_val,
321 }
322 fedimint_core::task::sleep(Duration::from_millis(100)).await;
323 }
324 Ok(())
325 }
326
327 fn handle_client_rpc(
328 self: Arc<Self>,
329 client_name: String,
330 module: String,
331 method: String,
332 payload: serde_json::Value,
333 ) -> BoxStream<'static, anyhow::Result<serde_json::Value>> {
334 Box::pin(try_stream! {
335 let client = self
336 .get_client(&client_name)
337 .await
338 .with_context(|| format!("Client not found: {client_name}"))?;
339 match module.as_str() {
340 "" => {
341 let mut stream = client.handle_global_rpc(method, payload);
342 while let Some(item) = stream.next().await {
343 yield item?;
344 }
345 }
346 "ln" => {
347 let ln = client.get_first_module::<LightningClientModule>()?.inner();
348 let mut stream = ln.handle_rpc(method, payload).await;
349 while let Some(item) = stream.next().await {
350 yield item?;
351 }
352 }
353 "mint" => {
354 let mint = client.get_first_module::<MintClientModule>()?.inner();
355 let mut stream = mint.handle_rpc(method, payload).await;
356 while let Some(item) = stream.next().await {
357 yield item?;
358 }
359 }
360 "wallet" => {
361 let wallet = client
362 .get_first_module::<WalletClientModule>()?
363 .inner();
364 let mut stream = wallet.handle_rpc(method, payload).await;
365 while let Some(item) = stream.next().await {
366 yield item?;
367 }
368 }
369 _ => {
370 Err(anyhow::format_err!("module not found: {module}"))?;
371 },
372 };
373 })
374 }
375
376 fn parse_invite_code(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
377 let invite_code = InviteCode::from_str(&invite_code)?;
378
379 Ok(json!({
380 "url": invite_code.url(),
381 "federation_id": invite_code.federation_id(),
382 }))
383 }
384
385 fn parse_bolt11_invoice(&self, invoice_str: String) -> anyhow::Result<serde_json::Value> {
386 let invoice = lightning_invoice::Bolt11Invoice::from_str(&invoice_str)
387 .map_err(|e| anyhow::anyhow!("Failed to parse Lightning invoice: {}", e))?;
388
389 let amount_msat = invoice.amount_milli_satoshis().unwrap_or(0);
390 let amount_sat = amount_msat as f64 / 1000.0;
391
392 let expiry_seconds = invoice.expiry_time().as_secs();
393
394 let description = match invoice.description() {
396 Bolt11InvoiceDescriptionRef::Direct(desc) => desc.to_string(),
397 Bolt11InvoiceDescriptionRef::Hash(_) => "Description hash only".to_string(),
398 };
399
400 Ok(json!({
401 "amount": amount_sat,
402 "expiry": expiry_seconds,
403 "memo": description,
404 }))
405 }
406
407 async fn preview_federation(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
408 let invite = InviteCode::from_str(&invite_code)?;
409 let federation_id = invite.federation_id();
410
411 let builder = Self::client_builder().await?;
412 let preview = builder.preview(self.connectors.clone(), &invite).await?;
413
414 let json_config = preview.config().to_json();
415 *self.preview_cache.lock().unwrap() = Some(preview);
417
418 Ok(json!({
419 "config": json_config,
420 "federation_id": federation_id.to_string(),
421 }))
422 }
423
424 fn handle_rpc_inner(
425 self: Arc<Self>,
426 request: RpcRequest,
427 ) -> Option<BoxStream<'static, anyhow::Result<serde_json::Value>>> {
428 match request.kind {
429 RpcRequestKind::SetMnemonic { words } => Some(Box::pin(try_stream! {
430 self.set_mnemonic(words).await?;
431 yield serde_json::json!({ "success": true });
432 })),
433 RpcRequestKind::GenerateMnemonic => Some(Box::pin(try_stream! {
434 let words = self.generate_mnemonic().await?;
435 yield serde_json::json!({ "mnemonic": words });
436 })),
437 RpcRequestKind::GetMnemonic => Some(Box::pin(try_stream! {
438 let words = self.get_mnemonic_words().await?;
439 yield serde_json::json!({ "mnemonic": words });
440 })),
441 RpcRequestKind::HasMnemonicSet => Some(Box::pin(try_stream! {
442 let is_set = self.has_mnemonic_set().await?;
443 yield serde_json::json!(is_set);
444 })),
445 RpcRequestKind::JoinFederation {
446 invite_code,
447 client_name,
448 force_recover,
449 } => Some(Box::pin(try_stream! {
450 self.handle_join_federation(invite_code, client_name, force_recover)
451 .await?;
452 yield serde_json::json!(null);
453 })),
454 RpcRequestKind::OpenClient { client_name } => Some(Box::pin(try_stream! {
455 self.handle_open_client(client_name).await?;
456 yield serde_json::json!(null);
457 })),
458 RpcRequestKind::CloseClient { client_name } => Some(Box::pin(try_stream! {
459 self.handle_close_client(client_name).await?;
460 yield serde_json::json!(null);
461 })),
462 RpcRequestKind::ClientRpc {
463 client_name,
464 module,
465 method,
466 payload,
467 } => Some(self.handle_client_rpc(client_name, module, method, payload)),
468 RpcRequestKind::ParseInviteCode { invite_code } => Some(Box::pin(try_stream! {
469 let result = self.parse_invite_code(invite_code)?;
470 yield result;
471 })),
472 RpcRequestKind::ParseBolt11Invoice { invoice } => Some(Box::pin(try_stream! {
473 let result = self.parse_bolt11_invoice(invoice)?;
474 yield result;
475 })),
476 RpcRequestKind::PreviewFederation { invite_code } => Some(Box::pin(try_stream! {
477 let result = self.preview_federation(invite_code).await?;
478 yield result;
479 })),
480 RpcRequestKind::ParseOobNotes { oob_notes } => Some(Box::pin(try_stream! {
481 let parsed = parse_oob_notes(&oob_notes)?;
482 yield serde_json::to_value(parsed)?;
483 })),
484 RpcRequestKind::CancelRpc { cancel_request_id } => {
485 if let Some(handle) = self.remove_rpc_handle(cancel_request_id) {
486 handle.abort();
487 }
488 None
489 }
490 }
491 }
492
493 pub fn handle_rpc(
494 self: Arc<Self>,
495 request: RpcRequest,
496 handler: impl RpcResponseHandler + 'static,
497 ) -> HandledRpc<'static> {
498 let request_id = request.request_id;
499
500 let Some(stream) = self.clone().handle_rpc_inner(request) else {
501 return HandledRpc { task: None };
502 };
503
504 let (abort_handle, abort_registration) = AbortHandle::new_pair();
505 self.add_rpc_handle(request_id, abort_handle);
506
507 let task = Box::pin(async move {
508 let mut stream = Abortable::new(stream, abort_registration);
509
510 while let Some(result) = stream.next().await {
511 let response = match result {
512 Ok(value) => RpcResponse {
513 request_id,
514 kind: RpcResponseKind::Data { data: value },
515 },
516 Err(e) => RpcResponse {
517 request_id,
518 kind: RpcResponseKind::Error {
519 error: e.to_string(),
520 },
521 },
522 };
523 handler.handle_response(response);
524 }
525
526 let _ = self.remove_rpc_handle(request_id);
528 handler.handle_response(RpcResponse {
529 request_id,
530 kind: if stream.is_aborted() {
531 RpcResponseKind::Aborted {}
532 } else {
533 RpcResponseKind::End {}
534 },
535 });
536 });
537
538 HandledRpc { task: Some(task) }
539 }
540
541 async fn get_mnemonic_words(&self) -> anyhow::Result<Option<Vec<String>>> {
545 let mnemonic = self.get_mnemonic_from_db().await?;
546
547 if let Some(mnemonic) = mnemonic {
548 let words = mnemonic.words().map(|w| w.to_string()).collect();
549 Ok(Some(words))
550 } else {
551 Ok(None)
552 }
553 }
554 async fn set_mnemonic(&self, words: Vec<String>) -> anyhow::Result<()> {
557 let all_words = words.join(" ");
558 let mnemonic =
559 Mnemonic::parse_in_normalized(fedimint_bip39::Language::English, &all_words)?;
560
561 let mut dbtx = self.unified_database.begin_transaction().await;
562
563 if dbtx.get_value(&MnemonicKey).await.is_some() {
564 anyhow::bail!(
565 "Wallet mnemonic already exists. Please clear existing data before setting a new mnemonic."
566 );
567 }
568
569 dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
570 .await;
571
572 dbtx.commit_tx().await;
573
574 Ok(())
575 }
576
577 async fn generate_mnemonic(&self) -> anyhow::Result<Vec<String>> {
580 let mnemonic = Bip39RootSecretStrategy::<12>::random(&mut thread_rng());
581 let words: Vec<String> = mnemonic.words().map(|w| w.to_string()).collect();
582
583 let mut dbtx = self.unified_database.begin_transaction().await;
584
585 if dbtx.get_value(&MnemonicKey).await.is_some() {
586 anyhow::bail!(
587 "Wallet mnemonic already exists. Please clear existing data before generating a new mnemonic."
588 );
589 }
590
591 dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
592 .await;
593
594 dbtx.commit_tx().await;
595
596 Ok(words)
597 }
598
599 fn derive_federation_secret(
601 &self,
602 mnemonic: &Mnemonic,
603 federation_id: &FederationId,
604 ) -> DerivableSecret {
605 let global_root_secret = Bip39RootSecretStrategy::<12>::to_root_secret(mnemonic);
606 let multi_federation_root_secret = global_root_secret.child_key(ChildId(0));
607 let federation_root_secret = multi_federation_root_secret.federation_key(federation_id);
608 let federation_wallet_root_secret = federation_root_secret.child_key(ChildId(0));
609 federation_wallet_root_secret.child_key(ChildId(0))
610 }
611
612 async fn get_mnemonic_from_db(&self) -> anyhow::Result<Option<Mnemonic>> {
614 let mut dbtx = self.unified_database.begin_transaction_nc().await;
615
616 if let Some(mnemonic_entropy) = dbtx.get_value(&MnemonicKey).await {
617 let mnemonic = Mnemonic::from_entropy(&mnemonic_entropy)?;
618 Ok(Some(mnemonic))
619 } else {
620 Ok(None)
621 }
622 }
623
624 async fn has_mnemonic_set(&self) -> anyhow::Result<bool> {
626 let mnemonic = self.get_mnemonic_from_db().await?;
627 Ok(mnemonic.is_some())
628 }
629}
630
631pub fn parse_oob_notes(oob_notes_str: &str) -> anyhow::Result<ParsedNoteDetails> {
632 let oob_notes =
633 OOBNotes::from_str(oob_notes_str).context("Failed to parse OOB notes string")?;
634
635 let total_amount = oob_notes.total_amount();
636 let federation_id_prefix = oob_notes.federation_id_prefix();
637 let invite_code = oob_notes.federation_invite();
638 let federation_id = invite_code.as_ref().map(|inv| inv.federation_id());
639
640 let notes = oob_notes.notes();
642 let mut note_counts = TieredCounts::default();
643 for (amount, _note) in notes.iter_items() {
644 note_counts.inc(amount, 1);
645 }
646
647 Ok(ParsedNoteDetails {
648 total_amount,
649 federation_id_prefix,
650 federation_id,
651 invite_code,
652 note_counts,
653 })
654}