1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use async_stream::try_stream;
8use fedimint_bip39::{Bip39RootSecretStrategy, Mnemonic};
9use fedimint_client::module::ClientModule;
10use fedimint_client::secret::RootSecretStrategy;
11use fedimint_client::{ClientHandleArc, RootSecret};
12use fedimint_core::config::FederationId;
13use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
14use fedimint_core::encoding::{Decodable, Encodable};
15use fedimint_core::impl_db_record;
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::task::{MaybeSend, MaybeSync};
18use fedimint_core::util::{BoxFuture, BoxStream};
19use fedimint_derive_secret::{ChildId, DerivableSecret};
20use fedimint_ln_client::{LightningClientInit, LightningClientModule};
21use fedimint_meta_client::MetaClientInit;
22use fedimint_mint_client::{MintClientInit, MintClientModule};
23use fedimint_wallet_client::{WalletClientInit, WalletClientModule};
24use futures::StreamExt;
25use futures::future::{AbortHandle, Abortable};
26use lightning_invoice::Bolt11InvoiceDescriptionRef;
27use rand::thread_rng;
28use serde::{Deserialize, Serialize};
29use serde_json::json;
30use tokio::sync::Mutex;
31use tracing::info;
32
/// One-byte prefixes that partition the unified database into namespaces.
///
/// The enum is `repr(u8)`, so `variant as u8` yields the byte written below.
#[repr(u8)]
#[derive(Debug, Clone, Copy)]
pub enum DbKeyPrefix {
    /// First byte of the prefix under which each client's database is nested
    /// (see `RpcGlobalState::client_db`).
    ClientDatabase = 0x00,
    /// Prefix of the wallet mnemonic record (see `MnemonicKey`).
    Mnemonic = 0x01,
}
40
41#[derive(Debug, Clone, Encodable, Decodable, Eq, PartialEq, Hash)]
42pub struct MnemonicKey;
43
44impl_db_record!(
45 key = MnemonicKey,
46 value = Vec<u8>,
47 db_prefix = DbKeyPrefix::Mnemonic,
48);
49
50#[derive(Serialize, Deserialize)]
51#[serde(rename_all = "snake_case")]
52pub struct RpcRequest {
53 pub request_id: u64,
54 #[serde(flatten)]
55 pub kind: RpcRequestKind,
56}
57
58#[derive(Serialize, Deserialize)]
59#[serde(tag = "type", rename_all = "snake_case")]
60pub enum RpcRequestKind {
61 SetMnemonic {
62 words: Vec<String>,
63 },
64 GenerateMnemonic,
65 GetMnemonic,
66 JoinFederation {
68 invite_code: String,
69 client_name: String,
70 },
71 OpenClient {
72 client_name: String,
73 },
74 CloseClient {
75 client_name: String,
76 },
77 ClientRpc {
78 client_name: String,
79 module: String,
80 method: String,
81 payload: serde_json::Value,
82 },
83 CancelRpc {
84 cancel_request_id: u64,
85 },
86 ParseInviteCode {
87 invite_code: String,
88 },
89 ParseBolt11Invoice {
90 invoice: String,
91 },
92 PreviewFederation {
93 invite_code: String,
94 },
95}
96
97#[derive(Serialize, Deserialize, Clone, Debug)]
98pub struct RpcResponse {
99 pub request_id: u64,
100 #[serde(flatten)]
101 pub kind: RpcResponseKind,
102}
103
104#[derive(Serialize, Deserialize, Clone, Debug)]
105#[serde(tag = "type", rename_all = "snake_case")]
106pub enum RpcResponseKind {
107 Data { data: serde_json::Value },
108 Error { error: String },
109 Aborted {},
110 End {},
111}
112
113pub trait RpcResponseHandler: MaybeSend + MaybeSync {
114 fn handle_response(&self, response: RpcResponse);
115}
116
117pub struct RpcGlobalState {
118 clients: Mutex<HashMap<String, ClientHandleArc>>,
119 rpc_handles: std::sync::Mutex<HashMap<u64, AbortHandle>>,
120 unified_database: Database,
121}
122
123pub struct HandledRpc<'a> {
124 pub task: Option<BoxFuture<'a, ()>>,
125}
126
127impl RpcGlobalState {
128 pub fn new(unified_database: Database) -> Self {
129 Self {
130 clients: Mutex::new(HashMap::new()),
131 rpc_handles: std::sync::Mutex::new(HashMap::new()),
132 unified_database,
133 }
134 }
135
136 async fn add_client(&self, client_name: String, client: ClientHandleArc) {
137 let mut clients = self.clients.lock().await;
138 clients.insert(client_name, client);
139 }
140
141 async fn get_client(&self, client_name: &str) -> Option<ClientHandleArc> {
142 let clients = self.clients.lock().await;
143 clients.get(client_name).cloned()
144 }
145
146 fn add_rpc_handle(&self, request_id: u64, handle: AbortHandle) {
147 let mut handles = self.rpc_handles.lock().unwrap();
148 if handles.insert(request_id, handle).is_some() {
149 tracing::error!("RPC CLIENT ERROR: request id reuse detected");
150 }
151 }
152
153 fn remove_rpc_handle(&self, request_id: u64) -> Option<AbortHandle> {
154 let mut handles = self.rpc_handles.lock().unwrap();
155 handles.remove(&request_id)
156 }
157
158 async fn client_builder(db: Database) -> Result<fedimint_client::ClientBuilder, anyhow::Error> {
159 let mut builder = fedimint_client::Client::builder(db).await?;
160 builder.with_module(MintClientInit);
161 builder.with_module(LightningClientInit::default());
162 builder.with_module(WalletClientInit(None));
163 builder.with_module(MetaClientInit);
164 builder.with_primary_module_kind(fedimint_mint_client::KIND);
165 Ok(builder)
166 }
167
168 async fn client_db(&self, client_name: String) -> anyhow::Result<Database> {
170 assert_eq!(client_name.len(), 36);
171
172 let unified_db = &self.unified_database;
173 let mut client_prefix = vec![DbKeyPrefix::ClientDatabase as u8];
174 client_prefix.extend_from_slice(client_name.as_bytes());
175 Ok(unified_db.with_prefix(client_prefix))
176 }
177
178 async fn handle_join_federation(
180 &self,
181 invite_code: String,
182 client_name: String,
183 ) -> anyhow::Result<()> {
184 let mnemonic = self
186 .get_mnemonic_from_db()
187 .await?
188 .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
189
190 let client_db = self.client_db(client_name.clone()).await?;
191
192 let invite_code = InviteCode::from_str(&invite_code)?;
193 let federation_id = invite_code.federation_id();
194
195 let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
197
198 let builder = Self::client_builder(client_db).await?;
199 let client = Arc::new(
200 builder
201 .preview(&invite_code)
202 .await?
203 .join(RootSecret::StandardDoubleDerive(federation_secret))
204 .await?,
205 );
206
207 self.add_client(client_name, client).await;
208 Ok(())
209 }
210
211 async fn handle_open_client(&self, client_name: String) -> anyhow::Result<()> {
212 let mnemonic = self
214 .get_mnemonic_from_db()
215 .await?
216 .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
217
218 let client_db = self.client_db(client_name.clone()).await?;
219
220 if !fedimint_client::Client::is_initialized(&client_db).await {
221 anyhow::bail!("client is not initialized for this database");
222 }
223
224 let client_config = fedimint_client::Client::get_config_from_db(&client_db)
226 .await
227 .context("Client config not found in database")?;
228
229 let federation_id = client_config.calculate_federation_id();
230
231 let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
233
234 let builder = Self::client_builder(client_db).await?;
235 let client = Arc::new(
236 builder
237 .open(RootSecret::StandardDoubleDerive(federation_secret))
238 .await?,
239 );
240
241 self.add_client(client_name, client).await;
242 Ok(())
243 }
244
245 async fn handle_close_client(&self, client_name: String) -> anyhow::Result<()> {
246 let mut clients = self.clients.lock().await;
247 let mut client = clients.remove(&client_name).context("client not found")?;
248
249 for attempt in 0.. {
251 info!(attempt, "waiting for RPCs to drop the federation object");
252 match Arc::try_unwrap(client) {
253 Ok(client) => {
254 client.shutdown().await;
255 break;
256 }
257 Err(client_val) => client = client_val,
258 }
259 fedimint_core::task::sleep(Duration::from_millis(100)).await;
260 }
261 Ok(())
262 }
263
264 fn handle_client_rpc(
265 self: Arc<Self>,
266 client_name: String,
267 module: String,
268 method: String,
269 payload: serde_json::Value,
270 ) -> BoxStream<'static, anyhow::Result<serde_json::Value>> {
271 Box::pin(try_stream! {
272 let client = self
273 .get_client(&client_name)
274 .await
275 .with_context(|| format!("Client not found: {}", client_name))?;
276 match module.as_str() {
277 "" => {
278 let mut stream = client.handle_global_rpc(method, payload);
279 while let Some(item) = stream.next().await {
280 yield item?;
281 }
282 }
283 "ln" => {
284 let ln = client.get_first_module::<LightningClientModule>()?.inner();
285 let mut stream = ln.handle_rpc(method, payload).await;
286 while let Some(item) = stream.next().await {
287 yield item?;
288 }
289 }
290 "mint" => {
291 let mint = client.get_first_module::<MintClientModule>()?.inner();
292 let mut stream = mint.handle_rpc(method, payload).await;
293 while let Some(item) = stream.next().await {
294 yield item?;
295 }
296 }
297 "wallet" => {
298 let wallet = client
299 .get_first_module::<WalletClientModule>()?
300 .inner();
301 let mut stream = wallet.handle_rpc(method, payload).await;
302 while let Some(item) = stream.next().await {
303 yield item?;
304 }
305 }
306 _ => {
307 Err(anyhow::format_err!("module not found: {module}"))?;
308 },
309 };
310 })
311 }
312
313 fn parse_invite_code(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
314 let invite_code = InviteCode::from_str(&invite_code)?;
315
316 Ok(json!({
317 "url": invite_code.url(),
318 "federation_id": invite_code.federation_id(),
319 }))
320 }
321
322 fn parse_bolt11_invoice(&self, invoice_str: String) -> anyhow::Result<serde_json::Value> {
323 let invoice = lightning_invoice::Bolt11Invoice::from_str(&invoice_str)
324 .map_err(|e| anyhow::anyhow!("Failed to parse Lightning invoice: {}", e))?;
325
326 let amount_msat = invoice.amount_milli_satoshis().unwrap_or(0);
327 let amount_sat = amount_msat as f64 / 1000.0;
328
329 let expiry_seconds = invoice.expiry_time().as_secs();
330
331 let description = match invoice.description() {
333 Bolt11InvoiceDescriptionRef::Direct(desc) => desc.to_string(),
334 Bolt11InvoiceDescriptionRef::Hash(_) => "Description hash only".to_string(),
335 };
336
337 Ok(json!({
338 "amount": amount_sat,
339 "expiry": expiry_seconds,
340 "memo": description,
341 }))
342 }
343
344 async fn preview_federation(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
345 let invite = InviteCode::from_str(&invite_code)?;
346 let client_config = fedimint_api_client::api::net::Connector::default()
347 .download_from_invite_code(&invite)
348 .await?;
349 let json_config = client_config.to_json();
350 let federation_id = client_config.calculate_federation_id();
351
352 Ok(json!({
353 "config": json_config,
354 "federation_id": federation_id.to_string(),
355 }))
356 }
357
358 fn handle_rpc_inner(
359 self: Arc<Self>,
360 request: RpcRequest,
361 ) -> Option<BoxStream<'static, anyhow::Result<serde_json::Value>>> {
362 match request.kind {
363 RpcRequestKind::SetMnemonic { words } => Some(Box::pin(try_stream! {
364 self.set_mnemonic(words).await?;
365 yield serde_json::json!({ "success": true });
366 })),
367 RpcRequestKind::GenerateMnemonic => Some(Box::pin(try_stream! {
368 let words = self.generate_mnemonic().await?;
369 yield serde_json::json!({ "mnemonic": words });
370 })),
371 RpcRequestKind::GetMnemonic => Some(Box::pin(try_stream! {
372 let words = self.get_mnemonic_words().await?;
373 yield serde_json::json!({ "mnemonic": words });
374 })),
375 RpcRequestKind::JoinFederation {
376 invite_code,
377 client_name,
378 } => Some(Box::pin(try_stream! {
379 self.handle_join_federation(invite_code, client_name)
380 .await?;
381 yield serde_json::json!(null);
382 })),
383 RpcRequestKind::OpenClient { client_name } => Some(Box::pin(try_stream! {
384 self.handle_open_client(client_name).await?;
385 yield serde_json::json!(null);
386 })),
387 RpcRequestKind::CloseClient { client_name } => Some(Box::pin(try_stream! {
388 self.handle_close_client(client_name).await?;
389 yield serde_json::json!(null);
390 })),
391 RpcRequestKind::ClientRpc {
392 client_name,
393 module,
394 method,
395 payload,
396 } => Some(self.handle_client_rpc(client_name, module, method, payload)),
397 RpcRequestKind::ParseInviteCode { invite_code } => Some(Box::pin(try_stream! {
398 let result = self.parse_invite_code(invite_code)?;
399 yield result;
400 })),
401 RpcRequestKind::ParseBolt11Invoice { invoice } => Some(Box::pin(try_stream! {
402 let result = self.parse_bolt11_invoice(invoice)?;
403 yield result;
404 })),
405 RpcRequestKind::PreviewFederation { invite_code } => Some(Box::pin(try_stream! {
406 let result = self.preview_federation(invite_code).await?;
407 yield result;
408 })),
409 RpcRequestKind::CancelRpc { cancel_request_id } => {
410 if let Some(handle) = self.remove_rpc_handle(cancel_request_id) {
411 handle.abort();
412 }
413 None
414 }
415 }
416 }
417
418 pub fn handle_rpc(
419 self: Arc<Self>,
420 request: RpcRequest,
421 handler: impl RpcResponseHandler + 'static,
422 ) -> HandledRpc<'static> {
423 let request_id = request.request_id;
424
425 let Some(stream) = self.clone().handle_rpc_inner(request) else {
426 return HandledRpc { task: None };
427 };
428
429 let (abort_handle, abort_registration) = AbortHandle::new_pair();
430 self.add_rpc_handle(request_id, abort_handle);
431
432 let task = Box::pin(async move {
433 let mut stream = Abortable::new(stream, abort_registration);
434
435 while let Some(result) = stream.next().await {
436 let response = match result {
437 Ok(value) => RpcResponse {
438 request_id,
439 kind: RpcResponseKind::Data { data: value },
440 },
441 Err(e) => RpcResponse {
442 request_id,
443 kind: RpcResponseKind::Error {
444 error: e.to_string(),
445 },
446 },
447 };
448 handler.handle_response(response);
449 }
450
451 let _ = self.remove_rpc_handle(request_id);
453 handler.handle_response(RpcResponse {
454 request_id,
455 kind: if stream.is_aborted() {
456 RpcResponseKind::Aborted {}
457 } else {
458 RpcResponseKind::End {}
459 },
460 });
461 });
462
463 HandledRpc { task: Some(task) }
464 }
465
466 async fn get_mnemonic_words(&self) -> anyhow::Result<Option<Vec<String>>> {
470 let mnemonic = self.get_mnemonic_from_db().await?;
471
472 if let Some(mnemonic) = mnemonic {
473 let words = mnemonic.words().map(|w| w.to_string()).collect();
474 Ok(Some(words))
475 } else {
476 Ok(None)
477 }
478 }
479 async fn set_mnemonic(&self, words: Vec<String>) -> anyhow::Result<()> {
482 let all_words = words.join(" ");
483 let mnemonic =
484 Mnemonic::parse_in_normalized(fedimint_bip39::Language::English, &all_words)?;
485
486 let mut dbtx = self.unified_database.begin_transaction().await;
487
488 if dbtx.get_value(&MnemonicKey).await.is_some() {
489 anyhow::bail!(
490 "Wallet mnemonic already exists. Please clear existing data before setting a new mnemonic."
491 );
492 }
493
494 dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
495 .await;
496
497 dbtx.commit_tx().await;
498
499 Ok(())
500 }
501
502 async fn generate_mnemonic(&self) -> anyhow::Result<Vec<String>> {
505 let mnemonic = Bip39RootSecretStrategy::<12>::random(&mut thread_rng());
506 let words: Vec<String> = mnemonic.words().map(|w| w.to_string()).collect();
507
508 let mut dbtx = self.unified_database.begin_transaction().await;
509
510 if dbtx.get_value(&MnemonicKey).await.is_some() {
511 anyhow::bail!(
512 "Wallet mnemonic already exists. Please clear existing data before generating a new mnemonic."
513 );
514 }
515
516 dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
517 .await;
518
519 dbtx.commit_tx().await;
520
521 Ok(words)
522 }
523
524 fn derive_federation_secret(
526 &self,
527 mnemonic: &Mnemonic,
528 federation_id: &FederationId,
529 ) -> DerivableSecret {
530 let global_root_secret = Bip39RootSecretStrategy::<12>::to_root_secret(mnemonic);
531 let multi_federation_root_secret = global_root_secret.child_key(ChildId(0));
532 let federation_root_secret = multi_federation_root_secret.federation_key(federation_id);
533 let federation_wallet_root_secret = federation_root_secret.child_key(ChildId(0));
534 federation_wallet_root_secret.child_key(ChildId(0))
535 }
536
537 async fn get_mnemonic_from_db(&self) -> anyhow::Result<Option<Mnemonic>> {
539 let mut dbtx = self.unified_database.begin_transaction_nc().await;
540
541 if let Some(mnemonic_entropy) = dbtx.get_value(&MnemonicKey).await {
542 let mnemonic = Mnemonic::from_entropy(&mnemonic_entropy)?;
543 Ok(Some(mnemonic))
544 } else {
545 Ok(None)
546 }
547 }
548}