fedimint_client_rpc/
lib.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use async_stream::try_stream;
8use fedimint_bip39::{Bip39RootSecretStrategy, Mnemonic};
9use fedimint_client::module::ClientModule;
10use fedimint_client::secret::RootSecretStrategy;
11use fedimint_client::{ClientHandleArc, RootSecret};
12use fedimint_core::config::FederationId;
13use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
14use fedimint_core::encoding::{Decodable, Encodable};
15use fedimint_core::impl_db_record;
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::task::{MaybeSend, MaybeSync};
18use fedimint_core::util::{BoxFuture, BoxStream};
19use fedimint_derive_secret::{ChildId, DerivableSecret};
20use fedimint_ln_client::{LightningClientInit, LightningClientModule};
21use fedimint_meta_client::MetaClientInit;
22use fedimint_mint_client::{MintClientInit, MintClientModule};
23use fedimint_wallet_client::{WalletClientInit, WalletClientModule};
24use futures::StreamExt;
25use futures::future::{AbortHandle, Abortable};
26use lightning_invoice::Bolt11InvoiceDescriptionRef;
27use rand::thread_rng;
28use serde::{Deserialize, Serialize};
29use serde_json::json;
30use tokio::sync::Mutex;
31use tracing::info;
32
/// Key prefixes partitioning the unified database.
///
/// Each variant's discriminant is used as a leading key byte so that the
/// different kinds of records stored in the one database do not overlap.
#[repr(u8)]
#[derive(Clone, Copy, Debug)]
pub enum DbKeyPrefix {
    /// First byte of every per-client database prefix; `client_db` appends
    /// the client name bytes after it.
    ClientDatabase = 0x00,
    /// Prefix of the record holding the wallet mnemonic (see `MnemonicKey`).
    Mnemonic = 0x01,
}
40
/// Database key of the wallet mnemonic record. It is a unit struct, so the
/// database holds at most one such record.
#[derive(Debug, Clone, Encodable, Decodable, Eq, PartialEq, Hash)]
pub struct MnemonicKey;

