1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use async_stream::try_stream;
8use fedimint_bip39::{Bip39RootSecretStrategy, Mnemonic};
9use fedimint_client::module::ClientModule;
10use fedimint_client::secret::RootSecretStrategy;
11use fedimint_client::{ClientHandleArc, RootSecret};
12use fedimint_core::config::FederationId;
13use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
14use fedimint_core::encoding::{Decodable, Encodable};
15use fedimint_core::impl_db_record;
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::task::{MaybeSend, MaybeSync};
18use fedimint_core::util::{BoxFuture, BoxStream};
19use fedimint_derive_secret::{ChildId, DerivableSecret};
20use fedimint_ln_client::{LightningClientInit, LightningClientModule};
21use fedimint_meta_client::MetaClientInit;
22use fedimint_mint_client::{MintClientInit, MintClientModule};
23use fedimint_wallet_client::{WalletClientInit, WalletClientModule};
24use futures::StreamExt;
25use futures::future::{AbortHandle, Abortable};
26use lightning_invoice::Bolt11InvoiceDescriptionRef;
27use rand::thread_rng;
28use serde::{Deserialize, Serialize};
29use serde_json::json;
30use tokio::sync::Mutex;
31use tracing::info;
32
33#[repr(u8)]
35#[derive(Clone, Copy, Debug)]
36pub enum DbKeyPrefix {
37 ClientDatabase = 0x00,
38 Mnemonic = 0x01,
39}
40
41#[derive(Debug, Clone, Encodable, Decodable, Eq, PartialEq, Hash)]
42pub struct MnemonicKey;
43
44impl_db_record!(
45 key = MnemonicKey,
46 value = Vec<u8>,
47 db_prefix = DbKeyPrefix::Mnemonic,
48);
49
50#[derive(Serialize, Deserialize)]
51#[serde(rename_all = "snake_case")]
52pub struct RpcRequest {
53 pub request_id: u64,
54 #[serde(flatten)]
55 pub kind: RpcRequestKind,
56}
57
58#[derive(Serialize, Deserialize)]
59#[serde(tag = "type", rename_all = "snake_case")]
60pub enum RpcRequestKind {
61 SetMnemonic {
62 words: Vec<String>,
63 },
64 GenerateMnemonic,
65 GetMnemonic,
66 JoinFederation {
68 invite_code: String,
69 force_recover: bool,
70 client_name: String,
71 },
72 OpenClient {
73 client_name: String,
74 },
75 CloseClient {
76 client_name: String,
77 },
78 ClientRpc {
79 client_name: String,
80 module: String,
81 method: String,
82 payload: serde_json::Value,
83 },
84 CancelRpc {
85 cancel_request_id: u64,
86 },
87 ParseInviteCode {
88 invite_code: String,
89 },
90 ParseBolt11Invoice {
91 invoice: String,
92 },
93 PreviewFederation {
94 invite_code: String,
95 },
96}
97
98#[derive(Serialize, Deserialize, Clone, Debug)]
99pub struct RpcResponse {
100 pub request_id: u64,
101 #[serde(flatten)]
102 pub kind: RpcResponseKind,
103}
104
105#[derive(Serialize, Deserialize, Clone, Debug)]
106#[serde(tag = "type", rename_all = "snake_case")]
107pub enum RpcResponseKind {
108 Data { data: serde_json::Value },
109 Error { error: String },
110 Aborted {},
111 End {},
112}
113
114pub trait RpcResponseHandler: MaybeSend + MaybeSync {
115 fn handle_response(&self, response: RpcResponse);
116}
117
118pub struct RpcGlobalState {
119 clients: Mutex<HashMap<String, ClientHandleArc>>,
120 rpc_handles: std::sync::Mutex<HashMap<u64, AbortHandle>>,
121 unified_database: Database,
122}
123
124pub struct HandledRpc<'a> {
125 pub task: Option<BoxFuture<'a, ()>>,
126}
127
128impl RpcGlobalState {
129 pub fn new(unified_database: Database) -> Self {
130 Self {
131 clients: Mutex::new(HashMap::new()),
132 rpc_handles: std::sync::Mutex::new(HashMap::new()),
133 unified_database,
134 }
135 }
136
137 async fn add_client(&self, client_name: String, client: ClientHandleArc) {
138 let mut clients = self.clients.lock().await;
139 clients.insert(client_name, client);
140 }
141
142 async fn get_client(&self, client_name: &str) -> Option<ClientHandleArc> {
143 let clients = self.clients.lock().await;
144 clients.get(client_name).cloned()
145 }
146
147 fn add_rpc_handle(&self, request_id: u64, handle: AbortHandle) {
148 let mut handles = self.rpc_handles.lock().unwrap();
149 if handles.insert(request_id, handle).is_some() {
150 tracing::error!("RPC CLIENT ERROR: request id reuse detected");
151 }
152 }
153
154 fn remove_rpc_handle(&self, request_id: u64) -> Option<AbortHandle> {
155 let mut handles = self.rpc_handles.lock().unwrap();
156 handles.remove(&request_id)
157 }
158
159 async fn client_builder() -> Result<fedimint_client::ClientBuilder, anyhow::Error> {
160 let mut builder = fedimint_client::Client::builder().await?;
161 builder.with_module(MintClientInit);
162 builder.with_module(LightningClientInit::default());
163 builder.with_module(WalletClientInit(None));
164 builder.with_module(MetaClientInit);
165 builder.with_primary_module_kind(fedimint_mint_client::KIND);
166 Ok(builder)
167 }
168
169 async fn client_db(&self, client_name: String) -> anyhow::Result<Database> {
171 assert_eq!(client_name.len(), 36);
172
173 let unified_db = &self.unified_database;
174 let mut client_prefix = vec![DbKeyPrefix::ClientDatabase as u8];
175 client_prefix.extend_from_slice(client_name.as_bytes());
176 Ok(unified_db.with_prefix(client_prefix))
177 }
178
179 async fn handle_join_federation(
181 &self,
182 invite_code: String,
183 client_name: String,
184 force_recover: bool,
185 ) -> anyhow::Result<()> {
186 let mnemonic = self
188 .get_mnemonic_from_db()
189 .await?
190 .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
191
192 let client_db = self.client_db(client_name.clone()).await?;
193
194 let invite_code = InviteCode::from_str(&invite_code)?;
195 let federation_id = invite_code.federation_id();
196
197 let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
199
200 let builder = Self::client_builder().await?;
201 let preview = builder.preview(&invite_code).await?;
202
203 let backup = preview
205 .download_backup_from_federation(RootSecret::StandardDoubleDerive(
206 federation_secret.clone(),
207 ))
208 .await?;
209
210 let client = if force_recover || backup.is_some() {
211 Arc::new(
212 preview
213 .recover(
214 client_db,
215 RootSecret::StandardDoubleDerive(federation_secret),
216 backup,
217 )
218 .await?,
219 )
220 } else {
221 Arc::new(
222 preview
223 .join(
224 client_db,
225 RootSecret::StandardDoubleDerive(federation_secret),
226 )
227 .await?,
228 )
229 };
230
231 self.add_client(client_name, client).await;
232 Ok(())
233 }
234
235 async fn handle_open_client(&self, client_name: String) -> anyhow::Result<()> {
236 let mnemonic = self
238 .get_mnemonic_from_db()
239 .await?
240 .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
241
242 let client_db = self.client_db(client_name.clone()).await?;
243
244 if !fedimint_client::Client::is_initialized(&client_db).await {
245 anyhow::bail!("client is not initialized for this database");
246 }
247
248 let client_config = fedimint_client::Client::get_config_from_db(&client_db)
250 .await
251 .context("Client config not found in database")?;
252
253 let federation_id = client_config.calculate_federation_id();
254
255 let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
257
258 let builder = Self::client_builder().await?;
259 let client = Arc::new(
260 builder
261 .open(
262 client_db,
263 RootSecret::StandardDoubleDerive(federation_secret),
264 )
265 .await?,
266 );
267
268 self.add_client(client_name, client).await;
269 Ok(())
270 }
271
272 async fn handle_close_client(&self, client_name: String) -> anyhow::Result<()> {
273 let mut clients = self.clients.lock().await;
274 let mut client = clients.remove(&client_name).context("client not found")?;
275
276 for attempt in 0.. {
278 info!(attempt, "waiting for RPCs to drop the federation object");
279 match Arc::try_unwrap(client) {
280 Ok(client) => {
281 client.shutdown().await;
282 break;
283 }
284 Err(client_val) => client = client_val,
285 }
286 fedimint_core::task::sleep(Duration::from_millis(100)).await;
287 }
288 Ok(())
289 }
290
291 fn handle_client_rpc(
292 self: Arc<Self>,
293 client_name: String,
294 module: String,
295 method: String,
296 payload: serde_json::Value,
297 ) -> BoxStream<'static, anyhow::Result<serde_json::Value>> {
298 Box::pin(try_stream! {
299 let client = self
300 .get_client(&client_name)
301 .await
302 .with_context(|| format!("Client not found: {client_name}"))?;
303 match module.as_str() {
304 "" => {
305 let mut stream = client.handle_global_rpc(method, payload);
306 while let Some(item) = stream.next().await {
307 yield item?;
308 }
309 }
310 "ln" => {
311 let ln = client.get_first_module::<LightningClientModule>()?.inner();
312 let mut stream = ln.handle_rpc(method, payload).await;
313 while let Some(item) = stream.next().await {
314 yield item?;
315 }
316 }
317 "mint" => {
318 let mint = client.get_first_module::<MintClientModule>()?.inner();
319 let mut stream = mint.handle_rpc(method, payload).await;
320 while let Some(item) = stream.next().await {
321 yield item?;
322 }
323 }
324 "wallet" => {
325 let wallet = client
326 .get_first_module::<WalletClientModule>()?
327 .inner();
328 let mut stream = wallet.handle_rpc(method, payload).await;
329 while let Some(item) = stream.next().await {
330 yield item?;
331 }
332 }
333 _ => {
334 Err(anyhow::format_err!("module not found: {module}"))?;
335 },
336 };
337 })
338 }
339
340 fn parse_invite_code(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
341 let invite_code = InviteCode::from_str(&invite_code)?;
342
343 Ok(json!({
344 "url": invite_code.url(),
345 "federation_id": invite_code.federation_id(),
346 }))
347 }
348
349 fn parse_bolt11_invoice(&self, invoice_str: String) -> anyhow::Result<serde_json::Value> {
350 let invoice = lightning_invoice::Bolt11Invoice::from_str(&invoice_str)
351 .map_err(|e| anyhow::anyhow!("Failed to parse Lightning invoice: {}", e))?;
352
353 let amount_msat = invoice.amount_milli_satoshis().unwrap_or(0);
354 let amount_sat = amount_msat as f64 / 1000.0;
355
356 let expiry_seconds = invoice.expiry_time().as_secs();
357
358 let description = match invoice.description() {
360 Bolt11InvoiceDescriptionRef::Direct(desc) => desc.to_string(),
361 Bolt11InvoiceDescriptionRef::Hash(_) => "Description hash only".to_string(),
362 };
363
364 Ok(json!({
365 "amount": amount_sat,
366 "expiry": expiry_seconds,
367 "memo": description,
368 }))
369 }
370
371 async fn preview_federation(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
372 let invite = InviteCode::from_str(&invite_code)?;
373 let (client_config, _) = fedimint_api_client::api::net::Connector::default()
374 .download_from_invite_code(
375 &invite, false, false,
376 )
377 .await?;
378 let json_config = client_config.to_json();
379 let federation_id = client_config.calculate_federation_id();
380
381 Ok(json!({
382 "config": json_config,
383 "federation_id": federation_id.to_string(),
384 }))
385 }
386
387 fn handle_rpc_inner(
388 self: Arc<Self>,
389 request: RpcRequest,
390 ) -> Option<BoxStream<'static, anyhow::Result<serde_json::Value>>> {
391 match request.kind {
392 RpcRequestKind::SetMnemonic { words } => Some(Box::pin(try_stream! {
393 self.set_mnemonic(words).await?;
394 yield serde_json::json!({ "success": true });
395 })),
396 RpcRequestKind::GenerateMnemonic => Some(Box::pin(try_stream! {
397 let words = self.generate_mnemonic().await?;
398 yield serde_json::json!({ "mnemonic": words });
399 })),
400 RpcRequestKind::GetMnemonic => Some(Box::pin(try_stream! {
401 let words = self.get_mnemonic_words().await?;
402 yield serde_json::json!({ "mnemonic": words });
403 })),
404 RpcRequestKind::JoinFederation {
405 invite_code,
406 client_name,
407 force_recover,
408 } => Some(Box::pin(try_stream! {
409 self.handle_join_federation(invite_code, client_name, force_recover)
410 .await?;
411 yield serde_json::json!(null);
412 })),
413 RpcRequestKind::OpenClient { client_name } => Some(Box::pin(try_stream! {
414 self.handle_open_client(client_name).await?;
415 yield serde_json::json!(null);
416 })),
417 RpcRequestKind::CloseClient { client_name } => Some(Box::pin(try_stream! {
418 self.handle_close_client(client_name).await?;
419 yield serde_json::json!(null);
420 })),
421 RpcRequestKind::ClientRpc {
422 client_name,
423 module,
424 method,
425 payload,
426 } => Some(self.handle_client_rpc(client_name, module, method, payload)),
427 RpcRequestKind::ParseInviteCode { invite_code } => Some(Box::pin(try_stream! {
428 let result = self.parse_invite_code(invite_code)?;
429 yield result;
430 })),
431 RpcRequestKind::ParseBolt11Invoice { invoice } => Some(Box::pin(try_stream! {
432 let result = self.parse_bolt11_invoice(invoice)?;
433 yield result;
434 })),
435 RpcRequestKind::PreviewFederation { invite_code } => Some(Box::pin(try_stream! {
436 let result = self.preview_federation(invite_code).await?;
437 yield result;
438 })),
439 RpcRequestKind::CancelRpc { cancel_request_id } => {
440 if let Some(handle) = self.remove_rpc_handle(cancel_request_id) {
441 handle.abort();
442 }
443 None
444 }
445 }
446 }
447
448 pub fn handle_rpc(
449 self: Arc<Self>,
450 request: RpcRequest,
451 handler: impl RpcResponseHandler + 'static,
452 ) -> HandledRpc<'static> {
453 let request_id = request.request_id;
454
455 let Some(stream) = self.clone().handle_rpc_inner(request) else {
456 return HandledRpc { task: None };
457 };
458
459 let (abort_handle, abort_registration) = AbortHandle::new_pair();
460 self.add_rpc_handle(request_id, abort_handle);
461
462 let task = Box::pin(async move {
463 let mut stream = Abortable::new(stream, abort_registration);
464
465 while let Some(result) = stream.next().await {
466 let response = match result {
467 Ok(value) => RpcResponse {
468 request_id,
469 kind: RpcResponseKind::Data { data: value },
470 },
471 Err(e) => RpcResponse {
472 request_id,
473 kind: RpcResponseKind::Error {
474 error: e.to_string(),
475 },
476 },
477 };
478 handler.handle_response(response);
479 }
480
481 let _ = self.remove_rpc_handle(request_id);
483 handler.handle_response(RpcResponse {
484 request_id,
485 kind: if stream.is_aborted() {
486 RpcResponseKind::Aborted {}
487 } else {
488 RpcResponseKind::End {}
489 },
490 });
491 });
492
493 HandledRpc { task: Some(task) }
494 }
495
496 async fn get_mnemonic_words(&self) -> anyhow::Result<Option<Vec<String>>> {
500 let mnemonic = self.get_mnemonic_from_db().await?;
501
502 if let Some(mnemonic) = mnemonic {
503 let words = mnemonic.words().map(|w| w.to_string()).collect();
504 Ok(Some(words))
505 } else {
506 Ok(None)
507 }
508 }
509 async fn set_mnemonic(&self, words: Vec<String>) -> anyhow::Result<()> {
512 let all_words = words.join(" ");
513 let mnemonic =
514 Mnemonic::parse_in_normalized(fedimint_bip39::Language::English, &all_words)?;
515
516 let mut dbtx = self.unified_database.begin_transaction().await;
517
518 if dbtx.get_value(&MnemonicKey).await.is_some() {
519 anyhow::bail!(
520 "Wallet mnemonic already exists. Please clear existing data before setting a new mnemonic."
521 );
522 }
523
524 dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
525 .await;
526
527 dbtx.commit_tx().await;
528
529 Ok(())
530 }
531
532 async fn generate_mnemonic(&self) -> anyhow::Result<Vec<String>> {
535 let mnemonic = Bip39RootSecretStrategy::<12>::random(&mut thread_rng());
536 let words: Vec<String> = mnemonic.words().map(|w| w.to_string()).collect();
537
538 let mut dbtx = self.unified_database.begin_transaction().await;
539
540 if dbtx.get_value(&MnemonicKey).await.is_some() {
541 anyhow::bail!(
542 "Wallet mnemonic already exists. Please clear existing data before generating a new mnemonic."
543 );
544 }
545
546 dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
547 .await;
548
549 dbtx.commit_tx().await;
550
551 Ok(words)
552 }
553
554 fn derive_federation_secret(
556 &self,
557 mnemonic: &Mnemonic,
558 federation_id: &FederationId,
559 ) -> DerivableSecret {
560 let global_root_secret = Bip39RootSecretStrategy::<12>::to_root_secret(mnemonic);
561 let multi_federation_root_secret = global_root_secret.child_key(ChildId(0));
562 let federation_root_secret = multi_federation_root_secret.federation_key(federation_id);
563 let federation_wallet_root_secret = federation_root_secret.child_key(ChildId(0));
564 federation_wallet_root_secret.child_key(ChildId(0))
565 }
566
567 async fn get_mnemonic_from_db(&self) -> anyhow::Result<Option<Mnemonic>> {
569 let mut dbtx = self.unified_database.begin_transaction_nc().await;
570
571 if let Some(mnemonic_entropy) = dbtx.get_value(&MnemonicKey).await {
572 let mnemonic = Mnemonic::from_entropy(&mnemonic_entropy)?;
573 Ok(Some(mnemonic))
574 } else {
575 Ok(None)
576 }
577 }
578}