1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use async_stream::try_stream;
8use fedimint_bip39::{Bip39RootSecretStrategy, Mnemonic};
9use fedimint_client::module::ClientModule;
10use fedimint_client::secret::RootSecretStrategy;
11use fedimint_client::{ClientHandleArc, ClientPreview, RootSecret};
12use fedimint_core::config::FederationId;
13use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
14use fedimint_core::encoding::{Decodable, Encodable};
15use fedimint_core::impl_db_record;
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::task::{MaybeSend, MaybeSync};
18use fedimint_core::util::{BoxFuture, BoxStream};
19use fedimint_derive_secret::{ChildId, DerivableSecret};
20use fedimint_ln_client::{LightningClientInit, LightningClientModule};
21use fedimint_meta_client::MetaClientInit;
22use fedimint_mint_client::{MintClientInit, MintClientModule};
23use fedimint_wallet_client::{WalletClientInit, WalletClientModule};
24use futures::StreamExt;
25use futures::future::{AbortHandle, Abortable};
26use lightning_invoice::Bolt11InvoiceDescriptionRef;
27use rand::thread_rng;
28use serde::{Deserialize, Serialize};
29use serde_json::json;
30use tokio::sync::Mutex;
31use tracing::info;
32
/// Single-byte key prefixes that partition the unified database.
#[repr(u8)]
#[derive(Clone, Copy, Debug)]
pub enum DbKeyPrefix {
    /// Prefix under which each per-client database is nested (used by
    /// `RpcGlobalState::client_db`).
    ClientDatabase = 0x00,
    /// Prefix of the record that stores the wallet mnemonic's entropy.
    Mnemonic = 0x01,
}
40
/// Database key of the wallet mnemonic record. It is a unit struct, so there
/// is exactly one such record.
#[derive(Debug, Clone, Encodable, Decodable, Eq, PartialEq, Hash)]
pub struct MnemonicKey;
43
// Declares `MnemonicKey` as a database record under `DbKeyPrefix::Mnemonic`
// whose value is a byte vector; `set_mnemonic` and `generate_mnemonic` store
// the mnemonic's entropy there.
impl_db_record!(
    key = MnemonicKey,
    value = Vec<u8>,
    db_prefix = DbKeyPrefix::Mnemonic,
);
49
/// An incoming RPC request: a caller-chosen id plus the operation to run.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RpcRequest {
    /// Identifier copied into every `RpcResponse` produced for this request;
    /// `CancelRpc` refers to an in-flight request by this id.
    pub request_id: u64,
    /// The requested operation. Flattened, so its `type` tag and fields are
    /// serialized alongside `request_id`.
    #[serde(flatten)]
    pub kind: RpcRequestKind,
}
57
/// The operations understood by `RpcGlobalState::handle_rpc`, tagged by a
/// snake_case `type` field in the serialized form.
#[derive(Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RpcRequestKind {
    /// Store a user-provided mnemonic; fails if one is already stored.
    SetMnemonic {
        /// The mnemonic words, in order.
        words: Vec<String>,
    },
    /// Generate and store a fresh mnemonic; fails if one is already stored.
    GenerateMnemonic,
    /// Return the stored mnemonic words, or `null` when none is stored.
    GetMnemonic,
    /// Join (or recover into) the federation named by an invite code.
    JoinFederation {
        /// Invite code in its string form.
        invite_code: String,
        /// Recover instead of joining even when no backup was found.
        force_recover: bool,
        /// Name under which the resulting client is registered.
        client_name: String,
    },
    /// Open a client whose database was initialized earlier.
    OpenClient {
        client_name: String,
    },
    /// Unregister a client and shut it down.
    CloseClient {
        client_name: String,
    },
    /// Forward an RPC to an open client.
    ClientRpc {
        client_name: String,
        /// Target module: `""` (client-global), `"ln"`, `"mint"` or `"wallet"`.
        module: String,
        method: String,
        payload: serde_json::Value,
    },
    /// Abort the in-flight request with the given id, if it is still running.
    CancelRpc {
        cancel_request_id: u64,
    },
    /// Parse an invite code and return its URL and federation id.
    ParseInviteCode {
        invite_code: String,
    },
    /// Parse a BOLT11 invoice and return its amount, expiry and memo.
    ParseBolt11Invoice {
        invoice: String,
    },
    /// Fetch a federation's config for display and cache the preview for a
    /// subsequent `JoinFederation`.
    PreviewFederation {
        invite_code: String,
    },
}
97
/// One message sent back to the caller for a given request.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct RpcResponse {
    /// The `request_id` of the `RpcRequest` this message belongs to.
    pub request_id: u64,
    /// The payload. Flattened, so its `type` tag and fields are serialized
    /// alongside `request_id`.
    #[serde(flatten)]
    pub kind: RpcResponseKind,
}
104
/// Payload of an `RpcResponse`, tagged by a snake_case `type` field.
///
/// `handle_rpc` emits any number of `Data`/`Error` messages followed by
/// exactly one terminal `Aborted` or `End`.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RpcResponseKind {
    /// A value produced by the request's stream.
    Data { data: serde_json::Value },
    /// An error produced by the request's stream, rendered as a string.
    Error { error: String },
    /// Terminal message: the request's stream was aborted.
    Aborted {},
    /// Terminal message: the request's stream finished without being aborted.
    End {},
}
113
/// Sink for the responses of a single RPC request.
pub trait RpcResponseHandler: MaybeSend + MaybeSync {
    /// Called once per response, in the order the responses are produced.
    fn handle_response(&self, response: RpcResponse);
}
117
/// Shared state behind the RPC interface: open clients, in-flight requests,
/// the backing database and a one-slot federation preview cache.
pub struct RpcGlobalState {
    /// Open clients, keyed by client name.
    clients: Mutex<HashMap<String, ClientHandleArc>>,
    /// Abort handles of in-flight requests, keyed by request id.
    rpc_handles: std::sync::Mutex<HashMap<u64, AbortHandle>>,
    /// Database holding the mnemonic record and, under per-client prefixes,
    /// every client's own database.
    unified_database: Database,
    /// Most recent preview fetched by `preview_federation`; consumed by the
    /// next `handle_join_federation` call.
    preview_cache: std::sync::Mutex<Option<ClientPreview>>,
}
124
/// Result of dispatching a request through `RpcGlobalState::handle_rpc`.
pub struct HandledRpc<'a> {
    /// Future that drives the request and forwards its responses to the
    /// handler; `handle_rpc` does not run it, so the caller has to. `None`
    /// when the request produced no response stream (`CancelRpc`).
    pub task: Option<BoxFuture<'a, ()>>,
}
128
129impl RpcGlobalState {
130 pub fn new(unified_database: Database) -> Self {
131 Self {
132 clients: Mutex::new(HashMap::new()),
133 rpc_handles: std::sync::Mutex::new(HashMap::new()),
134 unified_database,
135 preview_cache: std::sync::Mutex::new(None),
136 }
137 }
138
139 async fn add_client(&self, client_name: String, client: ClientHandleArc) {
140 let mut clients = self.clients.lock().await;
141 clients.insert(client_name, client);
142 }
143
144 async fn get_client(&self, client_name: &str) -> Option<ClientHandleArc> {
145 let clients = self.clients.lock().await;
146 clients.get(client_name).cloned()
147 }
148
149 fn add_rpc_handle(&self, request_id: u64, handle: AbortHandle) {
150 let mut handles = self.rpc_handles.lock().unwrap();
151 if handles.insert(request_id, handle).is_some() {
152 tracing::error!("RPC CLIENT ERROR: request id reuse detected");
153 }
154 }
155
156 fn remove_rpc_handle(&self, request_id: u64) -> Option<AbortHandle> {
157 let mut handles = self.rpc_handles.lock().unwrap();
158 handles.remove(&request_id)
159 }
160
161 async fn client_builder() -> Result<fedimint_client::ClientBuilder, anyhow::Error> {
162 let mut builder = fedimint_client::Client::builder().await?;
163 builder.with_module(MintClientInit);
164 builder.with_module(LightningClientInit::default());
165 builder.with_module(WalletClientInit(None));
166 builder.with_module(MetaClientInit);
167 Ok(builder)
168 }
169
170 async fn client_db(&self, client_name: String) -> anyhow::Result<Database> {
172 assert_eq!(client_name.len(), 36);
173
174 let unified_db = &self.unified_database;
175 let mut client_prefix = vec![DbKeyPrefix::ClientDatabase as u8];
176 client_prefix.extend_from_slice(client_name.as_bytes());
177 Ok(unified_db.with_prefix(client_prefix))
178 }
179
180 async fn handle_join_federation(
182 &self,
183 invite_code: String,
184 client_name: String,
185 force_recover: bool,
186 ) -> anyhow::Result<()> {
187 let mnemonic = self
189 .get_mnemonic_from_db()
190 .await?
191 .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
192
193 let client_db = self.client_db(client_name.clone()).await?;
194
195 let invite_code = InviteCode::from_str(&invite_code)?;
196 let federation_id = invite_code.federation_id();
197
198 let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
200
201 let cached_preview = self.preview_cache.lock().unwrap().take();
203 let preview = match cached_preview {
204 Some(preview) if preview.config().calculate_federation_id() == federation_id => preview,
205 _ => {
206 let builder = Self::client_builder().await?;
207 builder.preview(&invite_code).await?
208 }
209 };
210
211 let backup = preview
213 .download_backup_from_federation(RootSecret::StandardDoubleDerive(
214 federation_secret.clone(),
215 ))
216 .await?;
217
218 let client = if force_recover || backup.is_some() {
219 Arc::new(
220 preview
221 .recover(
222 client_db,
223 RootSecret::StandardDoubleDerive(federation_secret),
224 backup,
225 )
226 .await?,
227 )
228 } else {
229 Arc::new(
230 preview
231 .join(
232 client_db,
233 RootSecret::StandardDoubleDerive(federation_secret),
234 )
235 .await?,
236 )
237 };
238
239 self.add_client(client_name, client).await;
240 Ok(())
241 }
242
243 async fn handle_open_client(&self, client_name: String) -> anyhow::Result<()> {
244 let mnemonic = self
246 .get_mnemonic_from_db()
247 .await?
248 .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
249
250 let client_db = self.client_db(client_name.clone()).await?;
251
252 if !fedimint_client::Client::is_initialized(&client_db).await {
253 anyhow::bail!("client is not initialized for this database");
254 }
255
256 let client_config = fedimint_client::Client::get_config_from_db(&client_db)
258 .await
259 .context("Client config not found in database")?;
260
261 let federation_id = client_config.calculate_federation_id();
262
263 let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
265
266 let builder = Self::client_builder().await?;
267 let client = Arc::new(
268 builder
269 .open(
270 client_db,
271 RootSecret::StandardDoubleDerive(federation_secret),
272 )
273 .await?,
274 );
275
276 self.add_client(client_name, client).await;
277 Ok(())
278 }
279
280 async fn handle_close_client(&self, client_name: String) -> anyhow::Result<()> {
281 let mut clients = self.clients.lock().await;
282 let mut client = clients.remove(&client_name).context("client not found")?;
283
284 for attempt in 0.. {
286 info!(attempt, "waiting for RPCs to drop the federation object");
287 match Arc::try_unwrap(client) {
288 Ok(client) => {
289 client.shutdown().await;
290 break;
291 }
292 Err(client_val) => client = client_val,
293 }
294 fedimint_core::task::sleep(Duration::from_millis(100)).await;
295 }
296 Ok(())
297 }
298
    /// Forwards an RPC to the open client named `client_name` and returns the
    /// responses as a stream.
    ///
    /// `module` selects the target: the empty string addresses the client's
    /// global RPC handler, while `"ln"`, `"mint"` and `"wallet"` address the
    /// first Lightning, mint and wallet module instance respectively. The
    /// stream ends with an error if the client is not open, the module lookup
    /// fails, `module` is not one of the names above, or the target yields an
    /// error.
    fn handle_client_rpc(
        self: Arc<Self>,
        client_name: String,
        module: String,
        method: String,
        payload: serde_json::Value,
    ) -> BoxStream<'static, anyhow::Result<serde_json::Value>> {
        Box::pin(try_stream! {
            let client = self
                .get_client(&client_name)
                .await
                .with_context(|| format!("Client not found: {client_name}"))?;
            match module.as_str() {
                // Empty module name: client-global RPC.
                "" => {
                    let mut stream = client.handle_global_rpc(method, payload);
                    while let Some(item) = stream.next().await {
                        yield item?;
                    }
                }
                "ln" => {
                    let ln = client.get_first_module::<LightningClientModule>()?.inner();
                    let mut stream = ln.handle_rpc(method, payload).await;
                    while let Some(item) = stream.next().await {
                        yield item?;
                    }
                }
                "mint" => {
                    let mint = client.get_first_module::<MintClientModule>()?.inner();
                    let mut stream = mint.handle_rpc(method, payload).await;
                    while let Some(item) = stream.next().await {
                        yield item?;
                    }
                }
                "wallet" => {
                    let wallet = client
                        .get_first_module::<WalletClientModule>()?
                        .inner();
                    let mut stream = wallet.handle_rpc(method, payload).await;
                    while let Some(item) = stream.next().await {
                        yield item?;
                    }
                }
                // Unknown module name: end the stream with an error.
                _ => {
                    Err(anyhow::format_err!("module not found: {module}"))?;
                },
            };
        })
    }