// Registers `MnemonicKey -> Vec<u8>` under the `Mnemonic` prefix. The value is
// the mnemonic's entropy bytes, as written by `set_mnemonic` and
// `generate_mnemonic` and read back by `get_mnemonic_from_db`.
impl_db_record!(
    key = MnemonicKey,
    value = Vec<u8>,
    db_prefix = DbKeyPrefix::Mnemonic,
);
49
/// A single incoming RPC request.
///
/// The `kind` fields are flattened, so on the wire `request_id` sits next to
/// the `type` tag and the variant's own fields.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RpcRequest {
    /// Caller-chosen id. It is echoed in every `RpcResponse` for this request
    /// and is the id a later `CancelRpc` refers to.
    pub request_id: u64,
    /// What the request asks for.
    #[serde(flatten)]
    pub kind: RpcRequestKind,
}
57
/// The operations understood by `RpcGlobalState::handle_rpc`.
///
/// Serialized as an internally tagged enum: the variant name, in snake_case,
/// goes into the `type` field.
#[derive(Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RpcRequestKind {
    /// Store a user-provided mnemonic; fails if one is already stored.
    SetMnemonic {
        /// The mnemonic words, in order.
        words: Vec<String>,
    },
    /// Generate a random mnemonic, store it and return its words; fails if
    /// one is already stored.
    GenerateMnemonic,
    /// Return the stored mnemonic words, if any.
    GetMnemonic,
    /// Join federation (requires mnemonic to be set first)
    JoinFederation {
        /// Invite code of the federation to join.
        invite_code: String,
        /// Name under which the new client is stored and registered; must be
        /// 36 bytes long (checked by `client_db`).
        client_name: String,
    },
    /// Open a previously joined client from its database (requires the
    /// mnemonic to be set).
    OpenClient {
        /// Name the client was joined under.
        client_name: String,
    },
    /// Unregister an open client and shut it down.
    CloseClient {
        /// Name of the open client.
        client_name: String,
    },
    /// Forward a call to an open client or to one of its modules.
    ClientRpc {
        /// Name of the open client.
        client_name: String,
        /// Target module: `""` for the client's global RPC, or one of
        /// `"ln"`, `"mint"`, `"wallet"`.
        module: String,
        /// Method name, passed through to the target unchanged.
        method: String,
        /// Method arguments, passed through to the target unchanged.
        payload: serde_json::Value,
    },
    /// Abort an in-flight request. This request produces no response of its
    /// own.
    CancelRpc {
        /// `request_id` of the request to abort.
        cancel_request_id: u64,
    },
    /// Parse an invite code and return its `url` and `federation_id`.
    ParseInviteCode {
        /// The invite code to parse.
        invite_code: String,
    },
    /// Parse a BOLT11 invoice and return its `amount`, `expiry` and `memo`.
    ParseBolt11Invoice {
        /// The invoice string to parse.
        invoice: String,
    },
    /// Download the config of the federation behind an invite code without
    /// joining it.
    PreviewFederation {
        /// Invite code of the federation to preview.
        invite_code: String,
    },
}
96
/// One message sent back to the caller for a request.
///
/// A request can produce several responses; the last one is always `End` or
/// `Aborted` (see `RpcGlobalState::handle_rpc`).
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct RpcResponse {
    /// The `request_id` of the request this response belongs to.
    pub request_id: u64,
    /// The payload; flattened next to `request_id` when serialized.
    #[serde(flatten)]
    pub kind: RpcResponseKind,
}
103
/// Payload of an `RpcResponse`, tagged by a snake_case `type` field.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RpcResponseKind {
    /// A successful item yielded by the request's stream.
    Data { data: serde_json::Value },
    /// An error yielded by the request's stream, rendered with `to_string()`.
    Error { error: String },
    /// Final message of a request that was aborted (e.g. via `CancelRpc`).
    Aborted {},
    /// Final message of a request whose stream ran to completion.
    End {},
}
112
/// Sink for the responses of one request.
///
/// `RpcGlobalState::handle_rpc` calls `handle_response` once for every item
/// the request produces and once more with the closing `End` / `Aborted`
/// message.
pub trait RpcResponseHandler: MaybeSend + MaybeSync {
    /// Deliver `response` to whoever issued the request.
    fn handle_response(&self, response: RpcResponse);
}
116
/// Shared state behind the RPC interface: the open clients, the in-flight
/// requests and the database everything is stored in.
pub struct RpcGlobalState {
    /// Currently open clients, keyed by client name. Guarded by the async
    /// (`tokio`) mutex.
    clients: Mutex<HashMap<String, ClientHandleArc>>,
    /// Abort handles of in-flight requests, keyed by request id. Guarded by a
    /// blocking `std` mutex that is only locked for single map operations.
    rpc_handles: std::sync::Mutex<HashMap<u64, AbortHandle>>,
    /// Database holding the wallet mnemonic and, under per-client prefixes,
    /// each client's data.
    unified_database: Database,
}
122
/// Result of `RpcGlobalState::handle_rpc`.
pub struct HandledRpc<'a> {
    /// Future that drives the request and feeds the response handler, or
    /// `None` when the request needs no further work (e.g. `CancelRpc`).
    /// `handle_rpc` only builds this future; running it is left to the caller.
    pub task: Option<BoxFuture<'a, ()>>,
}
126
127impl RpcGlobalState {
128    pub fn new(unified_database: Database) -> Self {
129        Self {
130            clients: Mutex::new(HashMap::new()),
131            rpc_handles: std::sync::Mutex::new(HashMap::new()),
132            unified_database,
133        }
134    }
135
136    async fn add_client(&self, client_name: String, client: ClientHandleArc) {
137        let mut clients = self.clients.lock().await;
138        clients.insert(client_name, client);
139    }
140
141    async fn get_client(&self, client_name: &str) -> Option<ClientHandleArc> {
142        let clients = self.clients.lock().await;
143        clients.get(client_name).cloned()
144    }
145
146    fn add_rpc_handle(&self, request_id: u64, handle: AbortHandle) {
147        let mut handles = self.rpc_handles.lock().unwrap();
148        if handles.insert(request_id, handle).is_some() {
149            tracing::error!("RPC CLIENT ERROR: request id reuse detected");
150        }
151    }
152
153    fn remove_rpc_handle(&self, request_id: u64) -> Option<AbortHandle> {
154        let mut handles = self.rpc_handles.lock().unwrap();
155        handles.remove(&request_id)
156    }
157
158    async fn client_builder(db: Database) -> Result<fedimint_client::ClientBuilder, anyhow::Error> {
159        let mut builder = fedimint_client::Client::builder(db).await?;
160        builder.with_module(MintClientInit);
161        builder.with_module(LightningClientInit::default());
162        builder.with_module(WalletClientInit(None));
163        builder.with_module(MetaClientInit);
164        builder.with_primary_module_kind(fedimint_mint_client::KIND);
165        Ok(builder)
166    }
167
168    /// Get client-specific database with proper prefix
169    async fn client_db(&self, client_name: String) -> anyhow::Result<Database> {
170        assert_eq!(client_name.len(), 36);
171
172        let unified_db = &self.unified_database;
173        let mut client_prefix = vec![DbKeyPrefix::ClientDatabase as u8];
174        client_prefix.extend_from_slice(client_name.as_bytes());
175        Ok(unified_db.with_prefix(client_prefix))
176    }
177
178    /// Handle joining federation using unified database
179    async fn handle_join_federation(
180        &self,
181        invite_code: String,
182        client_name: String,
183    ) -> anyhow::Result<()> {
184        // Check if wallet mnemonic is set
185        let mnemonic = self
186            .get_mnemonic_from_db()
187            .await?
188            .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
189
190        let client_db = self.client_db(client_name.clone()).await?;
191
192        let invite_code = InviteCode::from_str(&invite_code)?;
193        let federation_id = invite_code.federation_id();
194
195        // Derive federation-specific secret from wallet mnemonic
196        let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
197
198        let builder = Self::client_builder(client_db).await?;
199        let client = Arc::new(
200            builder
201                .preview(&invite_code)
202                .await?
203                .join(RootSecret::StandardDoubleDerive(federation_secret))
204                .await?,
205        );
206
207        self.add_client(client_name, client).await;
208        Ok(())
209    }
210
211    async fn handle_open_client(&self, client_name: String) -> anyhow::Result<()> {
212        // Check if wallet mnemonic is set
213        let mnemonic = self
214            .get_mnemonic_from_db()
215            .await?
216            .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
217
218        let client_db = self.client_db(client_name.clone()).await?;
219
220        if !fedimint_client::Client::is_initialized(&client_db).await {
221            anyhow::bail!("client is not initialized for this database");
222        }
223
224        // Get the client config to retrieve the federation ID
225        let client_config = fedimint_client::Client::get_config_from_db(&client_db)
226            .await
227            .context("Client config not found in database")?;
228
229        let federation_id = client_config.calculate_federation_id();
230
231        // Derive federation-specific secret from wallet mnemonic
232        let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
233
234        let builder = Self::client_builder(client_db).await?;
235        let client = Arc::new(
236            builder
237                .open(RootSecret::StandardDoubleDerive(federation_secret))
238                .await?,
239        );
240
241        self.add_client(client_name, client).await;
242        Ok(())
243    }
244
245    async fn handle_close_client(&self, client_name: String) -> anyhow::Result<()> {
246        let mut clients = self.clients.lock().await;
247        let mut client = clients.remove(&client_name).context("client not found")?;
248
249        // RPC calls might have cloned the client Arc before we remove the client.
250        for attempt in 0.. {
251            info!(attempt, "waiting for RPCs to drop the federation object");
252            match Arc::try_unwrap(client) {
253                Ok(client) => {
254                    client.shutdown().await;
255                    break;
256                }
257                Err(client_val) => client = client_val,
258            }
259            fedimint_core::task::sleep(Duration::from_millis(100)).await;
260        }
261        Ok(())
262    }
263
264    fn handle_client_rpc(
265        self: Arc<Self>,
266        client_name: String,
267        module: String,
268        method: String,
269        payload: serde_json::Value,
270    ) -> BoxStream<'static, anyhow::Result<serde_json::Value>> {
271        Box::pin(try_stream! {
272            let client = self
273                .get_client(&client_name)
274                .await
275                .with_context(|| format!("Client not found: {}", client_name))?;
276            match module.as_str() {
277                "" => {
278                    let mut stream = client.handle_global_rpc(method, payload);
279                    while let Some(item) = stream.next().await {
280                        yield item?;
281                    }
282                }
283                "ln" => {
284                    let ln = client.get_first_module::<LightningClientModule>()?.inner();
285                    let mut stream = ln.handle_rpc(method, payload).await;
286                    while let Some(item) = stream.next().await {
287                        yield item?;
288                    }
289                }
290                "mint" => {
291                    let mint = client.get_first_module::<MintClientModule>()?.inner();
292                    let mut stream = mint.handle_rpc(method, payload).await;
293                    while let Some(item) = stream.next().await {
294                        yield item?;
295                    }
296                }
297                "wallet" => {
298                    let wallet = client
299                        .get_first_module::<WalletClientModule>()?
300                        .inner();
301                    let mut stream = wallet.handle_rpc(method, payload).await;
302                    while let Some(item) = stream.next().await {
303                        yield item?;
304                    }
305                }
306                _ => {
307                    Err(anyhow::format_err!("module not found: {module}"))?;
308                },
309            };
310        })
311    }
312
313    fn parse_invite_code(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
314        let invite_code = InviteCode::from_str(&invite_code)?;
315
316        Ok(json!({
317            "url": invite_code.url(),
318            "federation_id": invite_code.federation_id(),
319        }))
320    }
321
322    fn parse_bolt11_invoice(&self, invoice_str: String) -> anyhow::Result<serde_json::Value> {
323        let invoice = lightning_invoice::Bolt11Invoice::from_str(&invoice_str)
324            .map_err(|e| anyhow::anyhow!("Failed to parse Lightning invoice: {}", e))?;
325
326        let amount_msat = invoice.amount_milli_satoshis().unwrap_or(0);
327        let amount_sat = amount_msat as f64 / 1000.0;
328
329        let expiry_seconds = invoice.expiry_time().as_secs();
330
331        // memo
332        let description = match invoice.description() {
333            Bolt11InvoiceDescriptionRef::Direct(desc) => desc.to_string(),
334            Bolt11InvoiceDescriptionRef::Hash(_) => "Description hash only".to_string(),
335        };
336
337        Ok(json!({
338            "amount": amount_sat,
339            "expiry": expiry_seconds,
340            "memo": description,
341        }))
342    }
343
344    async fn preview_federation(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
345        let invite = InviteCode::from_str(&invite_code)?;
346        let client_config = fedimint_api_client::api::net::Connector::default()
347            .download_from_invite_code(&invite)
348            .await?;
349        let json_config = client_config.to_json();
350        let federation_id = client_config.calculate_federation_id();
351
352        Ok(json!({
353            "config": json_config,
354            "federation_id": federation_id.to_string(),
355        }))
356    }
357
358    fn handle_rpc_inner(
359        self: Arc<Self>,
360        request: RpcRequest,
361    ) -> Option<BoxStream<'static, anyhow::Result<serde_json::Value>>> {
362        match request.kind {
363            RpcRequestKind::SetMnemonic { words } => Some(Box::pin(try_stream! {
364                self.set_mnemonic(words).await?;
365                yield serde_json::json!({ "success": true });
366            })),
367            RpcRequestKind::GenerateMnemonic => Some(Box::pin(try_stream! {
368                let words = self.generate_mnemonic().await?;
369                yield serde_json::json!({ "mnemonic": words });
370            })),
371            RpcRequestKind::GetMnemonic => Some(Box::pin(try_stream! {
372                let words = self.get_mnemonic_words().await?;
373                yield serde_json::json!({ "mnemonic": words });
374            })),
375            RpcRequestKind::JoinFederation {
376                invite_code,
377                client_name,
378            } => Some(Box::pin(try_stream! {
379                self.handle_join_federation(invite_code, client_name)
380                    .await?;
381                yield serde_json::json!(null);
382            })),
383            RpcRequestKind::OpenClient { client_name } => Some(Box::pin(try_stream! {
384                self.handle_open_client(client_name).await?;
385                yield serde_json::json!(null);
386            })),
387            RpcRequestKind::CloseClient { client_name } => Some(Box::pin(try_stream! {
388                self.handle_close_client(client_name).await?;
389                yield serde_json::json!(null);
390            })),
391            RpcRequestKind::ClientRpc {
392                client_name,
393                module,
394                method,
395                payload,
396            } => Some(self.handle_client_rpc(client_name, module, method, payload)),
397            RpcRequestKind::ParseInviteCode { invite_code } => Some(Box::pin(try_stream! {
398                let result = self.parse_invite_code(invite_code)?;
399                yield result;
400            })),
401            RpcRequestKind::ParseBolt11Invoice { invoice } => Some(Box::pin(try_stream! {
402                let result = self.parse_bolt11_invoice(invoice)?;
403                yield result;
404            })),
405            RpcRequestKind::PreviewFederation { invite_code } => Some(Box::pin(try_stream! {
406                let result = self.preview_federation(invite_code).await?;
407                yield result;
408            })),
409            RpcRequestKind::CancelRpc { cancel_request_id } => {
410                if let Some(handle) = self.remove_rpc_handle(cancel_request_id) {
411                    handle.abort();
412                }
413                None
414            }
415        }
416    }
417
418    pub fn handle_rpc(
419        self: Arc<Self>,
420        request: RpcRequest,
421        handler: impl RpcResponseHandler + 'static,
422    ) -> HandledRpc<'static> {
423        let request_id = request.request_id;
424
425        let Some(stream) = self.clone().handle_rpc_inner(request) else {
426            return HandledRpc { task: None };
427        };
428
429        let (abort_handle, abort_registration) = AbortHandle::new_pair();
430        self.add_rpc_handle(request_id, abort_handle);
431
432        let task = Box::pin(async move {
433            let mut stream = Abortable::new(stream, abort_registration);
434
435            while let Some(result) = stream.next().await {
436                let response = match result {
437                    Ok(value) => RpcResponse {
438                        request_id,
439                        kind: RpcResponseKind::Data { data: value },
440                    },
441                    Err(e) => RpcResponse {
442                        request_id,
443                        kind: RpcResponseKind::Error {
444                            error: e.to_string(),
445                        },
446                    },
447                };
448                handler.handle_response(response);
449            }
450
451            // Clean up abort handle and send end message
452            let _ = self.remove_rpc_handle(request_id);
453            handler.handle_response(RpcResponse {
454                request_id,
455                kind: if stream.is_aborted() {
456                    RpcResponseKind::Aborted {}
457                } else {
458                    RpcResponseKind::End {}
459                },
460            });
461        });
462
463        HandledRpc { task: Some(task) }
464    }
465
466    /// Retrieve the wallet-level mnemonic words.
467    /// Returns the mnemonic as a vector of words, or None if no mnemonic is
468    /// set.
469    async fn get_mnemonic_words(&self) -> anyhow::Result<Option<Vec<String>>> {
470        let mnemonic = self.get_mnemonic_from_db().await?;
471
472        if let Some(mnemonic) = mnemonic {
473            let words = mnemonic.words().map(|w| w.to_string()).collect();
474            Ok(Some(words))
475        } else {
476            Ok(None)
477        }
478    }
479    /// Set a mnemonic from user-provided words
480    /// Returns an error if a mnemonic is already set
481    async fn set_mnemonic(&self, words: Vec<String>) -> anyhow::Result<()> {
482        let all_words = words.join(" ");
483        let mnemonic =
484            Mnemonic::parse_in_normalized(fedimint_bip39::Language::English, &all_words)?;
485
486        let mut dbtx = self.unified_database.begin_transaction().await;
487
488        if dbtx.get_value(&MnemonicKey).await.is_some() {
489            anyhow::bail!(
490                "Wallet mnemonic already exists. Please clear existing data before setting a new mnemonic."
491            );
492        }
493
494        dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
495            .await;
496
497        dbtx.commit_tx().await;
498
499        Ok(())
500    }
501
502    /// Generate a new random mnemonic and set it
503    /// Returns an error if a mnemonic is already set
504    async fn generate_mnemonic(&self) -> anyhow::Result<Vec<String>> {
505        let mnemonic = Bip39RootSecretStrategy::<12>::random(&mut thread_rng());
506        let words: Vec<String> = mnemonic.words().map(|w| w.to_string()).collect();
507
508        let mut dbtx = self.unified_database.begin_transaction().await;
509
510        if dbtx.get_value(&MnemonicKey).await.is_some() {
511            anyhow::bail!(
512                "Wallet mnemonic already exists. Please clear existing data before generating a new mnemonic."
513            );
514        }
515
516        dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
517            .await;
518
519        dbtx.commit_tx().await;
520
521        Ok(words)
522    }
523
524    /// Derive federation-specific secret from wallet mnemonic
525    fn derive_federation_secret(
526        &self,
527        mnemonic: &Mnemonic,
528        federation_id: &FederationId,
529    ) -> DerivableSecret {
530        let global_root_secret = Bip39RootSecretStrategy::<12>::to_root_secret(mnemonic);
531        let multi_federation_root_secret = global_root_secret.child_key(ChildId(0));
532        let federation_root_secret = multi_federation_root_secret.federation_key(federation_id);
533        let federation_wallet_root_secret = federation_root_secret.child_key(ChildId(0));
534        federation_wallet_root_secret.child_key(ChildId(0))
535    }
536
537    /// Fetch mnemonic from database
538    async fn get_mnemonic_from_db(&self) -> anyhow::Result<Option<Mnemonic>> {
539        let mut dbtx = self.unified_database.begin_transaction_nc().await;
540
541        if let Some(mnemonic_entropy) = dbtx.get_value(&MnemonicKey).await {
542            let mnemonic = Mnemonic::from_entropy(&mnemonic_entropy)?;
543            Ok(Some(mnemonic))
544        } else {
545            Ok(None)
546        }
547    }
548}