fedimint_client_rpc/
lib.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use async_stream::try_stream;
8use fedimint_bip39::{Bip39RootSecretStrategy, Mnemonic};
9use fedimint_client::module::ClientModule;
10use fedimint_client::secret::RootSecretStrategy;
11use fedimint_client::{ClientHandleArc, ClientPreview, RootSecret};
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::config::{FederationId, FederationIdPrefix};
14use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
15use fedimint_core::encoding::{Decodable, Encodable};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::task::{MaybeSend, MaybeSync};
18use fedimint_core::util::{BoxFuture, BoxStream};
19use fedimint_core::{Amount, TieredCounts, impl_db_record};
20use fedimint_derive_secret::{ChildId, DerivableSecret};
21use fedimint_ln_client::{LightningClientInit, LightningClientModule};
22use fedimint_meta_client::MetaClientInit;
23use fedimint_mint_client::{MintClientInit, MintClientModule, OOBNotes};
24use fedimint_wallet_client::{WalletClientInit, WalletClientModule};
25use futures::StreamExt;
26use futures::future::{AbortHandle, Abortable};
27use lightning_invoice::Bolt11InvoiceDescriptionRef;
28use rand::thread_rng;
29use serde::{Deserialize, Serialize};
30use serde_json::json;
31use tokio::sync::Mutex;
32use tracing::info;
33
/// Key prefixes for the unified database.
///
/// The `u8` discriminant of each variant is used as the leading key byte that
/// separates the different kinds of records stored in the same database.
#[repr(u8)]
#[derive(Clone, Copy, Debug)]
pub enum DbKeyPrefix {
    /// Leading byte of every per-client database prefix (followed by the
    /// client name bytes, see `RpcGlobalState::client_db`).
    ClientDatabase = 0x00,
    /// Prefix of the record holding the wallet mnemonic (see `MnemonicKey`).
    Mnemonic = 0x01,
}
41
/// Database key of the single wallet-level mnemonic record.
///
/// The stored value is the mnemonic's entropy bytes, as written by
/// `set_mnemonic` / `generate_mnemonic` and read back by
/// `get_mnemonic_from_db`.
#[derive(Debug, Clone, Encodable, Decodable, Eq, PartialEq, Hash)]
pub struct MnemonicKey;