347
348 fn parse_invite_code(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
349 let invite_code = InviteCode::from_str(&invite_code)?;
350
351 Ok(json!({
352 "url": invite_code.url(),
353 "federation_id": invite_code.federation_id(),
354 }))
355 }
356
357 fn parse_bolt11_invoice(&self, invoice_str: String) -> anyhow::Result<serde_json::Value> {
358 let invoice = lightning_invoice::Bolt11Invoice::from_str(&invoice_str)
359 .map_err(|e| anyhow::anyhow!("Failed to parse Lightning invoice: {}", e))?;
360
361 let amount_msat = invoice.amount_milli_satoshis().unwrap_or(0);
362 let amount_sat = amount_msat as f64 / 1000.0;
363
364 let expiry_seconds = invoice.expiry_time().as_secs();
365
366 let description = match invoice.description() {
368 Bolt11InvoiceDescriptionRef::Direct(desc) => desc.to_string(),
369 Bolt11InvoiceDescriptionRef::Hash(_) => "Description hash only".to_string(),
370 };
371
372 Ok(json!({
373 "amount": amount_sat,
374 "expiry": expiry_seconds,
375 "memo": description,
376 }))
377 }
378
379 async fn preview_federation(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
380 let invite = InviteCode::from_str(&invite_code)?;
381 let federation_id = invite.federation_id();
382
383 let builder = Self::client_builder().await?;
384 let preview = builder.preview(&invite).await?;
385
386 let json_config = preview.config().to_json();
387 *self.preview_cache.lock().unwrap() = Some(preview);
389
390 Ok(json!({
391 "config": json_config,
392 "federation_id": federation_id.to_string(),
393 }))
394 }
395
    /// Turns a request into the stream of values that answers it.
    ///
    /// Returns `None` for `CancelRpc`, which is handled on the spot by
    /// aborting the matching in-flight request (if it is still registered)
    /// and produces no responses of its own. Every other request kind
    /// returns a stream wrapping the corresponding handler.
    fn handle_rpc_inner(
        self: Arc<Self>,
        request: RpcRequest,
    ) -> Option<BoxStream<'static, anyhow::Result<serde_json::Value>>> {
        match request.kind {
            RpcRequestKind::SetMnemonic { words } => Some(Box::pin(try_stream! {
                self.set_mnemonic(words).await?;
                yield serde_json::json!({ "success": true });
            })),
            RpcRequestKind::GenerateMnemonic => Some(Box::pin(try_stream! {
                let words = self.generate_mnemonic().await?;
                yield serde_json::json!({ "mnemonic": words });
            })),
            RpcRequestKind::GetMnemonic => Some(Box::pin(try_stream! {
                let words = self.get_mnemonic_words().await?;
                yield serde_json::json!({ "mnemonic": words });
            })),
            // Join/open/close carry no result: they yield `null` on success.
            RpcRequestKind::JoinFederation {
                invite_code,
                client_name,
                force_recover,
            } => Some(Box::pin(try_stream! {
                self.handle_join_federation(invite_code, client_name, force_recover)
                    .await?;
                yield serde_json::json!(null);
            })),
            RpcRequestKind::OpenClient { client_name } => Some(Box::pin(try_stream! {
                self.handle_open_client(client_name).await?;
                yield serde_json::json!(null);
            })),
            RpcRequestKind::CloseClient { client_name } => Some(Box::pin(try_stream! {
                self.handle_close_client(client_name).await?;
                yield serde_json::json!(null);
            })),
            RpcRequestKind::ClientRpc {
                client_name,
                module,
                method,
                payload,
            } => Some(self.handle_client_rpc(client_name, module, method, payload)),
            RpcRequestKind::ParseInviteCode { invite_code } => Some(Box::pin(try_stream! {
                let result = self.parse_invite_code(invite_code)?;
                yield result;
            })),
            RpcRequestKind::ParseBolt11Invoice { invoice } => Some(Box::pin(try_stream! {
                let result = self.parse_bolt11_invoice(invoice)?;
                yield result;
            })),
            RpcRequestKind::PreviewFederation { invite_code } => Some(Box::pin(try_stream! {
                let result = self.preview_federation(invite_code).await?;
                yield result;
            })),
            // Cancellation: abort the target request if it is still
            // registered; an unknown id is silently ignored.
            RpcRequestKind::CancelRpc { cancel_request_id } => {
                if let Some(handle) = self.remove_rpc_handle(cancel_request_id) {
                    handle.abort();
                }
                None
            }
        }
    }
