fedimint_client_rpc/
lib.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use async_stream::try_stream;
8use fedimint_bip39::{Bip39RootSecretStrategy, Mnemonic};
9use fedimint_client::module::ClientModule;
10use fedimint_client::secret::RootSecretStrategy;
11use fedimint_client::{ClientHandleArc, ClientPreview, RootSecret};
12use fedimint_core::config::FederationId;
13use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
14use fedimint_core::encoding::{Decodable, Encodable};
15use fedimint_core::impl_db_record;
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::task::{MaybeSend, MaybeSync};
18use fedimint_core::util::{BoxFuture, BoxStream};
19use fedimint_derive_secret::{ChildId, DerivableSecret};
20use fedimint_ln_client::{LightningClientInit, LightningClientModule};
21use fedimint_meta_client::MetaClientInit;
22use fedimint_mint_client::{MintClientInit, MintClientModule};
23use fedimint_wallet_client::{WalletClientInit, WalletClientModule};
24use futures::StreamExt;
25use futures::future::{AbortHandle, Abortable};
26use lightning_invoice::Bolt11InvoiceDescriptionRef;
27use rand::thread_rng;
28use serde::{Deserialize, Serialize};
29use serde_json::json;
30use tokio::sync::Mutex;
31use tracing::info;
32
/// Top-level key prefixes that partition the unified database.
#[repr(u8)]
#[derive(Debug, Clone, Copy)]
pub enum DbKeyPrefix {
    /// Prefix under which per-client databases are nested (see
    /// `RpcGlobalState::client_db`).
    ClientDatabase = 0x00,
    /// Prefix of the wallet mnemonic record (`MnemonicKey`).
    Mnemonic = 0x01,
}
40
41#[derive(Debug, Clone, Encodable, Decodable, Eq, PartialEq, Hash)]
42pub struct MnemonicKey;
43
44impl_db_record!(
45    key = MnemonicKey,
46    value = Vec<u8>,
47    db_prefix = DbKeyPrefix::Mnemonic,
48);
49
50#[derive(Serialize, Deserialize)]
51#[serde(rename_all = "snake_case")]
52pub struct RpcRequest {
53    pub request_id: u64,
54    #[serde(flatten)]
55    pub kind: RpcRequestKind,
56}
57
58#[derive(Serialize, Deserialize)]
59#[serde(tag = "type", rename_all = "snake_case")]
60pub enum RpcRequestKind {
61    SetMnemonic {
62        words: Vec<String>,
63    },
64    GenerateMnemonic,
65    GetMnemonic,
66    /// Join federation (requires mnemonic to be set first)
67    JoinFederation {
68        invite_code: String,
69        force_recover: bool,
70        client_name: String,
71    },
72    OpenClient {
73        client_name: String,
74    },
75    CloseClient {
76        client_name: String,
77    },
78    ClientRpc {
79        client_name: String,
80        module: String,
81        method: String,
82        payload: serde_json::Value,
83    },
84    CancelRpc {
85        cancel_request_id: u64,
86    },
87    ParseInviteCode {
88        invite_code: String,
89    },
90    ParseBolt11Invoice {
91        invoice: String,
92    },
93    PreviewFederation {
94        invite_code: String,
95    },
96}
97
98#[derive(Serialize, Deserialize, Clone, Debug)]
99pub struct RpcResponse {
100    pub request_id: u64,
101    #[serde(flatten)]
102    pub kind: RpcResponseKind,
103}
104
105#[derive(Serialize, Deserialize, Clone, Debug)]
106#[serde(tag = "type", rename_all = "snake_case")]
107pub enum RpcResponseKind {
108    Data { data: serde_json::Value },
109    Error { error: String },
110    Aborted {},
111    End {},
112}
113
114pub trait RpcResponseHandler: MaybeSend + MaybeSync {
115    fn handle_response(&self, response: RpcResponse);
116}
117
118pub struct RpcGlobalState {
119    clients: Mutex<HashMap<String, ClientHandleArc>>,
120    rpc_handles: std::sync::Mutex<HashMap<u64, AbortHandle>>,
121    unified_database: Database,
122    preview_cache: std::sync::Mutex<Option<ClientPreview>>,
123}
124
125pub struct HandledRpc<'a> {
126    pub task: Option<BoxFuture<'a, ()>>,
127}
128
129impl RpcGlobalState {
130    pub fn new(unified_database: Database) -> Self {
131        Self {
132            clients: Mutex::new(HashMap::new()),
133            rpc_handles: std::sync::Mutex::new(HashMap::new()),
134            unified_database,
135            preview_cache: std::sync::Mutex::new(None),
136        }
137    }
138
139    async fn add_client(&self, client_name: String, client: ClientHandleArc) {
140        let mut clients = self.clients.lock().await;
141        clients.insert(client_name, client);
142    }
143
144    async fn get_client(&self, client_name: &str) -> Option<ClientHandleArc> {
145        let clients = self.clients.lock().await;
146        clients.get(client_name).cloned()
147    }
148
149    fn add_rpc_handle(&self, request_id: u64, handle: AbortHandle) {
150        let mut handles = self.rpc_handles.lock().unwrap();
151        if handles.insert(request_id, handle).is_some() {
152            tracing::error!("RPC CLIENT ERROR: request id reuse detected");
153        }
154    }
155
156    fn remove_rpc_handle(&self, request_id: u64) -> Option<AbortHandle> {
157        let mut handles = self.rpc_handles.lock().unwrap();
158        handles.remove(&request_id)
159    }
160
161    async fn client_builder() -> Result<fedimint_client::ClientBuilder, anyhow::Error> {
162        let mut builder = fedimint_client::Client::builder().await?;
163        builder.with_module(MintClientInit);
164        builder.with_module(LightningClientInit::default());
165        builder.with_module(WalletClientInit(None));
166        builder.with_module(MetaClientInit);
167        Ok(builder)
168    }
169
170    /// Get client-specific database with proper prefix
171    async fn client_db(&self, client_name: String) -> anyhow::Result<Database> {
172        assert_eq!(client_name.len(), 36);
173
174        let unified_db = &self.unified_database;
175        let mut client_prefix = vec![DbKeyPrefix::ClientDatabase as u8];
176        client_prefix.extend_from_slice(client_name.as_bytes());
177        Ok(unified_db.with_prefix(client_prefix))
178    }
179
180    /// Handle joining federation using unified database
181    async fn handle_join_federation(
182        &self,
183        invite_code: String,
184        client_name: String,
185        force_recover: bool,
186    ) -> anyhow::Result<()> {
187        // Check if wallet mnemonic is set
188        let mnemonic = self
189            .get_mnemonic_from_db()
190            .await?
191            .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
192
193        let client_db = self.client_db(client_name.clone()).await?;
194
195        let invite_code = InviteCode::from_str(&invite_code)?;
196        let federation_id = invite_code.federation_id();
197
198        // Derive federation-specific secret from wallet mnemonic
199        let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
200
201        // Try to consume cached preview, otherwise create new one
202        let cached_preview = self.preview_cache.lock().unwrap().take();
203        let preview = match cached_preview {
204            Some(preview) if preview.config().calculate_federation_id() == federation_id => preview,
205            _ => {
206                let builder = Self::client_builder().await?;
207                builder.preview(&invite_code).await?
208            }
209        };
210
211        // Check if backup exists
212        let backup = preview
213            .download_backup_from_federation(RootSecret::StandardDoubleDerive(
214                federation_secret.clone(),
215            ))
216            .await?;
217
218        let client = if force_recover || backup.is_some() {
219            Arc::new(
220                preview
221                    .recover(
222                        client_db,
223                        RootSecret::StandardDoubleDerive(federation_secret),
224                        backup,
225                    )
226                    .await?,
227            )
228        } else {
229            Arc::new(
230                preview
231                    .join(
232                        client_db,
233                        RootSecret::StandardDoubleDerive(federation_secret),
234                    )
235                    .await?,
236            )
237        };
238
239        self.add_client(client_name, client).await;
240        Ok(())
241    }
242
243    async fn handle_open_client(&self, client_name: String) -> anyhow::Result<()> {
244        // Check if wallet mnemonic is set
245        let mnemonic = self
246            .get_mnemonic_from_db()
247            .await?
248            .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
249
250        let client_db = self.client_db(client_name.clone()).await?;
251
252        if !fedimint_client::Client::is_initialized(&client_db).await {
253            anyhow::bail!("client is not initialized for this database");
254        }
255
256        // Get the client config to retrieve the federation ID
257        let client_config = fedimint_client::Client::get_config_from_db(&client_db)
258            .await
259            .context("Client config not found in database")?;
260
261        let federation_id = client_config.calculate_federation_id();
262
263        // Derive federation-specific secret from wallet mnemonic
264        let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
265
266        let builder = Self::client_builder().await?;
267        let client = Arc::new(
268            builder
269                .open(
270                    client_db,
271                    RootSecret::StandardDoubleDerive(federation_secret),
272                )
273                .await?,
274        );
275
276        self.add_client(client_name, client).await;
277        Ok(())
278    }
279
280    async fn handle_close_client(&self, client_name: String) -> anyhow::Result<()> {
281        let mut clients = self.clients.lock().await;
282        let mut client = clients.remove(&client_name).context("client not found")?;
283
284        // RPC calls might have cloned the client Arc before we remove the client.
285        for attempt in 0.. {
286            info!(attempt, "waiting for RPCs to drop the federation object");
287            match Arc::try_unwrap(client) {
288                Ok(client) => {
289                    client.shutdown().await;
290                    break;
291                }
292                Err(client_val) => client = client_val,
293            }
294            fedimint_core::task::sleep(Duration::from_millis(100)).await;
295        }
296        Ok(())
297    }
298
299    fn handle_client_rpc(
300        self: Arc<Self>,
301        client_name: String,
302        module: String,
303        method: String,
304        payload: serde_json::Value,
305    ) -> BoxStream<'static, anyhow::Result<serde_json::Value>> {
306        Box::pin(try_stream! {
307            let client = self
308                .get_client(&client_name)
309                .await
310                .with_context(|| format!("Client not found: {client_name}"))?;
311            match module.as_str() {
312                "" => {
313                    let mut stream = client.handle_global_rpc(method, payload);
314                    while let Some(item) = stream.next().await {
315                        yield item?;
316                    }
317                }
318                "ln" => {
319                    let ln = client.get_first_module::<LightningClientModule>()?.inner();
320                    let mut stream = ln.handle_rpc(method, payload).await;
321                    while let Some(item) = stream.next().await {
322                        yield item?;
323                    }
324                }
325                "mint" => {
326                    let mint = client.get_first_module::<MintClientModule>()?.inner();
327                    let mut stream = mint.handle_rpc(method, payload).await;
328                    while let Some(item) = stream.next().await {
329                        yield item?;
330                    }
331                }
332                "wallet" => {
333                    let wallet = client
334                        .get_first_module::<WalletClientModule>()?
335                        .inner();
336                    let mut stream = wallet.handle_rpc(method, payload).await;
337                    while let Some(item) = stream.next().await {
338                        yield item?;
339                    }
340                }
341                _ => {
342                    Err(anyhow::format_err!("module not found: {module}"))?;
343                },
344            };
345        })
346    }
347
348    fn parse_invite_code(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
349        let invite_code = InviteCode::from_str(&invite_code)?;
350
351        Ok(json!({
352            "url": invite_code.url(),
353            "federation_id": invite_code.federation_id(),
354        }))
355    }
356
357    fn parse_bolt11_invoice(&self, invoice_str: String) -> anyhow::Result<serde_json::Value> {
358        let invoice = lightning_invoice::Bolt11Invoice::from_str(&invoice_str)
359            .map_err(|e| anyhow::anyhow!("Failed to parse Lightning invoice: {}", e))?;
360
361        let amount_msat = invoice.amount_milli_satoshis().unwrap_or(0);
362        let amount_sat = amount_msat as f64 / 1000.0;
363
364        let expiry_seconds = invoice.expiry_time().as_secs();
365
366        // memo
367        let description = match invoice.description() {
368            Bolt11InvoiceDescriptionRef::Direct(desc) => desc.to_string(),
369            Bolt11InvoiceDescriptionRef::Hash(_) => "Description hash only".to_string(),
370        };
371
372        Ok(json!({
373            "amount": amount_sat,
374            "expiry": expiry_seconds,
375            "memo": description,
376        }))
377    }
378
379    async fn preview_federation(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
380        let invite = InviteCode::from_str(&invite_code)?;
381        let federation_id = invite.federation_id();
382
383        let builder = Self::client_builder().await?;
384        let preview = builder.preview(&invite).await?;
385
386        let json_config = preview.config().to_json();
387        // Store in cache
388        *self.preview_cache.lock().unwrap() = Some(preview);
389
390        Ok(json!({
391            "config": json_config,
392            "federation_id": federation_id.to_string(),
393        }))
394    }
395
396    fn handle_rpc_inner(
397        self: Arc<Self>,
398        request: RpcRequest,
399    ) -> Option<BoxStream<'static, anyhow::Result<serde_json::Value>>> {
400        match request.kind {
401            RpcRequestKind::SetMnemonic { words } => Some(Box::pin(try_stream! {
402                self.set_mnemonic(words).await?;
403                yield serde_json::json!({ "success": true });
404            })),
405            RpcRequestKind::GenerateMnemonic => Some(Box::pin(try_stream! {
406                let words = self.generate_mnemonic().await?;
407                yield serde_json::json!({ "mnemonic": words });
408            })),
409            RpcRequestKind::GetMnemonic => Some(Box::pin(try_stream! {
410                let words = self.get_mnemonic_words().await?;
411                yield serde_json::json!({ "mnemonic": words });
412            })),
413            RpcRequestKind::JoinFederation {
414                invite_code,
415                client_name,
416                force_recover,
417            } => Some(Box::pin(try_stream! {
418                self.handle_join_federation(invite_code, client_name, force_recover)
419                    .await?;
420                yield serde_json::json!(null);
421            })),
422            RpcRequestKind::OpenClient { client_name } => Some(Box::pin(try_stream! {
423                self.handle_open_client(client_name).await?;
424                yield serde_json::json!(null);
425            })),
426            RpcRequestKind::CloseClient { client_name } => Some(Box::pin(try_stream! {
427                self.handle_close_client(client_name).await?;
428                yield serde_json::json!(null);
429            })),
430            RpcRequestKind::ClientRpc {
431                client_name,
432                module,
433                method,
434                payload,
435            } => Some(self.handle_client_rpc(client_name, module, method, payload)),
436            RpcRequestKind::ParseInviteCode { invite_code } => Some(Box::pin(try_stream! {
437                let result = self.parse_invite_code(invite_code)?;
438                yield result;
439            })),
440            RpcRequestKind::ParseBolt11Invoice { invoice } => Some(Box::pin(try_stream! {
441                let result = self.parse_bolt11_invoice(invoice)?;
442                yield result;
443            })),
444            RpcRequestKind::PreviewFederation { invite_code } => Some(Box::pin(try_stream! {
445                let result = self.preview_federation(invite_code).await?;
446                yield result;
447            })),
448            RpcRequestKind::CancelRpc { cancel_request_id } => {
449                if let Some(handle) = self.remove_rpc_handle(cancel_request_id) {
450                    handle.abort();
451                }
452                None
453            }
454        }
455    }
456
457    pub fn handle_rpc(
458        self: Arc<Self>,
459        request: RpcRequest,
460        handler: impl RpcResponseHandler + 'static,
461    ) -> HandledRpc<'static> {
462        let request_id = request.request_id;
463
464        let Some(stream) = self.clone().handle_rpc_inner(request) else {
465            return HandledRpc { task: None };
466        };
467
468        let (abort_handle, abort_registration) = AbortHandle::new_pair();
469        self.add_rpc_handle(request_id, abort_handle);
470
471        let task = Box::pin(async move {
472            let mut stream = Abortable::new(stream, abort_registration);
473
474            while let Some(result) = stream.next().await {
475                let response = match result {
476                    Ok(value) => RpcResponse {
477                        request_id,
478                        kind: RpcResponseKind::Data { data: value },
479                    },
480                    Err(e) => RpcResponse {
481                        request_id,
482                        kind: RpcResponseKind::Error {
483                            error: e.to_string(),
484                        },
485                    },
486                };
487                handler.handle_response(response);
488            }
489
490            // Clean up abort handle and send end message
491            let _ = self.remove_rpc_handle(request_id);
492            handler.handle_response(RpcResponse {
493                request_id,
494                kind: if stream.is_aborted() {
495                    RpcResponseKind::Aborted {}
496                } else {
497                    RpcResponseKind::End {}
498                },
499            });
500        });
501
502        HandledRpc { task: Some(task) }
503    }
504
505    /// Retrieve the wallet-level mnemonic words.
506    /// Returns the mnemonic as a vector of words, or None if no mnemonic is
507    /// set.
508    async fn get_mnemonic_words(&self) -> anyhow::Result<Option<Vec<String>>> {
509        let mnemonic = self.get_mnemonic_from_db().await?;
510
511        if let Some(mnemonic) = mnemonic {
512            let words = mnemonic.words().map(|w| w.to_string()).collect();
513            Ok(Some(words))
514        } else {
515            Ok(None)
516        }
517    }
518    /// Set a mnemonic from user-provided words
519    /// Returns an error if a mnemonic is already set
520    async fn set_mnemonic(&self, words: Vec<String>) -> anyhow::Result<()> {
521        let all_words = words.join(" ");
522        let mnemonic =
523            Mnemonic::parse_in_normalized(fedimint_bip39::Language::English, &all_words)?;
524
525        let mut dbtx = self.unified_database.begin_transaction().await;
526
527        if dbtx.get_value(&MnemonicKey).await.is_some() {
528            anyhow::bail!(
529                "Wallet mnemonic already exists. Please clear existing data before setting a new mnemonic."
530            );
531        }
532
533        dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
534            .await;
535
536        dbtx.commit_tx().await;
537
538        Ok(())
539    }
540
541    /// Generate a new random mnemonic and set it
542    /// Returns an error if a mnemonic is already set
543    async fn generate_mnemonic(&self) -> anyhow::Result<Vec<String>> {
544        let mnemonic = Bip39RootSecretStrategy::<12>::random(&mut thread_rng());
545        let words: Vec<String> = mnemonic.words().map(|w| w.to_string()).collect();
546
547        let mut dbtx = self.unified_database.begin_transaction().await;
548
549        if dbtx.get_value(&MnemonicKey).await.is_some() {
550            anyhow::bail!(
551                "Wallet mnemonic already exists. Please clear existing data before generating a new mnemonic."
552            );
553        }
554
555        dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
556            .await;
557
558        dbtx.commit_tx().await;
559
560        Ok(words)
561    }
562
563    /// Derive federation-specific secret from wallet mnemonic
564    fn derive_federation_secret(
565        &self,
566        mnemonic: &Mnemonic,
567        federation_id: &FederationId,
568    ) -> DerivableSecret {
569        let global_root_secret = Bip39RootSecretStrategy::<12>::to_root_secret(mnemonic);
570        let multi_federation_root_secret = global_root_secret.child_key(ChildId(0));
571        let federation_root_secret = multi_federation_root_secret.federation_key(federation_id);
572        let federation_wallet_root_secret = federation_root_secret.child_key(ChildId(0));
573        federation_wallet_root_secret.child_key(ChildId(0))
574    }
575
576    /// Fetch mnemonic from database
577    async fn get_mnemonic_from_db(&self) -> anyhow::Result<Option<Mnemonic>> {
578        let mut dbtx = self.unified_database.begin_transaction_nc().await;
579
580        if let Some(mnemonic_entropy) = dbtx.get_value(&MnemonicKey).await {
581            let mnemonic = Mnemonic::from_entropy(&mnemonic_entropy)?;
582            Ok(Some(mnemonic))
583        } else {
584            Ok(None)
585        }
586    }
587}