// Registers `MnemonicKey -> Vec<u8>` as a database record under the
// `DbKeyPrefix::Mnemonic` prefix.
impl_db_record!(
    key = MnemonicKey,
    value = Vec<u8>,
    db_prefix = DbKeyPrefix::Mnemonic,
);
50
/// Parsed details from an OOB note.
///
/// Produced by [`parse_oob_notes`] and serialized as the response payload of
/// the `ParseOobNotes` request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParsedNoteDetails {
    /// Total amount of all notes in the OOB notes
    pub total_amount: Amount,
    /// Federation ID prefix (always present)
    pub federation_id_prefix: FederationIdPrefix,
    /// Full federation ID (if invite is present)
    pub federation_id: Option<FederationId>,
    /// Invite code to join the federation (if present)
    pub invite_code: Option<InviteCode>,
    /// Number of notes per denomination
    pub note_counts: TieredCounts,
}
65
/// A single incoming RPC request.
///
/// Because `kind` is flattened, the serialized form carries `request_id`, the
/// `type` tag and the variant's fields side by side in one object.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RpcRequest {
    /// Caller-chosen identifier; echoed in every [`RpcResponse`] for this
    /// request and used as the target of `CancelRpc`.
    pub request_id: u64,
    /// The operation to perform together with its arguments.
    #[serde(flatten)]
    pub kind: RpcRequestKind,
}
73
/// The operations understood by [`RpcGlobalState::handle_rpc`].
///
/// Serialized as an internally tagged enum: the variant is selected by a
/// `type` field holding the snake_case variant name.
#[derive(Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RpcRequestKind {
    /// Store a user-provided mnemonic; fails if one is already stored.
    SetMnemonic {
        words: Vec<String>,
    },
    /// Generate a random mnemonic, store it and return its words; fails if
    /// one is already stored.
    GenerateMnemonic,
    /// Return the stored mnemonic words, or null if none is set.
    GetMnemonic,
    /// Join federation (requires mnemonic to be set first)
    JoinFederation {
        invite_code: String,
        force_recover: bool,
        client_name: String,
    },
    /// Open an already initialized client from its database and register it
    /// under `client_name`.
    OpenClient {
        client_name: String,
    },
    /// Unregister the named client and shut it down.
    CloseClient {
        client_name: String,
    },
    /// Forward `method`/`payload` to the named client. `module` selects the
    /// target: `""` for the client's global RPC, or `"ln"`, `"mint"`,
    /// `"wallet"` for the respective module.
    ClientRpc {
        client_name: String,
        module: String,
        method: String,
        payload: serde_json::Value,
    },
    /// Abort the in-flight request with id `cancel_request_id`. This request
    /// itself produces no responses.
    CancelRpc {
        cancel_request_id: u64,
    },
    /// Parse an invite code and return its URL and federation id.
    ParseInviteCode {
        invite_code: String,
    },
    /// Parse a BOLT11 invoice and return its amount, expiry and memo.
    ParseBolt11Invoice {
        invoice: String,
    },
    /// Fetch a preview of the federation behind `invite_code` and cache it
    /// for a following `JoinFederation`.
    PreviewFederation {
        invite_code: String,
    },
    /// Parse an OOB notes string into [`ParsedNoteDetails`].
    ParseOobNotes {
        oob_notes: String,
    },
}
116
/// One message sent back to the caller for a given request.
///
/// A request may produce several responses; `kind` is flattened, so its
/// `type` tag and fields sit next to `request_id` in the serialized object.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct RpcResponse {
    /// Id of the [`RpcRequest`] this response belongs to.
    pub request_id: u64,
    /// Payload of this response.
    #[serde(flatten)]
    pub kind: RpcResponseKind,
}
123
/// Payload variants of an [`RpcResponse`], tagged by a snake_case `type`
/// field.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RpcResponseKind {
    /// A value yielded by the request's result stream.
    Data { data: serde_json::Value },
    /// An error yielded by the request's result stream, rendered as text.
    Error { error: String },
    /// Final message of a request whose stream was aborted.
    Aborted {},
    /// Final message of a request whose stream ran to completion.
    End {},
}
132
/// Sink for the responses produced while a request is being processed.
pub trait RpcResponseHandler: MaybeSend + MaybeSync {
    /// Called once for every response of a request, including the final
    /// `End` / `Aborted` message.
    fn handle_response(&self, response: RpcResponse);
}
136
/// State shared by all RPC requests: open clients, in-flight request handles,
/// the unified database and the federation preview cache.
pub struct RpcGlobalState {
    /// Endpoints used for all global-state functionality
    connectors: ConnectorRegistry,
    /// Currently open clients, keyed by client name.
    clients: Mutex<HashMap<String, ClientHandleArc>>,
    /// Abort handles of in-flight requests, keyed by request id.
    rpc_handles: std::sync::Mutex<HashMap<u64, AbortHandle>>,
    /// Database holding the wallet mnemonic and, under per-client prefixes,
    /// the data of every client.
    unified_database: Database,
    /// Preview stored by the last `preview_federation` call; taken (and thus
    /// cleared) by the next `handle_join_federation`.
    preview_cache: std::sync::Mutex<Option<ClientPreview>>,
}
145
/// Result of [`RpcGlobalState::handle_rpc`].
pub struct HandledRpc<'a> {
    /// Future that drives the request and delivers its responses to the
    /// handler; `None` when the request produces no response stream (e.g.
    /// `CancelRpc`).
    pub task: Option<BoxFuture<'a, ()>>,
}
149
150impl RpcGlobalState {
151    pub fn new(connectors: ConnectorRegistry, unified_database: Database) -> Self {
152        Self {
153            connectors,
154            clients: Mutex::new(HashMap::new()),
155            rpc_handles: std::sync::Mutex::new(HashMap::new()),
156            unified_database,
157            preview_cache: std::sync::Mutex::new(None),
158        }
159    }
160
161    async fn add_client(&self, client_name: String, client: ClientHandleArc) {
162        let mut clients = self.clients.lock().await;
163        clients.insert(client_name, client);
164    }
165
166    async fn get_client(&self, client_name: &str) -> Option<ClientHandleArc> {
167        let clients = self.clients.lock().await;
168        clients.get(client_name).cloned()
169    }
170
171    fn add_rpc_handle(&self, request_id: u64, handle: AbortHandle) {
172        let mut handles = self.rpc_handles.lock().unwrap();
173        if handles.insert(request_id, handle).is_some() {
174            tracing::error!("RPC CLIENT ERROR: request id reuse detected");
175        }
176    }
177
178    fn remove_rpc_handle(&self, request_id: u64) -> Option<AbortHandle> {
179        let mut handles = self.rpc_handles.lock().unwrap();
180        handles.remove(&request_id)
181    }
182
183    async fn client_builder() -> Result<fedimint_client::ClientBuilder, anyhow::Error> {
184        let mut builder = fedimint_client::Client::builder().await?;
185        builder.with_module(MintClientInit);
186        builder.with_module(LightningClientInit::default());
187        builder.with_module(WalletClientInit(None));
188        builder.with_module(MetaClientInit);
189        Ok(builder)
190    }
191
192    /// Get client-specific database with proper prefix
193    async fn client_db(&self, client_name: String) -> anyhow::Result<Database> {
194        assert_eq!(client_name.len(), 36);
195
196        let unified_db = &self.unified_database;
197        let mut client_prefix = vec![DbKeyPrefix::ClientDatabase as u8];
198        client_prefix.extend_from_slice(client_name.as_bytes());
199        Ok(unified_db.with_prefix(client_prefix))
200    }
201
202    /// Handle joining federation using unified database
203    async fn handle_join_federation(
204        &self,
205
206        invite_code: String,
207        client_name: String,
208        force_recover: bool,
209    ) -> anyhow::Result<()> {
210        // Check if wallet mnemonic is set
211        let mnemonic = self
212            .get_mnemonic_from_db()
213            .await?
214            .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
215
216        let client_db = self.client_db(client_name.clone()).await?;
217
218        let invite_code = InviteCode::from_str(&invite_code)?;
219        let federation_id = invite_code.federation_id();
220
221        // Derive federation-specific secret from wallet mnemonic
222        let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
223
224        // Try to consume cached preview, otherwise create new one
225        let cached_preview = self.preview_cache.lock().unwrap().take();
226        let preview = match cached_preview {
227            Some(preview) if preview.config().calculate_federation_id() == federation_id => preview,
228            _ => {
229                let builder = Self::client_builder().await?;
230                builder
231                    .preview(self.connectors.clone(), &invite_code)
232                    .await?
233            }
234        };
235
236        // Check if backup exists
237        let backup = preview
238            .download_backup_from_federation(RootSecret::StandardDoubleDerive(
239                federation_secret.clone(),
240            ))
241            .await?;
242
243        let client = if force_recover || backup.is_some() {
244            Arc::new(
245                preview
246                    .recover(
247                        client_db,
248                        RootSecret::StandardDoubleDerive(federation_secret),
249                        backup,
250                    )
251                    .await?,
252            )
253        } else {
254            Arc::new(
255                preview
256                    .join(
257                        client_db,
258                        RootSecret::StandardDoubleDerive(federation_secret),
259                    )
260                    .await?,
261            )
262        };
263
264        self.add_client(client_name, client).await;
265        Ok(())
266    }
267
268    async fn handle_open_client(&self, client_name: String) -> anyhow::Result<()> {
269        // Check if wallet mnemonic is set
270        let mnemonic = self
271            .get_mnemonic_from_db()
272            .await?
273            .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
274
275        let client_db = self.client_db(client_name.clone()).await?;
276
277        if !fedimint_client::Client::is_initialized(&client_db).await {
278            anyhow::bail!("client is not initialized for this database");
279        }
280
281        // Get the client config to retrieve the federation ID
282        let client_config = fedimint_client::Client::get_config_from_db(&client_db)
283            .await
284            .context("Client config not found in database")?;
285
286        let federation_id = client_config.calculate_federation_id();
287
288        // Derive federation-specific secret from wallet mnemonic
289        let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
290
291        let builder = Self::client_builder().await?;
292        let client = Arc::new(
293            builder
294                .open(
295                    self.connectors.clone(),
296                    client_db,
297                    RootSecret::StandardDoubleDerive(federation_secret),
298                )
299                .await?,
300        );
301
302        self.add_client(client_name, client).await;
303        Ok(())
304    }
305
306    async fn handle_close_client(&self, client_name: String) -> anyhow::Result<()> {
307        let mut clients = self.clients.lock().await;
308        let mut client = clients.remove(&client_name).context("client not found")?;
309
310        // RPC calls might have cloned the client Arc before we remove the client.
311        for attempt in 0.. {
312            info!(attempt, "waiting for RPCs to drop the federation object");
313            match Arc::try_unwrap(client) {
314                Ok(client) => {
315                    client.shutdown().await;
316                    break;
317                }
318                Err(client_val) => client = client_val,
319            }
320            fedimint_core::task::sleep(Duration::from_millis(100)).await;
321        }
322        Ok(())
323    }
324
    /// Dispatches an RPC call to the client registered under `client_name`
    /// and returns the stream of its results.
    ///
    /// `module` selects the target: `""` calls the client's global RPC
    /// handler, while `"ln"`, `"mint"` and `"wallet"` call the first instance
    /// of the respective client module. The stream yields an error if the
    /// client is not open, the module name is unknown, the module is not
    /// available, or the underlying stream yields an error.
    fn handle_client_rpc(
        self: Arc<Self>,
        client_name: String,
        module: String,
        method: String,
        payload: serde_json::Value,
    ) -> BoxStream<'static, anyhow::Result<serde_json::Value>> {
        Box::pin(try_stream! {
            // The lookup happens lazily, when the stream is first polled.
            let client = self
                .get_client(&client_name)
                .await
                .with_context(|| format!("Client not found: {client_name}"))?;
            match module.as_str() {
                // Empty module name: RPC handled by the client itself.
                "" => {
                    let mut stream = client.handle_global_rpc(method, payload);
                    while let Some(item) = stream.next().await {
                        // The first error item ends the stream with that error.
                        yield item?;
                    }
                }
                "ln" => {
                    let ln = client.get_first_module::<LightningClientModule>()?.inner();
                    let mut stream = ln.handle_rpc(method, payload).await;
                    while let Some(item) = stream.next().await {
                        yield item?;
                    }
                }
                "mint" => {
                    let mint = client.get_first_module::<MintClientModule>()?.inner();
                    let mut stream = mint.handle_rpc(method, payload).await;
                    while let Some(item) = stream.next().await {
                        yield item?;
                    }
                }
                "wallet" => {
                    let wallet = client
                        .get_first_module::<WalletClientModule>()?
                        .inner();
                    let mut stream = wallet.handle_rpc(method, payload).await;
                    while let Some(item) = stream.next().await {
                        yield item?;
                    }
                }
                // Any other module name is rejected.
                _ => {
                    Err(anyhow::format_err!("module not found: {module}"))?;
                },
            };
        })
    }