456
457 pub fn handle_rpc(
458 self: Arc<Self>,
459 request: RpcRequest,
460 handler: impl RpcResponseHandler + 'static,
461 ) -> HandledRpc<'static> {
462 let request_id = request.request_id;
463
464 let Some(stream) = self.clone().handle_rpc_inner(request) else {
465 return HandledRpc { task: None };
466 };
467
468 let (abort_handle, abort_registration) = AbortHandle::new_pair();
469 self.add_rpc_handle(request_id, abort_handle);
470
471 let task = Box::pin(async move {
472 let mut stream = Abortable::new(stream, abort_registration);
473
474 while let Some(result) = stream.next().await {
475 let response = match result {
476 Ok(value) => RpcResponse {
477 request_id,
478 kind: RpcResponseKind::Data { data: value },
479 },
480 Err(e) => RpcResponse {
481 request_id,
482 kind: RpcResponseKind::Error {
483 error: e.to_string(),
484 },
485 },
486 };
487 handler.handle_response(response);
488 }
489
490 let _ = self.remove_rpc_handle(request_id);
492 handler.handle_response(RpcResponse {
493 request_id,
494 kind: if stream.is_aborted() {
495 RpcResponseKind::Aborted {}
496 } else {
497 RpcResponseKind::End {}
498 },
499 });
500 });
501
502 HandledRpc { task: Some(task) }
503 }
504
505 async fn get_mnemonic_words(&self) -> anyhow::Result<Option<Vec<String>>> {
509 let mnemonic = self.get_mnemonic_from_db().await?;
510
511 if let Some(mnemonic) = mnemonic {
512 let words = mnemonic.words().map(|w| w.to_string()).collect();
513 Ok(Some(words))
514 } else {
515 Ok(None)
516 }
517 }
518 async fn set_mnemonic(&self, words: Vec<String>) -> anyhow::Result<()> {
521 let all_words = words.join(" ");
522 let mnemonic =
523 Mnemonic::parse_in_normalized(fedimint_bip39::Language::English, &all_words)?;
524
525 let mut dbtx = self.unified_database.begin_transaction().await;
526
527 if dbtx.get_value(&MnemonicKey).await.is_some() {
528 anyhow::bail!(
529 "Wallet mnemonic already exists. Please clear existing data before setting a new mnemonic."
530 );
531 }
532
533 dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
534 .await;
535
536 dbtx.commit_tx().await;
537
538 Ok(())
539 }
540
541 async fn generate_mnemonic(&self) -> anyhow::Result<Vec<String>> {
544 let mnemonic = Bip39RootSecretStrategy::<12>::random(&mut thread_rng());
545 let words: Vec<String> = mnemonic.words().map(|w| w.to_string()).collect();
546
547 let mut dbtx = self.unified_database.begin_transaction().await;
548
549 if dbtx.get_value(&MnemonicKey).await.is_some() {
550 anyhow::bail!(
551 "Wallet mnemonic already exists. Please clear existing data before generating a new mnemonic."
552 );
553 }
554
555 dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
556 .await;
557
558 dbtx.commit_tx().await;
559
560 Ok(words)
561 }
562
563 fn derive_federation_secret(
565 &self,
566 mnemonic: &Mnemonic,
567 federation_id: &FederationId,
568 ) -> DerivableSecret {
569 let global_root_secret = Bip39RootSecretStrategy::<12>::to_root_secret(mnemonic);
570 let multi_federation_root_secret = global_root_secret.child_key(ChildId(0));
571 let federation_root_secret = multi_federation_root_secret.federation_key(federation_id);
572 let federation_wallet_root_secret = federation_root_secret.child_key(ChildId(0));
573 federation_wallet_root_secret.child_key(ChildId(0))
574 }
575
576 async fn get_mnemonic_from_db(&self) -> anyhow::Result<Option<Mnemonic>> {
578 let mut dbtx = self.unified_database.begin_transaction_nc().await;
579
580 if let Some(mnemonic_entropy) = dbtx.get_value(&MnemonicKey).await {
581 let mnemonic = Mnemonic::from_entropy(&mnemonic_entropy)?;
582 Ok(Some(mnemonic))
583 } else {
584 Ok(None)
585 }
586 }
587}