fedimint_client_rpc/
lib.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use async_stream::try_stream;
8use fedimint_bip39::{Bip39RootSecretStrategy, Mnemonic};
9use fedimint_client::module::ClientModule;
10use fedimint_client::secret::RootSecretStrategy;
11use fedimint_client::{ClientHandleArc, RootSecret};
12use fedimint_core::config::FederationId;
13use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
14use fedimint_core::encoding::{Decodable, Encodable};
15use fedimint_core::impl_db_record;
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::task::{MaybeSend, MaybeSync};
18use fedimint_core::util::{BoxFuture, BoxStream};
19use fedimint_derive_secret::{ChildId, DerivableSecret};
20use fedimint_ln_client::{LightningClientInit, LightningClientModule};
21use fedimint_meta_client::MetaClientInit;
22use fedimint_mint_client::{MintClientInit, MintClientModule};
23use fedimint_wallet_client::{WalletClientInit, WalletClientModule};
24use futures::StreamExt;
25use futures::future::{AbortHandle, Abortable};
26use lightning_invoice::Bolt11InvoiceDescriptionRef;
27use rand::thread_rng;
28use serde::{Deserialize, Serialize};
29use serde_json::json;
30use tokio::sync::Mutex;
31use tracing::info;
32
/// Key prefixes for the unified database.
///
/// Each variant's discriminant is the first byte of the keys stored under it.
#[repr(u8)]
#[derive(Clone, Copy, Debug)]
pub enum DbKeyPrefix {
    /// Leading byte of every per-client database prefix; the client name
    /// bytes are appended after it.
    ClientDatabase = 0x00,
    /// Prefix of the record holding the wallet mnemonic entropy.
    Mnemonic = 0x01,
}
40
/// Database key of the single wallet-level mnemonic record.
#[derive(Debug, Clone, Encodable, Decodable, Eq, PartialEq, Hash)]
pub struct MnemonicKey;