373
374    fn parse_invite_code(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
375        let invite_code = InviteCode::from_str(&invite_code)?;
376
377        Ok(json!({
378            "url": invite_code.url(),
379            "federation_id": invite_code.federation_id(),
380        }))
381    }
382
383    fn parse_bolt11_invoice(&self, invoice_str: String) -> anyhow::Result<serde_json::Value> {
384        let invoice = lightning_invoice::Bolt11Invoice::from_str(&invoice_str)
385            .map_err(|e| anyhow::anyhow!("Failed to parse Lightning invoice: {}", e))?;
386
387        let amount_msat = invoice.amount_milli_satoshis().unwrap_or(0);
388        let amount_sat = amount_msat as f64 / 1000.0;
389
390        let expiry_seconds = invoice.expiry_time().as_secs();
391
392        // memo
393        let description = match invoice.description() {
394            Bolt11InvoiceDescriptionRef::Direct(desc) => desc.to_string(),
395            Bolt11InvoiceDescriptionRef::Hash(_) => "Description hash only".to_string(),
396        };
397
398        Ok(json!({
399            "amount": amount_sat,
400            "expiry": expiry_seconds,
401            "memo": description,
402        }))
403    }
404
405    async fn preview_federation(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
406        let invite = InviteCode::from_str(&invite_code)?;
407        let federation_id = invite.federation_id();
408
409        let builder = Self::client_builder().await?;
410        let preview = builder.preview(self.connectors.clone(), &invite).await?;
411
412        let json_config = preview.config().to_json();
413        // Store in cache
414        *self.preview_cache.lock().unwrap() = Some(preview);
415
416        Ok(json!({
417            "config": json_config,
418            "federation_id": federation_id.to_string(),
419        }))
420    }
421
422    fn handle_rpc_inner(
423        self: Arc<Self>,
424        request: RpcRequest,
425    ) -> Option<BoxStream<'static, anyhow::Result<serde_json::Value>>> {
426        match request.kind {
427            RpcRequestKind::SetMnemonic { words } => Some(Box::pin(try_stream! {
428                self.set_mnemonic(words).await?;
429                yield serde_json::json!({ "success": true });
430            })),
431            RpcRequestKind::GenerateMnemonic => Some(Box::pin(try_stream! {
432                let words = self.generate_mnemonic().await?;
433                yield serde_json::json!({ "mnemonic": words });
434            })),
435            RpcRequestKind::GetMnemonic => Some(Box::pin(try_stream! {
436                let words = self.get_mnemonic_words().await?;
437                yield serde_json::json!({ "mnemonic": words });
438            })),
439            RpcRequestKind::JoinFederation {
440                invite_code,
441                client_name,
442                force_recover,
443            } => Some(Box::pin(try_stream! {
444                self.handle_join_federation(invite_code, client_name, force_recover)
445                    .await?;
446                yield serde_json::json!(null);
447            })),
448            RpcRequestKind::OpenClient { client_name } => Some(Box::pin(try_stream! {
449                self.handle_open_client(client_name).await?;
450                yield serde_json::json!(null);
451            })),
452            RpcRequestKind::CloseClient { client_name } => Some(Box::pin(try_stream! {
453                self.handle_close_client(client_name).await?;
454                yield serde_json::json!(null);
455            })),
456            RpcRequestKind::ClientRpc {
457                client_name,
458                module,
459                method,
460                payload,
461            } => Some(self.handle_client_rpc(client_name, module, method, payload)),
462            RpcRequestKind::ParseInviteCode { invite_code } => Some(Box::pin(try_stream! {
463                let result = self.parse_invite_code(invite_code)?;
464                yield result;
465            })),
466            RpcRequestKind::ParseBolt11Invoice { invoice } => Some(Box::pin(try_stream! {
467                let result = self.parse_bolt11_invoice(invoice)?;
468                yield result;
469            })),
470            RpcRequestKind::PreviewFederation { invite_code } => Some(Box::pin(try_stream! {
471                let result = self.preview_federation(invite_code).await?;
472                yield result;
473            })),
474            RpcRequestKind::ParseOobNotes { oob_notes } => Some(Box::pin(try_stream! {
475                let parsed = parse_oob_notes(&oob_notes)?;
476                yield serde_json::to_value(parsed)?;
477            })),
478            RpcRequestKind::CancelRpc { cancel_request_id } => {
479                if let Some(handle) = self.remove_rpc_handle(cancel_request_id) {
480                    handle.abort();
481                }
482                None
483            }
484        }
485    }
486
487    pub fn handle_rpc(
488        self: Arc<Self>,
489        request: RpcRequest,
490        handler: impl RpcResponseHandler + 'static,
491    ) -> HandledRpc<'static> {
492        let request_id = request.request_id;
493
494        let Some(stream) = self.clone().handle_rpc_inner(request) else {
495            return HandledRpc { task: None };
496        };
497
498        let (abort_handle, abort_registration) = AbortHandle::new_pair();
499        self.add_rpc_handle(request_id, abort_handle);
500
501        let task = Box::pin(async move {
502            let mut stream = Abortable::new(stream, abort_registration);
503
504            while let Some(result) = stream.next().await {
505                let response = match result {
506                    Ok(value) => RpcResponse {
507                        request_id,
508                        kind: RpcResponseKind::Data { data: value },
509                    },
510                    Err(e) => RpcResponse {
511                        request_id,
512                        kind: RpcResponseKind::Error {
513                            error: e.to_string(),
514                        },
515                    },
516                };
517                handler.handle_response(response);
518            }
519
520            // Clean up abort handle and send end message
521            let _ = self.remove_rpc_handle(request_id);
522            handler.handle_response(RpcResponse {
523                request_id,
524                kind: if stream.is_aborted() {
525                    RpcResponseKind::Aborted {}
526                } else {
527                    RpcResponseKind::End {}
528                },
529            });
530        });
531
532        HandledRpc { task: Some(task) }
533    }
534
535    /// Retrieve the wallet-level mnemonic words.
536    /// Returns the mnemonic as a vector of words, or None if no mnemonic is
537    /// set.
538    async fn get_mnemonic_words(&self) -> anyhow::Result<Option<Vec<String>>> {
539        let mnemonic = self.get_mnemonic_from_db().await?;
540
541        if let Some(mnemonic) = mnemonic {
542            let words = mnemonic.words().map(|w| w.to_string()).collect();
543            Ok(Some(words))
544        } else {
545            Ok(None)
546        }
547    }
548    /// Set a mnemonic from user-provided words
549    /// Returns an error if a mnemonic is already set
550    async fn set_mnemonic(&self, words: Vec<String>) -> anyhow::Result<()> {
551        let all_words = words.join(" ");
552        let mnemonic =
553            Mnemonic::parse_in_normalized(fedimint_bip39::Language::English, &all_words)?;
554
555        let mut dbtx = self.unified_database.begin_transaction().await;
556
557        if dbtx.get_value(&MnemonicKey).await.is_some() {
558            anyhow::bail!(
559                "Wallet mnemonic already exists. Please clear existing data before setting a new mnemonic."
560            );
561        }
562
563        dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
564            .await;
565
566        dbtx.commit_tx().await;
567
568        Ok(())
569    }
570
571    /// Generate a new random mnemonic and set it
572    /// Returns an error if a mnemonic is already set
573    async fn generate_mnemonic(&self) -> anyhow::Result<Vec<String>> {
574        let mnemonic = Bip39RootSecretStrategy::<12>::random(&mut thread_rng());
575        let words: Vec<String> = mnemonic.words().map(|w| w.to_string()).collect();
576
577        let mut dbtx = self.unified_database.begin_transaction().await;
578
579        if dbtx.get_value(&MnemonicKey).await.is_some() {
580            anyhow::bail!(
581                "Wallet mnemonic already exists. Please clear existing data before generating a new mnemonic."
582            );
583        }
584
585        dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
586            .await;
587
588        dbtx.commit_tx().await;
589
590        Ok(words)
591    }
592
593    /// Derive federation-specific secret from wallet mnemonic
594    fn derive_federation_secret(
595        &self,
596        mnemonic: &Mnemonic,
597        federation_id: &FederationId,
598    ) -> DerivableSecret {
599        let global_root_secret = Bip39RootSecretStrategy::<12>::to_root_secret(mnemonic);
600        let multi_federation_root_secret = global_root_secret.child_key(ChildId(0));
601        let federation_root_secret = multi_federation_root_secret.federation_key(federation_id);
602        let federation_wallet_root_secret = federation_root_secret.child_key(ChildId(0));
603        federation_wallet_root_secret.child_key(ChildId(0))
604    }
605
606    /// Fetch mnemonic from database
607    async fn get_mnemonic_from_db(&self) -> anyhow::Result<Option<Mnemonic>> {
608        let mut dbtx = self.unified_database.begin_transaction_nc().await;
609
610        if let Some(mnemonic_entropy) = dbtx.get_value(&MnemonicKey).await {
611            let mnemonic = Mnemonic::from_entropy(&mnemonic_entropy)?;
612            Ok(Some(mnemonic))
613        } else {
614            Ok(None)
615        }
616    }
617}
618
619pub fn parse_oob_notes(oob_notes_str: &str) -> anyhow::Result<ParsedNoteDetails> {
620    let oob_notes =
621        OOBNotes::from_str(oob_notes_str).context("Failed to parse OOB notes string")?;
622
623    let total_amount = oob_notes.total_amount();
624    let federation_id_prefix = oob_notes.federation_id_prefix();
625    let invite_code = oob_notes.federation_invite();
626    let federation_id = invite_code.as_ref().map(|inv| inv.federation_id());
627
628    // Get note counts by denomination
629    let notes = oob_notes.notes();
630    let mut note_counts = TieredCounts::default();
631    for (amount, _note) in notes.iter_items() {
632        note_counts.inc(amount, 1);
633    }
634
635    Ok(ParsedNoteDetails {
636        total_amount,
637        federation_id_prefix,
638        federation_id,
639        invite_code,
640        note_counts,
641    })
642}