Skip to main content

fedimint_client_rpc/
lib.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use async_stream::try_stream;
8use fedimint_bip39::{Bip39RootSecretStrategy, Mnemonic};
9use fedimint_client::module::ClientModule;
10use fedimint_client::secret::RootSecretStrategy;
11use fedimint_client::{ClientHandleArc, ClientPreview, RootSecret};
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::config::{FederationId, FederationIdPrefix};
14use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
15use fedimint_core::encoding::{Decodable, Encodable};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::task::{MaybeSend, MaybeSync};
18use fedimint_core::util::{BoxFuture, BoxStream};
19use fedimint_core::{Amount, TieredCounts, impl_db_record};
20use fedimint_derive_secret::{ChildId, DerivableSecret};
21use fedimint_ln_client::{LightningClientInit, LightningClientModule};
22use fedimint_meta_client::{MetaClientInit, MetaClientModule};
23use fedimint_mint_client::{MintClientInit, MintClientModule, OOBNotes};
24use fedimint_wallet_client::{WalletClientInit, WalletClientModule};
25use futures::StreamExt;
26use futures::future::{AbortHandle, Abortable};
27use lightning_invoice::Bolt11InvoiceDescriptionRef;
28use rand::thread_rng;
29use serde::{Deserialize, Serialize};
30use serde_json::json;
31use tokio::sync::Mutex;
32use tracing::info;
33
/// Top-level key prefixes that partition the unified database.
///
/// The discriminant of each variant is the first byte of every key stored
/// in the corresponding partition.
#[repr(u8)]
#[derive(Debug, Clone, Copy)]
pub enum DbKeyPrefix {
    /// Prefix under which per-client databases are nested (followed by the
    /// client name bytes, see `RpcGlobalState::client_db`).
    ClientDatabase = 0x00,
    /// Prefix of the record that stores the wallet mnemonic entropy.
    Mnemonic = 0x01,
}
41
42#[derive(Debug, Clone, Encodable, Decodable, Eq, PartialEq, Hash)]
43pub struct MnemonicKey;
44
45impl_db_record!(
46    key = MnemonicKey,
47    value = Vec<u8>,
48    db_prefix = DbKeyPrefix::Mnemonic,
49);
50
51/// Parsed details from an OOB note.
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct ParsedNoteDetails {
54    /// Total amount of all notes in the OOB notes
55    pub total_amount: Amount,
56    /// Federation ID prefix (always present)
57    pub federation_id_prefix: FederationIdPrefix,
58    /// Full federation ID (if invite is present)
59    pub federation_id: Option<FederationId>,
60    /// Invite code to join the federation (if present)
61    pub invite_code: Option<InviteCode>,
62    /// Number of notes per denomination
63    pub note_counts: TieredCounts,
64}
65
66#[derive(Serialize, Deserialize)]
67#[serde(rename_all = "snake_case")]
68pub struct RpcRequest {
69    pub request_id: u64,
70    #[serde(flatten)]
71    pub kind: RpcRequestKind,
72}
73
74#[derive(Serialize, Deserialize)]
75#[serde(tag = "type", rename_all = "snake_case")]
76pub enum RpcRequestKind {
77    SetMnemonic {
78        words: Vec<String>,
79    },
80    GenerateMnemonic,
81    GetMnemonic,
82    HasMnemonicSet,
83    /// Join federation (requires mnemonic to be set first)
84    JoinFederation {
85        invite_code: String,
86        force_recover: bool,
87        client_name: String,
88    },
89    OpenClient {
90        client_name: String,
91    },
92    CloseClient {
93        client_name: String,
94    },
95    ClientRpc {
96        client_name: String,
97        module: String,
98        method: String,
99        payload: serde_json::Value,
100    },
101    CancelRpc {
102        cancel_request_id: u64,
103    },
104    ParseInviteCode {
105        invite_code: String,
106    },
107    ParseBolt11Invoice {
108        invoice: String,
109    },
110    PreviewFederation {
111        invite_code: String,
112    },
113    ParseOobNotes {
114        oob_notes: String,
115    },
116    ParseLightningAddress {
117        address: String,
118    },
119}
120
121#[derive(Serialize, Deserialize, Clone, Debug)]
122pub struct RpcResponse {
123    pub request_id: u64,
124    #[serde(flatten)]
125    pub kind: RpcResponseKind,
126}
127
128#[derive(Serialize, Deserialize, Clone, Debug)]
129#[serde(tag = "type", rename_all = "snake_case")]
130pub enum RpcResponseKind {
131    Data { data: serde_json::Value },
132    Error { error: String },
133    Aborted {},
134    End {},
135}
136
137pub trait RpcResponseHandler: MaybeSend + MaybeSync {
138    fn handle_response(&self, response: RpcResponse);
139}
140
141pub struct RpcGlobalState {
142    /// Endpoints used for all global-state functionality
143    connectors: ConnectorRegistry,
144    clients: Mutex<HashMap<String, ClientHandleArc>>,
145    rpc_handles: std::sync::Mutex<HashMap<u64, AbortHandle>>,
146    unified_database: Database,
147    preview_cache: std::sync::Mutex<Option<ClientPreview>>,
148}
149
150pub struct HandledRpc<'a> {
151    pub task: Option<BoxFuture<'a, ()>>,
152}
153
154impl RpcGlobalState {
155    pub fn new(connectors: ConnectorRegistry, unified_database: Database) -> Self {
156        Self {
157            connectors,
158            clients: Mutex::new(HashMap::new()),
159            rpc_handles: std::sync::Mutex::new(HashMap::new()),
160            unified_database,
161            preview_cache: std::sync::Mutex::new(None),
162        }
163    }
164
165    async fn add_client(&self, client_name: String, client: ClientHandleArc) {
166        let mut clients = self.clients.lock().await;
167        clients.insert(client_name, client);
168    }
169
170    async fn get_client(&self, client_name: &str) -> Option<ClientHandleArc> {
171        let clients = self.clients.lock().await;
172        clients.get(client_name).cloned()
173    }
174
175    fn add_rpc_handle(&self, request_id: u64, handle: AbortHandle) {
176        let mut handles = self.rpc_handles.lock().unwrap();
177        if handles.insert(request_id, handle).is_some() {
178            tracing::error!("RPC CLIENT ERROR: request id reuse detected");
179        }
180    }
181
182    fn remove_rpc_handle(&self, request_id: u64) -> Option<AbortHandle> {
183        let mut handles = self.rpc_handles.lock().unwrap();
184        handles.remove(&request_id)
185    }
186
187    async fn client_builder() -> Result<fedimint_client::ClientBuilder, anyhow::Error> {
188        let mut builder = fedimint_client::Client::builder().await?;
189        builder.with_module(MintClientInit);
190        builder.with_module(LightningClientInit::default());
191        builder.with_module(WalletClientInit(None));
192        builder.with_module(MetaClientInit);
193        Ok(builder)
194    }
195
196    /// Get client-specific database with proper prefix
197    async fn client_db(&self, client_name: String) -> anyhow::Result<Database> {
198        assert_eq!(client_name.len(), 36);
199
200        let unified_db = &self.unified_database;
201        let mut client_prefix = vec![DbKeyPrefix::ClientDatabase as u8];
202        client_prefix.extend_from_slice(client_name.as_bytes());
203        Ok(unified_db.with_prefix(client_prefix))
204    }
205
206    /// Handle joining federation using unified database
207    async fn handle_join_federation(
208        &self,
209
210        invite_code: String,
211        client_name: String,
212        force_recover: bool,
213    ) -> anyhow::Result<()> {
214        // Check if wallet mnemonic is set
215        let mnemonic = self
216            .get_mnemonic_from_db()
217            .await?
218            .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
219
220        let client_db = self.client_db(client_name.clone()).await?;
221
222        let invite_code = InviteCode::from_str(&invite_code)?;
223        let federation_id = invite_code.federation_id();
224
225        // Derive federation-specific secret from wallet mnemonic
226        let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
227
228        // Try to consume cached preview, otherwise create new one
229        let cached_preview = self.preview_cache.lock().unwrap().take();
230        let preview = match cached_preview {
231            Some(preview) if preview.config().calculate_federation_id() == federation_id => preview,
232            _ => {
233                let builder = Self::client_builder().await?;
234                builder
235                    .preview(self.connectors.clone(), &invite_code)
236                    .await?
237            }
238        };
239
240        // Check if backup exists
241        #[allow(deprecated)]
242        let backup = preview
243            .download_backup_from_federation(RootSecret::StandardDoubleDerive(
244                federation_secret.clone(),
245            ))
246            .await?;
247
248        let client = if force_recover || backup.is_some() {
249            Arc::new(
250                preview
251                    .recover(
252                        client_db,
253                        RootSecret::StandardDoubleDerive(federation_secret),
254                        backup,
255                    )
256                    .await?,
257            )
258        } else {
259            Arc::new(
260                preview
261                    .join(
262                        client_db,
263                        RootSecret::StandardDoubleDerive(federation_secret),
264                    )
265                    .await?,
266            )
267        };
268
269        self.add_client(client_name, client).await;
270        Ok(())
271    }
272
273    async fn handle_open_client(&self, client_name: String) -> anyhow::Result<()> {
274        // Check if wallet mnemonic is set
275        let mnemonic = self
276            .get_mnemonic_from_db()
277            .await?
278            .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
279
280        let client_db = self.client_db(client_name.clone()).await?;
281
282        if !fedimint_client::Client::is_initialized(&client_db).await {
283            anyhow::bail!("client is not initialized for this database");
284        }
285
286        // Get the client config to retrieve the federation ID
287        let client_config = fedimint_client::Client::get_config_from_db(&client_db)
288            .await
289            .context("Client config not found in database")?;
290
291        let federation_id = client_config.calculate_federation_id();
292
293        // Derive federation-specific secret from wallet mnemonic
294        let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
295
296        let builder = Self::client_builder().await?;
297        let client = Arc::new(
298            builder
299                .open(
300                    self.connectors.clone(),
301                    client_db,
302                    RootSecret::StandardDoubleDerive(federation_secret),
303                )
304                .await?,
305        );
306
307        self.add_client(client_name, client).await;
308        Ok(())
309    }
310
311    async fn handle_close_client(&self, client_name: String) -> anyhow::Result<()> {
312        let mut clients = self.clients.lock().await;
313        let mut client = clients.remove(&client_name).context("client not found")?;
314
315        // RPC calls might have cloned the client Arc before we remove the client.
316        for attempt in 0.. {
317            info!(attempt, "waiting for RPCs to drop the federation object");
318            match Arc::try_unwrap(client) {
319                Ok(client) => {
320                    client.shutdown().await;
321                    break;
322                }
323                Err(client_val) => client = client_val,
324            }
325            fedimint_core::task::sleep(Duration::from_millis(100)).await;
326        }
327        Ok(())
328    }
329
330    fn handle_client_rpc(
331        self: Arc<Self>,
332        client_name: String,
333        module: String,
334        method: String,
335        payload: serde_json::Value,
336    ) -> BoxStream<'static, anyhow::Result<serde_json::Value>> {
337        Box::pin(try_stream! {
338            let client = self
339                .get_client(&client_name)
340                .await
341                .with_context(|| format!("Client not found: {client_name}"))?;
342            match module.as_str() {
343                "" => {
344                    let mut stream = client.handle_global_rpc(method, payload);
345                    while let Some(item) = stream.next().await {
346                        yield item?;
347                    }
348                }
349                "ln" => {
350                    let ln = client.get_first_module::<LightningClientModule>()?.inner();
351                    let mut stream = ln.handle_rpc(method, payload).await;
352                    while let Some(item) = stream.next().await {
353                        yield item?;
354                    }
355                }
356                "mint" => {
357                    let mint = client.get_first_module::<MintClientModule>()?.inner();
358                    let mut stream = mint.handle_rpc(method, payload).await;
359                    while let Some(item) = stream.next().await {
360                        yield item?;
361                    }
362                }
363                "wallet" => {
364                    let wallet = client
365                        .get_first_module::<WalletClientModule>()?
366                        .inner();
367                    let mut stream = wallet.handle_rpc(method, payload).await;
368                    while let Some(item) = stream.next().await {
369                        yield item?;
370                    }
371                }
372                "meta" => {
373                    let meta = client.get_first_module::<MetaClientModule>()?.inner();
374                    let mut stream = meta.handle_rpc(method, payload).await;
375                    while let Some(item) = stream.next().await {
376                        yield item?;
377                    }
378                }
379                _ => {
380                    Err(anyhow::format_err!("module not found: {module}"))?;
381                },
382            };
383        })
384    }
385
386    fn parse_invite_code(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
387        let invite_code = InviteCode::from_str(&invite_code)?;
388
389        Ok(json!({
390            "url": invite_code.url(),
391            "federation_id": invite_code.federation_id(),
392        }))
393    }
394
395    fn parse_bolt11_invoice(&self, invoice_str: String) -> anyhow::Result<serde_json::Value> {
396        let invoice = lightning_invoice::Bolt11Invoice::from_str(&invoice_str)
397            .map_err(|e| anyhow::anyhow!("Failed to parse Lightning invoice: {}", e))?;
398
399        let amount_msat = invoice.amount_milli_satoshis().unwrap_or(0);
400        let amount_sat = amount_msat as f64 / 1000.0;
401
402        let expiry_seconds = invoice.expiry_time().as_secs();
403
404        // memo
405        let description = match invoice.description() {
406            Bolt11InvoiceDescriptionRef::Direct(desc) => desc.to_string(),
407            Bolt11InvoiceDescriptionRef::Hash(_) => "Description hash only".to_string(),
408        };
409
410        Ok(json!({
411            "amount": amount_sat,
412            "expiry": expiry_seconds,
413            "memo": description,
414        }))
415    }
416
417    async fn preview_federation(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
418        let invite = InviteCode::from_str(&invite_code)?;
419        let federation_id = invite.federation_id();
420
421        let builder = Self::client_builder().await?;
422        let preview = builder.preview(self.connectors.clone(), &invite).await?;
423
424        let json_config = preview.config().to_json();
425        // Store in cache
426        *self.preview_cache.lock().unwrap() = Some(preview);
427
428        Ok(json!({
429            "config": json_config,
430            "federation_id": federation_id.to_string(),
431        }))
432    }
433
434    fn handle_rpc_inner(
435        self: Arc<Self>,
436        request: RpcRequest,
437    ) -> Option<BoxStream<'static, anyhow::Result<serde_json::Value>>> {
438        match request.kind {
439            RpcRequestKind::SetMnemonic { words } => Some(Box::pin(try_stream! {
440                self.set_mnemonic(words).await?;
441                yield serde_json::json!({ "success": true });
442            })),
443            RpcRequestKind::GenerateMnemonic => Some(Box::pin(try_stream! {
444                let words = self.generate_mnemonic().await?;
445                yield serde_json::json!({ "mnemonic": words });
446            })),
447            RpcRequestKind::GetMnemonic => Some(Box::pin(try_stream! {
448                let words = self.get_mnemonic_words().await?;
449                yield serde_json::json!({ "mnemonic": words });
450            })),
451            RpcRequestKind::HasMnemonicSet => Some(Box::pin(try_stream! {
452                let is_set = self.has_mnemonic_set().await?;
453                yield serde_json::json!(is_set);
454            })),
455            RpcRequestKind::JoinFederation {
456                invite_code,
457                client_name,
458                force_recover,
459            } => Some(Box::pin(try_stream! {
460                self.handle_join_federation(invite_code, client_name, force_recover)
461                    .await?;
462                yield serde_json::json!(null);
463            })),
464            RpcRequestKind::OpenClient { client_name } => Some(Box::pin(try_stream! {
465                self.handle_open_client(client_name).await?;
466                yield serde_json::json!(null);
467            })),
468            RpcRequestKind::CloseClient { client_name } => Some(Box::pin(try_stream! {
469                self.handle_close_client(client_name).await?;
470                yield serde_json::json!(null);
471            })),
472            RpcRequestKind::ClientRpc {
473                client_name,
474                module,
475                method,
476                payload,
477            } => Some(self.handle_client_rpc(client_name, module, method, payload)),
478            RpcRequestKind::ParseInviteCode { invite_code } => Some(Box::pin(try_stream! {
479                let result = self.parse_invite_code(invite_code)?;
480                yield result;
481            })),
482            RpcRequestKind::ParseBolt11Invoice { invoice } => Some(Box::pin(try_stream! {
483                let result = self.parse_bolt11_invoice(invoice)?;
484                yield result;
485            })),
486            RpcRequestKind::PreviewFederation { invite_code } => Some(Box::pin(try_stream! {
487                let result = self.preview_federation(invite_code).await?;
488                yield result;
489            })),
490            RpcRequestKind::ParseOobNotes { oob_notes } => Some(Box::pin(try_stream! {
491                let parsed = parse_oob_notes(&oob_notes)?;
492                yield serde_json::to_value(parsed)?;
493            })),
494            RpcRequestKind::ParseLightningAddress { address } => Some(Box::pin(try_stream! {
495                let url = fedimint_lnurl::parse_address(&address)
496                    .context("Invalid Lightning Address")?;
497                let metadata = fedimint_lnurl::request(&url).await
498                    .map_err(|e| anyhow::anyhow!(e))?;
499
500                yield serde_json::to_value(metadata)?;
501            })),
502            RpcRequestKind::CancelRpc { cancel_request_id } => {
503                if let Some(handle) = self.remove_rpc_handle(cancel_request_id) {
504                    handle.abort();
505                }
506                None
507            }
508        }
509    }
510
511    pub fn handle_rpc(
512        self: Arc<Self>,
513        request: RpcRequest,
514        handler: impl RpcResponseHandler + 'static,
515    ) -> HandledRpc<'static> {
516        let request_id = request.request_id;
517
518        let Some(stream) = self.clone().handle_rpc_inner(request) else {
519            return HandledRpc { task: None };
520        };
521
522        let (abort_handle, abort_registration) = AbortHandle::new_pair();
523        self.add_rpc_handle(request_id, abort_handle);
524
525        let task = Box::pin(async move {
526            let mut stream = Abortable::new(stream, abort_registration);
527
528            while let Some(result) = stream.next().await {
529                let response = match result {
530                    Ok(value) => RpcResponse {
531                        request_id,
532                        kind: RpcResponseKind::Data { data: value },
533                    },
534                    Err(e) => RpcResponse {
535                        request_id,
536                        kind: RpcResponseKind::Error {
537                            error: e.to_string(),
538                        },
539                    },
540                };
541                handler.handle_response(response);
542            }
543
544            // Clean up abort handle and send end message
545            let _ = self.remove_rpc_handle(request_id);
546            handler.handle_response(RpcResponse {
547                request_id,
548                kind: if stream.is_aborted() {
549                    RpcResponseKind::Aborted {}
550                } else {
551                    RpcResponseKind::End {}
552                },
553            });
554        });
555
556        HandledRpc { task: Some(task) }
557    }
558
559    /// Retrieve the wallet-level mnemonic words.
560    /// Returns the mnemonic as a vector of words, or None if no mnemonic is
561    /// set.
562    async fn get_mnemonic_words(&self) -> anyhow::Result<Option<Vec<String>>> {
563        let mnemonic = self.get_mnemonic_from_db().await?;
564
565        if let Some(mnemonic) = mnemonic {
566            let words = mnemonic.words().map(|w| w.to_string()).collect();
567            Ok(Some(words))
568        } else {
569            Ok(None)
570        }
571    }
572    /// Set a mnemonic from user-provided words
573    /// Returns an error if a mnemonic is already set
574    async fn set_mnemonic(&self, words: Vec<String>) -> anyhow::Result<()> {
575        let all_words = words.join(" ");
576        let mnemonic =
577            Mnemonic::parse_in_normalized(fedimint_bip39::Language::English, &all_words)?;
578
579        let mut dbtx = self.unified_database.begin_transaction().await;
580
581        if dbtx.get_value(&MnemonicKey).await.is_some() {
582            anyhow::bail!(
583                "Wallet mnemonic already exists. Please clear existing data before setting a new mnemonic."
584            );
585        }
586
587        dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
588            .await;
589
590        dbtx.commit_tx().await;
591
592        Ok(())
593    }
594
595    /// Generate a new random mnemonic and set it
596    /// Returns an error if a mnemonic is already set
597    async fn generate_mnemonic(&self) -> anyhow::Result<Vec<String>> {
598        let mnemonic = Bip39RootSecretStrategy::<12>::random(&mut thread_rng());
599        let words: Vec<String> = mnemonic.words().map(|w| w.to_string()).collect();
600
601        let mut dbtx = self.unified_database.begin_transaction().await;
602
603        if dbtx.get_value(&MnemonicKey).await.is_some() {
604            anyhow::bail!(
605                "Wallet mnemonic already exists. Please clear existing data before generating a new mnemonic."
606            );
607        }
608
609        dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
610            .await;
611
612        dbtx.commit_tx().await;
613
614        Ok(words)
615    }
616
617    /// Derive federation-specific secret from wallet mnemonic
618    fn derive_federation_secret(
619        &self,
620        mnemonic: &Mnemonic,
621        federation_id: &FederationId,
622    ) -> DerivableSecret {
623        let global_root_secret = Bip39RootSecretStrategy::<12>::to_root_secret(mnemonic);
624        let multi_federation_root_secret = global_root_secret.child_key(ChildId(0));
625        let federation_root_secret = multi_federation_root_secret.federation_key(federation_id);
626        let federation_wallet_root_secret = federation_root_secret.child_key(ChildId(0));
627        federation_wallet_root_secret.child_key(ChildId(0))
628    }
629
630    /// Fetch mnemonic from database
631    async fn get_mnemonic_from_db(&self) -> anyhow::Result<Option<Mnemonic>> {
632        let mut dbtx = self.unified_database.begin_transaction_nc().await;
633
634        if let Some(mnemonic_entropy) = dbtx.get_value(&MnemonicKey).await {
635            let mnemonic = Mnemonic::from_entropy(&mnemonic_entropy)?;
636            Ok(Some(mnemonic))
637        } else {
638            Ok(None)
639        }
640    }
641
642    /// Check if mnemonic is set
643    async fn has_mnemonic_set(&self) -> anyhow::Result<bool> {
644        let mnemonic = self.get_mnemonic_from_db().await?;
645        Ok(mnemonic.is_some())
646    }
647}
648
649pub fn parse_oob_notes(oob_notes_str: &str) -> anyhow::Result<ParsedNoteDetails> {
650    let oob_notes =
651        OOBNotes::from_str(oob_notes_str).context("Failed to parse OOB notes string")?;
652
653    let total_amount = oob_notes.total_amount();
654    let federation_id_prefix = oob_notes.federation_id_prefix();
655    let invite_code = oob_notes.federation_invite();
656    let federation_id = invite_code.as_ref().map(|inv| inv.federation_id());
657
658    // Get note counts by denomination
659    let notes = oob_notes.notes();
660    let mut note_counts = TieredCounts::default();
661    for (amount, _note) in notes.iter_items() {
662        note_counts.inc(amount, 1);
663    }
664
665    Ok(ParsedNoteDetails {
666        total_amount,
667        federation_id_prefix,
668        federation_id,
669        invite_code,
670        note_counts,
671    })
672}