// Maps `MnemonicKey` to the mnemonic's raw entropy bytes, stored under the
// `DbKeyPrefix::Mnemonic` prefix.
impl_db_record!(
    key = MnemonicKey,
    value = Vec<u8>,
    db_prefix = DbKeyPrefix::Mnemonic,
);
49
/// A single RPC request.
///
/// The fields of `kind` are flattened into the same serialized object as
/// `request_id`.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RpcRequest {
    /// Caller-chosen identifier; it is echoed in every response to this
    /// request and is the id a `CancelRpc` request refers to.
    pub request_id: u64,
    /// What the request asks for.
    #[serde(flatten)]
    pub kind: RpcRequestKind,
}
57
/// The operations that can be requested over RPC.
///
/// Serialized as an internally tagged enum: the variant name, in snake case,
/// is stored in the `type` field.
#[derive(Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RpcRequestKind {
    /// Store a user-provided mnemonic (fails if one is already stored).
    SetMnemonic {
        words: Vec<String>,
    },
    /// Generate a new random mnemonic, store it and return its words.
    GenerateMnemonic,
    /// Return the stored mnemonic words, or null if none is stored.
    GetMnemonic,
    /// Join federation (requires mnemonic to be set first)
    JoinFederation {
        invite_code: String,
        /// Recover instead of join even if no backup was found.
        force_recover: bool,
        client_name: String,
    },
    /// Open an already initialized client from its database.
    OpenClient {
        client_name: String,
    },
    /// Remove a client and shut it down once no RPC uses it anymore.
    CloseClient {
        client_name: String,
    },
    /// Forward a call to a client; an empty `module` selects the client's
    /// global RPC, otherwise `ln`, `mint` and `wallet` are recognized.
    ClientRpc {
        client_name: String,
        module: String,
        method: String,
        payload: serde_json::Value,
    },
    /// Abort the in-flight request with the given id, if it is still running.
    CancelRpc {
        cancel_request_id: u64,
    },
    /// Parse an invite code and return its url and federation id.
    ParseInviteCode {
        invite_code: String,
    },
    /// Parse a BOLT11 invoice and return its amount, expiry and memo.
    ParseBolt11Invoice {
        invoice: String,
    },
    /// Download the federation config referenced by an invite code without
    /// joining.
    PreviewFederation {
        invite_code: String,
    },
}
97
/// A single message sent back for a request.
///
/// The fields of `kind` are flattened into the same serialized object as
/// `request_id`.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct RpcResponse {
    /// The `request_id` of the request this message belongs to.
    pub request_id: u64,
    /// Payload of the message.
    #[serde(flatten)]
    pub kind: RpcResponseKind,
}
104
/// The kinds of message a request can produce.
///
/// Serialized as an internally tagged enum: the variant name, in snake case,
/// is stored in the `type` field.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RpcResponseKind {
    /// One value produced by the request's result stream.
    Data { data: serde_json::Value },
    /// An error produced by the request's result stream, rendered as text.
    Error { error: String },
    /// Final message of a request whose stream was aborted.
    Aborted {},
    /// Final message of a request whose stream ran to completion.
    End {},
}
113
/// Sink for the responses produced while handling a request.
pub trait RpcResponseHandler: MaybeSend + MaybeSync {
    /// Called once for every response message of a request, including the
    /// final `End`/`Aborted` message.
    fn handle_response(&self, response: RpcResponse);
}
117
/// Shared state behind the RPC interface: the open clients, the in-flight
/// requests and the database everything is stored in.
pub struct RpcGlobalState {
    /// Currently open clients, keyed by client name.
    clients: Mutex<HashMap<String, ClientHandleArc>>,
    /// Abort handles of in-flight requests, keyed by request id.
    rpc_handles: std::sync::Mutex<HashMap<u64, AbortHandle>>,
    /// Database holding the mnemonic record and, under per-client prefixes,
    /// every client's data.
    unified_database: Database,
}
123
/// Result of dispatching a request with `RpcGlobalState::handle_rpc`.
pub struct HandledRpc<'a> {
    /// Future that produces the request's responses, or `None` when the
    /// request was fully handled during dispatch (as `CancelRpc` is).
    /// NOTE(review): nothing in this file polls the future — presumably the
    /// caller is expected to drive it; confirm against the call sites.
    pub task: Option<BoxFuture<'a, ()>>,
}
127
128impl RpcGlobalState {
129    pub fn new(unified_database: Database) -> Self {
130        Self {
131            clients: Mutex::new(HashMap::new()),
132            rpc_handles: std::sync::Mutex::new(HashMap::new()),
133            unified_database,
134        }
135    }
136
137    async fn add_client(&self, client_name: String, client: ClientHandleArc) {
138        let mut clients = self.clients.lock().await;
139        clients.insert(client_name, client);
140    }
141
142    async fn get_client(&self, client_name: &str) -> Option<ClientHandleArc> {
143        let clients = self.clients.lock().await;
144        clients.get(client_name).cloned()
145    }
146
147    fn add_rpc_handle(&self, request_id: u64, handle: AbortHandle) {
148        let mut handles = self.rpc_handles.lock().unwrap();
149        if handles.insert(request_id, handle).is_some() {
150            tracing::error!("RPC CLIENT ERROR: request id reuse detected");
151        }
152    }
153
154    fn remove_rpc_handle(&self, request_id: u64) -> Option<AbortHandle> {
155        let mut handles = self.rpc_handles.lock().unwrap();
156        handles.remove(&request_id)
157    }
158
159    async fn client_builder() -> Result<fedimint_client::ClientBuilder, anyhow::Error> {
160        let mut builder = fedimint_client::Client::builder().await?;
161        builder.with_module(MintClientInit);
162        builder.with_module(LightningClientInit::default());
163        builder.with_module(WalletClientInit(None));
164        builder.with_module(MetaClientInit);
165        builder.with_primary_module_kind(fedimint_mint_client::KIND);
166        Ok(builder)
167    }
168
169    /// Get client-specific database with proper prefix
170    async fn client_db(&self, client_name: String) -> anyhow::Result<Database> {
171        assert_eq!(client_name.len(), 36);
172
173        let unified_db = &self.unified_database;
174        let mut client_prefix = vec![DbKeyPrefix::ClientDatabase as u8];
175        client_prefix.extend_from_slice(client_name.as_bytes());
176        Ok(unified_db.with_prefix(client_prefix))
177    }
178
179    /// Handle joining federation using unified database
180    async fn handle_join_federation(
181        &self,
182        invite_code: String,
183        client_name: String,
184        force_recover: bool,
185    ) -> anyhow::Result<()> {
186        // Check if wallet mnemonic is set
187        let mnemonic = self
188            .get_mnemonic_from_db()
189            .await?
190            .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
191
192        let client_db = self.client_db(client_name.clone()).await?;
193
194        let invite_code = InviteCode::from_str(&invite_code)?;
195        let federation_id = invite_code.federation_id();
196
197        // Derive federation-specific secret from wallet mnemonic
198        let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
199
200        let builder = Self::client_builder().await?;
201        let preview = builder.preview(&invite_code).await?;
202
203        // Check if backup exists
204        let backup = preview
205            .download_backup_from_federation(RootSecret::StandardDoubleDerive(
206                federation_secret.clone(),
207            ))
208            .await?;
209
210        let client = if force_recover || backup.is_some() {
211            Arc::new(
212                preview
213                    .recover(
214                        client_db,
215                        RootSecret::StandardDoubleDerive(federation_secret),
216                        backup,
217                    )
218                    .await?,
219            )
220        } else {
221            Arc::new(
222                preview
223                    .join(
224                        client_db,
225                        RootSecret::StandardDoubleDerive(federation_secret),
226                    )
227                    .await?,
228            )
229        };
230
231        self.add_client(client_name, client).await;
232        Ok(())
233    }
234
235    async fn handle_open_client(&self, client_name: String) -> anyhow::Result<()> {
236        // Check if wallet mnemonic is set
237        let mnemonic = self
238            .get_mnemonic_from_db()
239            .await?
240            .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
241
242        let client_db = self.client_db(client_name.clone()).await?;
243
244        if !fedimint_client::Client::is_initialized(&client_db).await {
245            anyhow::bail!("client is not initialized for this database");
246        }
247
248        // Get the client config to retrieve the federation ID
249        let client_config = fedimint_client::Client::get_config_from_db(&client_db)
250            .await
251            .context("Client config not found in database")?;
252
253        let federation_id = client_config.calculate_federation_id();
254
255        // Derive federation-specific secret from wallet mnemonic
256        let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
257
258        let builder = Self::client_builder().await?;
259        let client = Arc::new(
260            builder
261                .open(
262                    client_db,
263                    RootSecret::StandardDoubleDerive(federation_secret),
264                )
265                .await?,
266        );
267
268        self.add_client(client_name, client).await;
269        Ok(())
270    }
271
272    async fn handle_close_client(&self, client_name: String) -> anyhow::Result<()> {
273        let mut clients = self.clients.lock().await;
274        let mut client = clients.remove(&client_name).context("client not found")?;
275
276        // RPC calls might have cloned the client Arc before we remove the client.
277        for attempt in 0.. {
278            info!(attempt, "waiting for RPCs to drop the federation object");
279            match Arc::try_unwrap(client) {
280                Ok(client) => {
281                    client.shutdown().await;
282                    break;
283                }
284                Err(client_val) => client = client_val,
285            }
286            fedimint_core::task::sleep(Duration::from_millis(100)).await;
287        }
288        Ok(())
289    }
290
291    fn handle_client_rpc(
292        self: Arc<Self>,
293        client_name: String,
294        module: String,
295        method: String,
296        payload: serde_json::Value,
297    ) -> BoxStream<'static, anyhow::Result<serde_json::Value>> {
298        Box::pin(try_stream! {
299            let client = self
300                .get_client(&client_name)
301                .await
302                .with_context(|| format!("Client not found: {client_name}"))?;
303            match module.as_str() {
304                "" => {
305                    let mut stream = client.handle_global_rpc(method, payload);
306                    while let Some(item) = stream.next().await {
307                        yield item?;
308                    }
309                }
310                "ln" => {
311                    let ln = client.get_first_module::<LightningClientModule>()?.inner();
312                    let mut stream = ln.handle_rpc(method, payload).await;
313                    while let Some(item) = stream.next().await {
314                        yield item?;
315                    }
316                }
317                "mint" => {
318                    let mint = client.get_first_module::<MintClientModule>()?.inner();
319                    let mut stream = mint.handle_rpc(method, payload).await;
320                    while let Some(item) = stream.next().await {
321                        yield item?;
322                    }
323                }
324                "wallet" => {
325                    let wallet = client
326                        .get_first_module::<WalletClientModule>()?
327                        .inner();
328                    let mut stream = wallet.handle_rpc(method, payload).await;
329                    while let Some(item) = stream.next().await {
330                        yield item?;
331                    }
332                }
333                _ => {
334                    Err(anyhow::format_err!("module not found: {module}"))?;
335                },
336            };
337        })
338    }
339
340    fn parse_invite_code(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
341        let invite_code = InviteCode::from_str(&invite_code)?;
342
343        Ok(json!({
344            "url": invite_code.url(),
345            "federation_id": invite_code.federation_id(),
346        }))
347    }
348
349    fn parse_bolt11_invoice(&self, invoice_str: String) -> anyhow::Result<serde_json::Value> {
350        let invoice = lightning_invoice::Bolt11Invoice::from_str(&invoice_str)
351            .map_err(|e| anyhow::anyhow!("Failed to parse Lightning invoice: {}", e))?;
352
353        let amount_msat = invoice.amount_milli_satoshis().unwrap_or(0);
354        let amount_sat = amount_msat as f64 / 1000.0;
355
356        let expiry_seconds = invoice.expiry_time().as_secs();
357
358        // memo
359        let description = match invoice.description() {
360            Bolt11InvoiceDescriptionRef::Direct(desc) => desc.to_string(),
361            Bolt11InvoiceDescriptionRef::Hash(_) => "Description hash only".to_string(),
362        };
363
364        Ok(json!({
365            "amount": amount_sat,
366            "expiry": expiry_seconds,
367            "memo": description,
368        }))
369    }
370
371    async fn preview_federation(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
372        let invite = InviteCode::from_str(&invite_code)?;
373        let (client_config, _) = fedimint_api_client::api::net::Connector::default()
374            .download_from_invite_code(
375                &invite, /* TODO: how should rpc clients control this? */ false, false,
376            )
377            .await?;
378        let json_config = client_config.to_json();
379        let federation_id = client_config.calculate_federation_id();
380
381        Ok(json!({
382            "config": json_config,
383            "federation_id": federation_id.to_string(),
384        }))
385    }
386
387    fn handle_rpc_inner(
388        self: Arc<Self>,
389        request: RpcRequest,
390    ) -> Option<BoxStream<'static, anyhow::Result<serde_json::Value>>> {
391        match request.kind {
392            RpcRequestKind::SetMnemonic { words } => Some(Box::pin(try_stream! {
393                self.set_mnemonic(words).await?;
394                yield serde_json::json!({ "success": true });
395            })),
396            RpcRequestKind::GenerateMnemonic => Some(Box::pin(try_stream! {
397                let words = self.generate_mnemonic().await?;
398                yield serde_json::json!({ "mnemonic": words });
399            })),
400            RpcRequestKind::GetMnemonic => Some(Box::pin(try_stream! {
401                let words = self.get_mnemonic_words().await?;
402                yield serde_json::json!({ "mnemonic": words });
403            })),
404            RpcRequestKind::JoinFederation {
405                invite_code,
406                client_name,
407                force_recover,
408            } => Some(Box::pin(try_stream! {
409                self.handle_join_federation(invite_code, client_name, force_recover)
410                    .await?;
411                yield serde_json::json!(null);
412            })),
413            RpcRequestKind::OpenClient { client_name } => Some(Box::pin(try_stream! {
414                self.handle_open_client(client_name).await?;
415                yield serde_json::json!(null);
416            })),
417            RpcRequestKind::CloseClient { client_name } => Some(Box::pin(try_stream! {
418                self.handle_close_client(client_name).await?;
419                yield serde_json::json!(null);
420            })),
421            RpcRequestKind::ClientRpc {
422                client_name,
423                module,
424                method,
425                payload,
426            } => Some(self.handle_client_rpc(client_name, module, method, payload)),
427            RpcRequestKind::ParseInviteCode { invite_code } => Some(Box::pin(try_stream! {
428                let result = self.parse_invite_code(invite_code)?;
429                yield result;
430            })),
431            RpcRequestKind::ParseBolt11Invoice { invoice } => Some(Box::pin(try_stream! {
432                let result = self.parse_bolt11_invoice(invoice)?;
433                yield result;
434            })),
435            RpcRequestKind::PreviewFederation { invite_code } => Some(Box::pin(try_stream! {
436                let result = self.preview_federation(invite_code).await?;
437                yield result;
438            })),
439            RpcRequestKind::CancelRpc { cancel_request_id } => {
440                if let Some(handle) = self.remove_rpc_handle(cancel_request_id) {
441                    handle.abort();
442                }
443                None
444            }
445        }
446    }
447
448    pub fn handle_rpc(
449        self: Arc<Self>,
450        request: RpcRequest,
451        handler: impl RpcResponseHandler + 'static,
452    ) -> HandledRpc<'static> {
453        let request_id = request.request_id;
454
455        let Some(stream) = self.clone().handle_rpc_inner(request) else {
456            return HandledRpc { task: None };
457        };
458
459        let (abort_handle, abort_registration) = AbortHandle::new_pair();
460        self.add_rpc_handle(request_id, abort_handle);
461
462        let task = Box::pin(async move {
463            let mut stream = Abortable::new(stream, abort_registration);
464
465            while let Some(result) = stream.next().await {
466                let response = match result {
467                    Ok(value) => RpcResponse {
468                        request_id,
469                        kind: RpcResponseKind::Data { data: value },
470                    },
471                    Err(e) => RpcResponse {
472                        request_id,
473                        kind: RpcResponseKind::Error {
474                            error: e.to_string(),
475                        },
476                    },
477                };
478                handler.handle_response(response);
479            }
480
481            // Clean up abort handle and send end message
482            let _ = self.remove_rpc_handle(request_id);
483            handler.handle_response(RpcResponse {
484                request_id,
485                kind: if stream.is_aborted() {
486                    RpcResponseKind::Aborted {}
487                } else {
488                    RpcResponseKind::End {}
489                },
490            });
491        });
492
493        HandledRpc { task: Some(task) }
494    }
495
496    /// Retrieve the wallet-level mnemonic words.
497    /// Returns the mnemonic as a vector of words, or None if no mnemonic is
498    /// set.
499    async fn get_mnemonic_words(&self) -> anyhow::Result<Option<Vec<String>>> {
500        let mnemonic = self.get_mnemonic_from_db().await?;
501
502        if let Some(mnemonic) = mnemonic {
503            let words = mnemonic.words().map(|w| w.to_string()).collect();
504            Ok(Some(words))
505        } else {
506            Ok(None)
507        }
508    }
509    /// Set a mnemonic from user-provided words
510    /// Returns an error if a mnemonic is already set
511    async fn set_mnemonic(&self, words: Vec<String>) -> anyhow::Result<()> {
512        let all_words = words.join(" ");
513        let mnemonic =
514            Mnemonic::parse_in_normalized(fedimint_bip39::Language::English, &all_words)?;
515
516        let mut dbtx = self.unified_database.begin_transaction().await;
517
518        if dbtx.get_value(&MnemonicKey).await.is_some() {
519            anyhow::bail!(
520                "Wallet mnemonic already exists. Please clear existing data before setting a new mnemonic."
521            );
522        }
523
524        dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
525            .await;
526
527        dbtx.commit_tx().await;
528
529        Ok(())
530    }
531
532    /// Generate a new random mnemonic and set it
533    /// Returns an error if a mnemonic is already set
534    async fn generate_mnemonic(&self) -> anyhow::Result<Vec<String>> {
535        let mnemonic = Bip39RootSecretStrategy::<12>::random(&mut thread_rng());
536        let words: Vec<String> = mnemonic.words().map(|w| w.to_string()).collect();
537
538        let mut dbtx = self.unified_database.begin_transaction().await;
539
540        if dbtx.get_value(&MnemonicKey).await.is_some() {
541            anyhow::bail!(
542                "Wallet mnemonic already exists. Please clear existing data before generating a new mnemonic."
543            );
544        }
545
546        dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
547            .await;
548
549        dbtx.commit_tx().await;
550
551        Ok(words)
552    }
553
554    /// Derive federation-specific secret from wallet mnemonic
555    fn derive_federation_secret(
556        &self,
557        mnemonic: &Mnemonic,
558        federation_id: &FederationId,
559    ) -> DerivableSecret {
560        let global_root_secret = Bip39RootSecretStrategy::<12>::to_root_secret(mnemonic);
561        let multi_federation_root_secret = global_root_secret.child_key(ChildId(0));
562        let federation_root_secret = multi_federation_root_secret.federation_key(federation_id);
563        let federation_wallet_root_secret = federation_root_secret.child_key(ChildId(0));
564        federation_wallet_root_secret.child_key(ChildId(0))
565    }
566
567    /// Fetch mnemonic from database
568    async fn get_mnemonic_from_db(&self) -> anyhow::Result<Option<Mnemonic>> {
569        let mut dbtx = self.unified_database.begin_transaction_nc().await;
570
571        if let Some(mnemonic_entropy) = dbtx.get_value(&MnemonicKey).await {
572            let mnemonic = Mnemonic::from_entropy(&mnemonic_entropy)?;
573            Ok(Some(mnemonic))
574        } else {
575            Ok(None)
576        }
577    }
578}