1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use async_stream::try_stream;
8use fedimint_bip39::{Bip39RootSecretStrategy, Mnemonic};
9use fedimint_client::module::ClientModule;
10use fedimint_client::secret::RootSecretStrategy;
11use fedimint_client::{ClientHandleArc, ClientPreview, RootSecret};
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::config::{FederationId, FederationIdPrefix};
14use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
15use fedimint_core::encoding::{Decodable, Encodable};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::task::{MaybeSend, MaybeSync};
18use fedimint_core::util::{BoxFuture, BoxStream};
19use fedimint_core::{Amount, TieredCounts, impl_db_record};
20use fedimint_derive_secret::{ChildId, DerivableSecret};
21use fedimint_ln_client::{LightningClientInit, LightningClientModule};
22use fedimint_meta_client::MetaClientInit;
23use fedimint_mint_client::{MintClientInit, MintClientModule, OOBNotes};
24use fedimint_wallet_client::{WalletClientInit, WalletClientModule};
25use futures::StreamExt;
26use futures::future::{AbortHandle, Abortable};
27use lightning_invoice::Bolt11InvoiceDescriptionRef;
28use rand::thread_rng;
29use serde::{Deserialize, Serialize};
30use serde_json::json;
31use tokio::sync::Mutex;
32use tracing::info;
33
34#[repr(u8)]
36#[derive(Clone, Copy, Debug)]
37pub enum DbKeyPrefix {
38 ClientDatabase = 0x00,
39 Mnemonic = 0x01,
40}
41
42#[derive(Debug, Clone, Encodable, Decodable, Eq, PartialEq, Hash)]
43pub struct MnemonicKey;
44
45impl_db_record!(
46 key = MnemonicKey,
47 value = Vec<u8>,
48 db_prefix = DbKeyPrefix::Mnemonic,
49);
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct ParsedNoteDetails {
54 pub total_amount: Amount,
56 pub federation_id_prefix: FederationIdPrefix,
58 pub federation_id: Option<FederationId>,
60 pub invite_code: Option<InviteCode>,
62 pub note_counts: TieredCounts,
64}
65
66#[derive(Serialize, Deserialize)]
67#[serde(rename_all = "snake_case")]
68pub struct RpcRequest {
69 pub request_id: u64,
70 #[serde(flatten)]
71 pub kind: RpcRequestKind,
72}
73
74#[derive(Serialize, Deserialize)]
75#[serde(tag = "type", rename_all = "snake_case")]
76pub enum RpcRequestKind {
77 SetMnemonic {
78 words: Vec<String>,
79 },
80 GenerateMnemonic,
81 GetMnemonic,
82 HasMnemonicSet,
83 JoinFederation {
85 invite_code: String,
86 force_recover: bool,
87 client_name: String,
88 },
89 OpenClient {
90 client_name: String,
91 },
92 CloseClient {
93 client_name: String,
94 },
95 ClientRpc {
96 client_name: String,
97 module: String,
98 method: String,
99 payload: serde_json::Value,
100 },
101 CancelRpc {
102 cancel_request_id: u64,
103 },
104 ParseInviteCode {
105 invite_code: String,
106 },
107 ParseBolt11Invoice {
108 invoice: String,
109 },
110 PreviewFederation {
111 invite_code: String,
112 },
113 ParseOobNotes {
114 oob_notes: String,
115 },
116}
117
118#[derive(Serialize, Deserialize, Clone, Debug)]
119pub struct RpcResponse {
120 pub request_id: u64,
121 #[serde(flatten)]
122 pub kind: RpcResponseKind,
123}
124
125#[derive(Serialize, Deserialize, Clone, Debug)]
126#[serde(tag = "type", rename_all = "snake_case")]
127pub enum RpcResponseKind {
128 Data { data: serde_json::Value },
129 Error { error: String },
130 Aborted {},
131 End {},
132}
133
134pub trait RpcResponseHandler: MaybeSend + MaybeSync {
135 fn handle_response(&self, response: RpcResponse);
136}
137
138pub struct RpcGlobalState {
139 connectors: ConnectorRegistry,
141 clients: Mutex<HashMap<String, ClientHandleArc>>,
142 rpc_handles: std::sync::Mutex<HashMap<u64, AbortHandle>>,
143 unified_database: Database,
144 preview_cache: std::sync::Mutex<Option<ClientPreview>>,
145}
146
147pub struct HandledRpc<'a> {
148 pub task: Option<BoxFuture<'a, ()>>,
149}
150
151impl RpcGlobalState {
152 pub fn new(connectors: ConnectorRegistry, unified_database: Database) -> Self {
153 Self {
154 connectors,
155 clients: Mutex::new(HashMap::new()),
156 rpc_handles: std::sync::Mutex::new(HashMap::new()),
157 unified_database,
158 preview_cache: std::sync::Mutex::new(None),
159 }
160 }
161
162 async fn add_client(&self, client_name: String, client: ClientHandleArc) {
163 let mut clients = self.clients.lock().await;
164 clients.insert(client_name, client);
165 }
166
167 async fn get_client(&self, client_name: &str) -> Option<ClientHandleArc> {
168 let clients = self.clients.lock().await;
169 clients.get(client_name).cloned()
170 }
171
172 fn add_rpc_handle(&self, request_id: u64, handle: AbortHandle) {
173 let mut handles = self.rpc_handles.lock().unwrap();
174 if handles.insert(request_id, handle).is_some() {
175 tracing::error!("RPC CLIENT ERROR: request id reuse detected");
176 }
177 }
178
179 fn remove_rpc_handle(&self, request_id: u64) -> Option<AbortHandle> {
180 let mut handles = self.rpc_handles.lock().unwrap();
181 handles.remove(&request_id)
182 }
183
184 async fn client_builder() -> Result<fedimint_client::ClientBuilder, anyhow::Error> {
185 let mut builder = fedimint_client::Client::builder().await?;
186 builder.with_module(MintClientInit);
187 builder.with_module(LightningClientInit::default());
188 builder.with_module(WalletClientInit(None));
189 builder.with_module(MetaClientInit);
190 Ok(builder)
191 }
192
193 async fn client_db(&self, client_name: String) -> anyhow::Result<Database> {
195 assert_eq!(client_name.len(), 36);
196
197 let unified_db = &self.unified_database;
198 let mut client_prefix = vec![DbKeyPrefix::ClientDatabase as u8];
199 client_prefix.extend_from_slice(client_name.as_bytes());
200 Ok(unified_db.with_prefix(client_prefix))
201 }
202
203 async fn handle_join_federation(
205 &self,
206
207 invite_code: String,
208 client_name: String,
209 force_recover: bool,
210 ) -> anyhow::Result<()> {
211 let mnemonic = self
213 .get_mnemonic_from_db()
214 .await?
215 .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
216
217 let client_db = self.client_db(client_name.clone()).await?;
218
219 let invite_code = InviteCode::from_str(&invite_code)?;
220 let federation_id = invite_code.federation_id();
221
222 let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
224
225 let cached_preview = self.preview_cache.lock().unwrap().take();
227 let preview = match cached_preview {
228 Some(preview) if preview.config().calculate_federation_id() == federation_id => preview,
229 _ => {
230 let builder = Self::client_builder().await?;
231 builder
232 .preview(self.connectors.clone(), &invite_code)
233 .await?
234 }
235 };
236
237 let backup = preview
239 .download_backup_from_federation(RootSecret::StandardDoubleDerive(
240 federation_secret.clone(),
241 ))
242 .await?;
243
244 let client = if force_recover || backup.is_some() {
245 Arc::new(
246 preview
247 .recover(
248 client_db,
249 RootSecret::StandardDoubleDerive(federation_secret),
250 backup,
251 )
252 .await?,
253 )
254 } else {
255 Arc::new(
256 preview
257 .join(
258 client_db,
259 RootSecret::StandardDoubleDerive(federation_secret),
260 )
261 .await?,
262 )
263 };
264
265 self.add_client(client_name, client).await;
266 Ok(())
267 }
268
269 async fn handle_open_client(&self, client_name: String) -> anyhow::Result<()> {
270 let mnemonic = self
272 .get_mnemonic_from_db()
273 .await?
274 .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
275
276 let client_db = self.client_db(client_name.clone()).await?;
277
278 if !fedimint_client::Client::is_initialized(&client_db).await {
279 anyhow::bail!("client is not initialized for this database");
280 }
281
282 let client_config = fedimint_client::Client::get_config_from_db(&client_db)
284 .await
285 .context("Client config not found in database")?;
286
287 let federation_id = client_config.calculate_federation_id();
288
289 let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
291
292 let builder = Self::client_builder().await?;
293 let client = Arc::new(
294 builder
295 .open(
296 self.connectors.clone(),
297 client_db,
298 RootSecret::StandardDoubleDerive(federation_secret),
299 )
300 .await?,
301 );
302
303 self.add_client(client_name, client).await;
304 Ok(())
305 }
306
307 async fn handle_close_client(&self, client_name: String) -> anyhow::Result<()> {
308 let mut clients = self.clients.lock().await;
309 let mut client = clients.remove(&client_name).context("client not found")?;
310
311 for attempt in 0.. {
313 info!(attempt, "waiting for RPCs to drop the federation object");
314 match Arc::try_unwrap(client) {
315 Ok(client) => {
316 client.shutdown().await;
317 break;
318 }
319 Err(client_val) => client = client_val,
320 }
321 fedimint_core::task::sleep(Duration::from_millis(100)).await;
322 }
323 Ok(())
324 }
325
326 fn handle_client_rpc(
327 self: Arc<Self>,
328 client_name: String,
329 module: String,
330 method: String,
331 payload: serde_json::Value,
332 ) -> BoxStream<'static, anyhow::Result<serde_json::Value>> {
333 Box::pin(try_stream! {
334 let client = self
335 .get_client(&client_name)
336 .await
337 .with_context(|| format!("Client not found: {client_name}"))?;
338 match module.as_str() {
339 "" => {
340 let mut stream = client.handle_global_rpc(method, payload);
341 while let Some(item) = stream.next().await {
342 yield item?;
343 }
344 }
345 "ln" => {
346 let ln = client.get_first_module::<LightningClientModule>()?.inner();
347 let mut stream = ln.handle_rpc(method, payload).await;
348 while let Some(item) = stream.next().await {
349 yield item?;
350 }
351 }
352 "mint" => {
353 let mint = client.get_first_module::<MintClientModule>()?.inner();
354 let mut stream = mint.handle_rpc(method, payload).await;
355 while let Some(item) = stream.next().await {
356 yield item?;
357 }
358 }
359 "wallet" => {
360 let wallet = client
361 .get_first_module::<WalletClientModule>()?
362 .inner();
363 let mut stream = wallet.handle_rpc(method, payload).await;
364 while let Some(item) = stream.next().await {
365 yield item?;
366 }
367 }
368 _ => {
369 Err(anyhow::format_err!("module not found: {module}"))?;
370 },
371 };
372 })
373 }
374
375 fn parse_invite_code(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
376 let invite_code = InviteCode::from_str(&invite_code)?;
377
378 Ok(json!({
379 "url": invite_code.url(),
380 "federation_id": invite_code.federation_id(),
381 }))
382 }
383
384 fn parse_bolt11_invoice(&self, invoice_str: String) -> anyhow::Result<serde_json::Value> {
385 let invoice = lightning_invoice::Bolt11Invoice::from_str(&invoice_str)
386 .map_err(|e| anyhow::anyhow!("Failed to parse Lightning invoice: {}", e))?;
387
388 let amount_msat = invoice.amount_milli_satoshis().unwrap_or(0);
389 let amount_sat = amount_msat as f64 / 1000.0;
390
391 let expiry_seconds = invoice.expiry_time().as_secs();
392
393 let description = match invoice.description() {
395 Bolt11InvoiceDescriptionRef::Direct(desc) => desc.to_string(),
396 Bolt11InvoiceDescriptionRef::Hash(_) => "Description hash only".to_string(),
397 };
398
399 Ok(json!({
400 "amount": amount_sat,
401 "expiry": expiry_seconds,
402 "memo": description,
403 }))
404 }
405
406 async fn preview_federation(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
407 let invite = InviteCode::from_str(&invite_code)?;
408 let federation_id = invite.federation_id();
409
410 let builder = Self::client_builder().await?;
411 let preview = builder.preview(self.connectors.clone(), &invite).await?;
412
413 let json_config = preview.config().to_json();
414 *self.preview_cache.lock().unwrap() = Some(preview);
416
417 Ok(json!({
418 "config": json_config,
419 "federation_id": federation_id.to_string(),
420 }))
421 }
422
423 fn handle_rpc_inner(
424 self: Arc<Self>,
425 request: RpcRequest,
426 ) -> Option<BoxStream<'static, anyhow::Result<serde_json::Value>>> {
427 match request.kind {
428 RpcRequestKind::SetMnemonic { words } => Some(Box::pin(try_stream! {
429 self.set_mnemonic(words).await?;
430 yield serde_json::json!({ "success": true });
431 })),
432 RpcRequestKind::GenerateMnemonic => Some(Box::pin(try_stream! {
433 let words = self.generate_mnemonic().await?;
434 yield serde_json::json!({ "mnemonic": words });
435 })),
436 RpcRequestKind::GetMnemonic => Some(Box::pin(try_stream! {
437 let words = self.get_mnemonic_words().await?;
438 yield serde_json::json!({ "mnemonic": words });
439 })),
440 RpcRequestKind::HasMnemonicSet => Some(Box::pin(try_stream! {
441 let is_set = self.has_mnemonic_set().await?;
442 yield serde_json::json!(is_set);
443 })),
444 RpcRequestKind::JoinFederation {
445 invite_code,
446 client_name,
447 force_recover,
448 } => Some(Box::pin(try_stream! {
449 self.handle_join_federation(invite_code, client_name, force_recover)
450 .await?;
451 yield serde_json::json!(null);
452 })),
453 RpcRequestKind::OpenClient { client_name } => Some(Box::pin(try_stream! {
454 self.handle_open_client(client_name).await?;
455 yield serde_json::json!(null);
456 })),
457 RpcRequestKind::CloseClient { client_name } => Some(Box::pin(try_stream! {
458 self.handle_close_client(client_name).await?;
459 yield serde_json::json!(null);
460 })),
461 RpcRequestKind::ClientRpc {
462 client_name,
463 module,
464 method,
465 payload,
466 } => Some(self.handle_client_rpc(client_name, module, method, payload)),
467 RpcRequestKind::ParseInviteCode { invite_code } => Some(Box::pin(try_stream! {
468 let result = self.parse_invite_code(invite_code)?;
469 yield result;
470 })),
471 RpcRequestKind::ParseBolt11Invoice { invoice } => Some(Box::pin(try_stream! {
472 let result = self.parse_bolt11_invoice(invoice)?;
473 yield result;
474 })),
475 RpcRequestKind::PreviewFederation { invite_code } => Some(Box::pin(try_stream! {
476 let result = self.preview_federation(invite_code).await?;
477 yield result;
478 })),
479 RpcRequestKind::ParseOobNotes { oob_notes } => Some(Box::pin(try_stream! {
480 let parsed = parse_oob_notes(&oob_notes)?;
481 yield serde_json::to_value(parsed)?;
482 })),
483 RpcRequestKind::CancelRpc { cancel_request_id } => {
484 if let Some(handle) = self.remove_rpc_handle(cancel_request_id) {
485 handle.abort();
486 }
487 None
488 }
489 }
490 }
491
492 pub fn handle_rpc(
493 self: Arc<Self>,
494 request: RpcRequest,
495 handler: impl RpcResponseHandler + 'static,
496 ) -> HandledRpc<'static> {
497 let request_id = request.request_id;
498
499 let Some(stream) = self.clone().handle_rpc_inner(request) else {
500 return HandledRpc { task: None };
501 };
502
503 let (abort_handle, abort_registration) = AbortHandle::new_pair();
504 self.add_rpc_handle(request_id, abort_handle);
505
506 let task = Box::pin(async move {
507 let mut stream = Abortable::new(stream, abort_registration);
508
509 while let Some(result) = stream.next().await {
510 let response = match result {
511 Ok(value) => RpcResponse {
512 request_id,
513 kind: RpcResponseKind::Data { data: value },
514 },
515 Err(e) => RpcResponse {
516 request_id,
517 kind: RpcResponseKind::Error {
518 error: e.to_string(),
519 },
520 },
521 };
522 handler.handle_response(response);
523 }
524
525 let _ = self.remove_rpc_handle(request_id);
527 handler.handle_response(RpcResponse {
528 request_id,
529 kind: if stream.is_aborted() {
530 RpcResponseKind::Aborted {}
531 } else {
532 RpcResponseKind::End {}
533 },
534 });
535 });
536
537 HandledRpc { task: Some(task) }
538 }
539
540 async fn get_mnemonic_words(&self) -> anyhow::Result<Option<Vec<String>>> {
544 let mnemonic = self.get_mnemonic_from_db().await?;
545
546 if let Some(mnemonic) = mnemonic {
547 let words = mnemonic.words().map(|w| w.to_string()).collect();
548 Ok(Some(words))
549 } else {
550 Ok(None)
551 }
552 }
553 async fn set_mnemonic(&self, words: Vec<String>) -> anyhow::Result<()> {
556 let all_words = words.join(" ");
557 let mnemonic =
558 Mnemonic::parse_in_normalized(fedimint_bip39::Language::English, &all_words)?;
559
560 let mut dbtx = self.unified_database.begin_transaction().await;
561
562 if dbtx.get_value(&MnemonicKey).await.is_some() {
563 anyhow::bail!(
564 "Wallet mnemonic already exists. Please clear existing data before setting a new mnemonic."
565 );
566 }
567
568 dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
569 .await;
570
571 dbtx.commit_tx().await;
572
573 Ok(())
574 }
575
576 async fn generate_mnemonic(&self) -> anyhow::Result<Vec<String>> {
579 let mnemonic = Bip39RootSecretStrategy::<12>::random(&mut thread_rng());
580 let words: Vec<String> = mnemonic.words().map(|w| w.to_string()).collect();
581
582 let mut dbtx = self.unified_database.begin_transaction().await;
583
584 if dbtx.get_value(&MnemonicKey).await.is_some() {
585 anyhow::bail!(
586 "Wallet mnemonic already exists. Please clear existing data before generating a new mnemonic."
587 );
588 }
589
590 dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
591 .await;
592
593 dbtx.commit_tx().await;
594
595 Ok(words)
596 }
597
598 fn derive_federation_secret(
600 &self,
601 mnemonic: &Mnemonic,
602 federation_id: &FederationId,
603 ) -> DerivableSecret {
604 let global_root_secret = Bip39RootSecretStrategy::<12>::to_root_secret(mnemonic);
605 let multi_federation_root_secret = global_root_secret.child_key(ChildId(0));
606 let federation_root_secret = multi_federation_root_secret.federation_key(federation_id);
607 let federation_wallet_root_secret = federation_root_secret.child_key(ChildId(0));
608 federation_wallet_root_secret.child_key(ChildId(0))
609 }
610
611 async fn get_mnemonic_from_db(&self) -> anyhow::Result<Option<Mnemonic>> {
613 let mut dbtx = self.unified_database.begin_transaction_nc().await;
614
615 if let Some(mnemonic_entropy) = dbtx.get_value(&MnemonicKey).await {
616 let mnemonic = Mnemonic::from_entropy(&mnemonic_entropy)?;
617 Ok(Some(mnemonic))
618 } else {
619 Ok(None)
620 }
621 }
622
623 async fn has_mnemonic_set(&self) -> anyhow::Result<bool> {
625 let mnemonic = self.get_mnemonic_from_db().await?;
626 Ok(mnemonic.is_some())
627 }
628}
629
630pub fn parse_oob_notes(oob_notes_str: &str) -> anyhow::Result<ParsedNoteDetails> {
631 let oob_notes =
632 OOBNotes::from_str(oob_notes_str).context("Failed to parse OOB notes string")?;
633
634 let total_amount = oob_notes.total_amount();
635 let federation_id_prefix = oob_notes.federation_id_prefix();
636 let invite_code = oob_notes.federation_invite();
637 let federation_id = invite_code.as_ref().map(|inv| inv.federation_id());
638
639 let notes = oob_notes.notes();
641 let mut note_counts = TieredCounts::default();
642 for (amount, _note) in notes.iter_items() {
643 note_counts.inc(amount, 1);
644 }
645
646 Ok(ParsedNoteDetails {
647 total_amount,
648 federation_id_prefix,
649 federation_id,
650 invite_code,
651 note_counts,
652 })
653}