fedimint_client_rpc/
lib.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use async_stream::try_stream;
8use fedimint_bip39::{Bip39RootSecretStrategy, Mnemonic};
9use fedimint_client::module::ClientModule;
10use fedimint_client::secret::RootSecretStrategy;
11use fedimint_client::{ClientHandleArc, ClientPreview, RootSecret};
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::config::{FederationId, FederationIdPrefix};
14use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
15use fedimint_core::encoding::{Decodable, Encodable};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::task::{MaybeSend, MaybeSync};
18use fedimint_core::util::{BoxFuture, BoxStream};
19use fedimint_core::{Amount, TieredCounts, impl_db_record};
20use fedimint_derive_secret::{ChildId, DerivableSecret};
21use fedimint_ln_client::{LightningClientInit, LightningClientModule};
22use fedimint_meta_client::MetaClientInit;
23use fedimint_mint_client::{MintClientInit, MintClientModule, OOBNotes};
24use fedimint_wallet_client::{WalletClientInit, WalletClientModule};
25use futures::StreamExt;
26use futures::future::{AbortHandle, Abortable};
27use lightning_invoice::Bolt11InvoiceDescriptionRef;
28use rand::thread_rng;
29use serde::{Deserialize, Serialize};
30use serde_json::json;
31use tokio::sync::Mutex;
32use tracing::info;
33
/// Key prefixes for the unified database.
///
/// Each variant's `u8` discriminant selects one partition of the database
/// handed to [`RpcGlobalState::new`].
#[repr(u8)]
#[derive(Clone, Copy, Debug)]
pub enum DbKeyPrefix {
    /// Leading byte of every per-client database prefix; the client name's
    /// bytes follow it (see `RpcGlobalState::client_db`).
    ClientDatabase = 0x00,
    /// `db_prefix` of the [`MnemonicKey`] record.
    Mnemonic = 0x01,
}
41
/// Key of the wallet-level mnemonic record in the unified database.
///
/// The key itself carries no data; the value stored under it is the
/// mnemonic's entropy as raw bytes.
#[derive(Debug, Clone, Encodable, Decodable, Eq, PartialEq, Hash)]
pub struct MnemonicKey;
44
// Registers `MnemonicKey -> Vec<u8>` (the mnemonic entropy) as a database
// record stored under `DbKeyPrefix::Mnemonic`.
impl_db_record!(
    key = MnemonicKey,
    value = Vec<u8>,
    db_prefix = DbKeyPrefix::Mnemonic,
);
50
/// Parsed details from an OOB note.
///
/// Produced by [`parse_oob_notes`] and serialized as the response of the
/// `parse_oob_notes` RPC.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParsedNoteDetails {
    /// Total amount of all notes in the OOB notes
    pub total_amount: Amount,
    /// Federation ID prefix (always present)
    pub federation_id_prefix: FederationIdPrefix,
    /// Full federation ID, taken from the invite code (`None` when the notes
    /// carry no invite)
    pub federation_id: Option<FederationId>,
    /// Invite code to join the federation (if present)
    pub invite_code: Option<InviteCode>,
    /// Number of notes per denomination
    pub note_counts: TieredCounts,
}
65
/// Envelope of an incoming RPC request.
///
/// `kind` is flattened, so in the serialized form its `type` tag and fields
/// sit next to `request_id` in the same object.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct RpcRequest {
    /// Caller-chosen id. It is copied into every [`RpcResponse`] produced for
    /// this request and is the id a later `CancelRpc` refers to.
    pub request_id: u64,
    /// What the request asks for.
    #[serde(flatten)]
    pub kind: RpcRequestKind,
}
73
/// The operations understood by [`RpcGlobalState::handle_rpc`].
///
/// Serialized as an internally tagged object: the variant name, in
/// snake_case, is stored in the `type` field.
#[derive(Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RpcRequestKind {
    /// Store a user-provided mnemonic; fails if one is already stored.
    SetMnemonic {
        /// The mnemonic words; they are joined with single spaces before
        /// parsing.
        words: Vec<String>,
    },
    /// Generate a random mnemonic, store it and return its words; fails if
    /// one is already stored.
    GenerateMnemonic,
    /// Return the stored mnemonic's words, or `null` if none is set.
    GetMnemonic,
    /// Return whether a mnemonic is stored.
    HasMnemonicSet,
    /// Join federation (requires mnemonic to be set first)
    JoinFederation {
        /// Invite code of the federation to join.
        invite_code: String,
        /// Recover the client even if the federation returned no backup.
        force_recover: bool,
        /// Name the client is registered under; it also selects the client's
        /// database prefix and must be 36 bytes long.
        client_name: String,
    },
    /// Open a client that was previously initialized in the database.
    OpenClient {
        /// Name used when the client was joined.
        client_name: String,
    },
    /// Remove a client from the set of open clients and shut it down.
    CloseClient {
        /// Name of the open client.
        client_name: String,
    },
    /// Forward an RPC to an open client or to one of its modules.
    ClientRpc {
        /// Name of the open client.
        client_name: String,
        /// `""` for the client's global RPC, or one of `"ln"`, `"mint"`,
        /// `"wallet"`.
        module: String,
        /// Method name, passed through unchanged.
        method: String,
        /// Method arguments, passed through unchanged.
        payload: serde_json::Value,
    },
    /// Abort an in-flight request. Produces no response of its own.
    CancelRpc {
        /// `request_id` of the request to abort.
        cancel_request_id: u64,
    },
    /// Parse an invite code and return its `url` and `federation_id`.
    ParseInviteCode {
        invite_code: String,
    },
    /// Parse a BOLT11 invoice and return its `amount`, `expiry` and `memo`.
    ParseBolt11Invoice {
        invoice: String,
    },
    /// Fetch a federation preview for the invite code, return its config and
    /// keep the preview cached for a following `JoinFederation`.
    PreviewFederation {
        invite_code: String,
    },
    /// Parse an OOB notes string into [`ParsedNoteDetails`].
    ParseOobNotes {
        oob_notes: String,
    },
}
117
/// One message sent back to the caller for a request.
///
/// `kind` is flattened, so its `type` tag and fields sit next to
/// `request_id` in the serialized object.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct RpcResponse {
    /// The `request_id` of the [`RpcRequest`] this message belongs to.
    pub request_id: u64,
    /// Payload of the message.
    #[serde(flatten)]
    pub kind: RpcResponseKind,
}
124
/// Payload of an [`RpcResponse`], tagged by a snake_case `type` field.
///
/// A request yields any number of `Data`/`Error` messages followed by
/// exactly one terminal `Aborted` or `End` message.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum RpcResponseKind {
    /// One successfully produced item of the response stream.
    Data { data: serde_json::Value },
    /// A failed item; `error` is the error's `to_string()` output.
    Error { error: String },
    /// Terminal message of a request whose stream was aborted.
    Aborted {},
    /// Terminal message of a request whose stream ended without an abort.
    End {},
}
133
/// Sink for the responses generated while a request is processed.
///
/// The task built by [`RpcGlobalState::handle_rpc`] calls it once per
/// stream item and once more for the terminal message.
pub trait RpcResponseHandler: MaybeSend + MaybeSync {
    /// Delivers a single response message to the caller.
    fn handle_response(&self, response: RpcResponse);
}
137
/// Shared state behind the RPC interface: the open clients, the in-flight
/// requests and the database everything is stored in.
pub struct RpcGlobalState {
    /// Endpoints used for all global-state functionality
    connectors: ConnectorRegistry,
    /// Open clients, keyed by client name.
    clients: Mutex<HashMap<String, ClientHandleArc>>,
    /// Abort handles of in-flight requests, keyed by request id.
    rpc_handles: std::sync::Mutex<HashMap<u64, AbortHandle>>,
    /// Database holding the wallet mnemonic and, under per-client prefixes,
    /// each client's own data.
    unified_database: Database,
    /// Preview stored by the last `PreviewFederation`; taken (and thereby
    /// cleared) by the next `JoinFederation`.
    preview_cache: std::sync::Mutex<Option<ClientPreview>>,
}
146
/// Result of dispatching a request with [`RpcGlobalState::handle_rpc`].
pub struct HandledRpc<'a> {
    /// Future that drives the request and delivers its responses to the
    /// handler; `None` when the request was fully handled during dispatch
    /// (`CancelRpc`).
    ///
    /// NOTE(review): nothing in this file polls the future, so the caller
    /// presumably has to run it to completion — confirm against callers.
    pub task: Option<BoxFuture<'a, ()>>,
}
150
151impl RpcGlobalState {
152    pub fn new(connectors: ConnectorRegistry, unified_database: Database) -> Self {
153        Self {
154            connectors,
155            clients: Mutex::new(HashMap::new()),
156            rpc_handles: std::sync::Mutex::new(HashMap::new()),
157            unified_database,
158            preview_cache: std::sync::Mutex::new(None),
159        }
160    }
161
162    async fn add_client(&self, client_name: String, client: ClientHandleArc) {
163        let mut clients = self.clients.lock().await;
164        clients.insert(client_name, client);
165    }
166
167    async fn get_client(&self, client_name: &str) -> Option<ClientHandleArc> {
168        let clients = self.clients.lock().await;
169        clients.get(client_name).cloned()
170    }
171
172    fn add_rpc_handle(&self, request_id: u64, handle: AbortHandle) {
173        let mut handles = self.rpc_handles.lock().unwrap();
174        if handles.insert(request_id, handle).is_some() {
175            tracing::error!("RPC CLIENT ERROR: request id reuse detected");
176        }
177    }
178
179    fn remove_rpc_handle(&self, request_id: u64) -> Option<AbortHandle> {
180        let mut handles = self.rpc_handles.lock().unwrap();
181        handles.remove(&request_id)
182    }
183
184    async fn client_builder() -> Result<fedimint_client::ClientBuilder, anyhow::Error> {
185        let mut builder = fedimint_client::Client::builder().await?;
186        builder.with_module(MintClientInit);
187        builder.with_module(LightningClientInit::default());
188        builder.with_module(WalletClientInit(None));
189        builder.with_module(MetaClientInit);
190        Ok(builder)
191    }
192
193    /// Get client-specific database with proper prefix
194    async fn client_db(&self, client_name: String) -> anyhow::Result<Database> {
195        assert_eq!(client_name.len(), 36);
196
197        let unified_db = &self.unified_database;
198        let mut client_prefix = vec![DbKeyPrefix::ClientDatabase as u8];
199        client_prefix.extend_from_slice(client_name.as_bytes());
200        Ok(unified_db.with_prefix(client_prefix))
201    }
202
    /// Handle joining federation using unified database
    ///
    /// Joins (or recovers into) the federation behind `invite_code` and
    /// registers the resulting client under `client_name`. The client's data
    /// is stored in the `client_name`-specific part of the unified database
    /// and its root secret is derived from the wallet mnemonic and the
    /// federation id.
    ///
    /// Recovery is used instead of a plain join when `force_recover` is set
    /// or when the federation returned a backup for the derived secret.
    ///
    /// # Errors
    ///
    /// Fails if no mnemonic is stored, if the client database cannot be
    /// obtained, if `invite_code` does not parse, or if previewing the
    /// federation, downloading the backup, joining or recovering fails.
    ///
    /// # Panics
    ///
    /// Panics if the preview cache mutex is poisoned.
    async fn handle_join_federation(
        &self,

        invite_code: String,
        client_name: String,
        force_recover: bool,
    ) -> anyhow::Result<()> {
        // Check if wallet mnemonic is set
        let mnemonic = self
            .get_mnemonic_from_db()
            .await?
            .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;

        let client_db = self.client_db(client_name.clone()).await?;

        let invite_code = InviteCode::from_str(&invite_code)?;
        let federation_id = invite_code.federation_id();

        // Derive federation-specific secret from wallet mnemonic
        let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);

        // Try to consume cached preview, otherwise create new one
        // `take()` empties the cache even when the cached preview turns out to
        // belong to a different federation; the guard is released at the end
        // of the statement, before any `.await` below.
        let cached_preview = self.preview_cache.lock().unwrap().take();
        let preview = match cached_preview {
            Some(preview) if preview.config().calculate_federation_id() == federation_id => preview,
            _ => {
                let builder = Self::client_builder().await?;
                builder
                    .preview(self.connectors.clone(), &invite_code)
                    .await?
            }
        };

        // Check if backup exists
        #[allow(deprecated)]
        let backup = preview
            .download_backup_from_federation(RootSecret::StandardDoubleDerive(
                federation_secret.clone(),
            ))
            .await?;

        // With `force_recover` set and no backup found, `recover` is called
        // with `backup == None`.
        let client = if force_recover || backup.is_some() {
            Arc::new(
                preview
                    .recover(
                        client_db,
                        RootSecret::StandardDoubleDerive(federation_secret),
                        backup,
                    )
                    .await?,
            )
        } else {
            Arc::new(
                preview
                    .join(
                        client_db,
                        RootSecret::StandardDoubleDerive(federation_secret),
                    )
                    .await?,
            )
        };

        self.add_client(client_name, client).await;
        Ok(())
    }
269
    /// Opens the already-initialized client stored under `client_name` and
    /// registers it as open.
    ///
    /// The federation id is read from the client config stored in the
    /// client's database, and the root secret is re-derived from the wallet
    /// mnemonic and that id.
    ///
    /// # Errors
    ///
    /// Fails if no mnemonic is stored, if the client database cannot be
    /// obtained, if the database holds no initialized client or no client
    /// config, or if building or opening the client fails.
    async fn handle_open_client(&self, client_name: String) -> anyhow::Result<()> {
        // Check if wallet mnemonic is set
        let mnemonic = self
            .get_mnemonic_from_db()
            .await?
            .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;

        let client_db = self.client_db(client_name.clone()).await?;

        if !fedimint_client::Client::is_initialized(&client_db).await {
            anyhow::bail!("client is not initialized for this database");
        }

        // Get the client config to retrieve the federation ID
        let client_config = fedimint_client::Client::get_config_from_db(&client_db)
            .await
            .context("Client config not found in database")?;

        let federation_id = client_config.calculate_federation_id();

        // Derive federation-specific secret from wallet mnemonic
        let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);

        let builder = Self::client_builder().await?;
        let client = Arc::new(
            builder
                .open(
                    self.connectors.clone(),
                    client_db,
                    RootSecret::StandardDoubleDerive(federation_secret),
                )
                .await?,
        );

        // Replaces any client already registered under the same name.
        self.add_client(client_name, client).await;
        Ok(())
    }
307
308    async fn handle_close_client(&self, client_name: String) -> anyhow::Result<()> {
309        let mut clients = self.clients.lock().await;
310        let mut client = clients.remove(&client_name).context("client not found")?;
311
312        // RPC calls might have cloned the client Arc before we remove the client.
313        for attempt in 0.. {
314            info!(attempt, "waiting for RPCs to drop the federation object");
315            match Arc::try_unwrap(client) {
316                Ok(client) => {
317                    client.shutdown().await;
318                    break;
319                }
320                Err(client_val) => client = client_val,
321            }
322            fedimint_core::task::sleep(Duration::from_millis(100)).await;
323        }
324        Ok(())
325    }
326
    /// Builds the response stream for an RPC addressed to an open client.
    ///
    /// `module` selects the target: `""` is the client's global RPC, `"ln"`,
    /// `"mint"` and `"wallet"` are the first instance of the respective
    /// module. `method` and `payload` are passed through unchanged, and every
    /// item the target produces is forwarded.
    ///
    /// The stream yields an error (and then ends) if no client named
    /// `client_name` is open, if the requested module is unknown or missing
    /// on the client, or when the target stream itself yields an error.
    fn handle_client_rpc(
        self: Arc<Self>,
        client_name: String,
        module: String,
        method: String,
        payload: serde_json::Value,
    ) -> BoxStream<'static, anyhow::Result<serde_json::Value>> {
        Box::pin(try_stream! {
            // The cloned handle is owned by this stream until it is dropped.
            let client = self
                .get_client(&client_name)
                .await
                .with_context(|| format!("Client not found: {client_name}"))?;
            match module.as_str() {
                "" => {
                    let mut stream = client.handle_global_rpc(method, payload);
                    while let Some(item) = stream.next().await {
                        yield item?;
                    }
                }
                "ln" => {
                    let ln = client.get_first_module::<LightningClientModule>()?.inner();
                    let mut stream = ln.handle_rpc(method, payload).await;
                    while let Some(item) = stream.next().await {
                        yield item?;
                    }
                }
                "mint" => {
                    let mint = client.get_first_module::<MintClientModule>()?.inner();
                    let mut stream = mint.handle_rpc(method, payload).await;
                    while let Some(item) = stream.next().await {
                        yield item?;
                    }
                }
                "wallet" => {
                    let wallet = client
                        .get_first_module::<WalletClientModule>()?
                        .inner();
                    let mut stream = wallet.handle_rpc(method, payload).await;
                    while let Some(item) = stream.next().await {
                        yield item?;
                    }
                }
                _ => {
                    Err(anyhow::format_err!("module not found: {module}"))?;
                },
            };
        })
    }
375
376    fn parse_invite_code(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
377        let invite_code = InviteCode::from_str(&invite_code)?;
378
379        Ok(json!({
380            "url": invite_code.url(),
381            "federation_id": invite_code.federation_id(),
382        }))
383    }
384
385    fn parse_bolt11_invoice(&self, invoice_str: String) -> anyhow::Result<serde_json::Value> {
386        let invoice = lightning_invoice::Bolt11Invoice::from_str(&invoice_str)
387            .map_err(|e| anyhow::anyhow!("Failed to parse Lightning invoice: {}", e))?;
388
389        let amount_msat = invoice.amount_milli_satoshis().unwrap_or(0);
390        let amount_sat = amount_msat as f64 / 1000.0;
391
392        let expiry_seconds = invoice.expiry_time().as_secs();
393
394        // memo
395        let description = match invoice.description() {
396            Bolt11InvoiceDescriptionRef::Direct(desc) => desc.to_string(),
397            Bolt11InvoiceDescriptionRef::Hash(_) => "Description hash only".to_string(),
398        };
399
400        Ok(json!({
401            "amount": amount_sat,
402            "expiry": expiry_seconds,
403            "memo": description,
404        }))
405    }
406
407    async fn preview_federation(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
408        let invite = InviteCode::from_str(&invite_code)?;
409        let federation_id = invite.federation_id();
410
411        let builder = Self::client_builder().await?;
412        let preview = builder.preview(self.connectors.clone(), &invite).await?;
413
414        let json_config = preview.config().to_json();
415        // Store in cache
416        *self.preview_cache.lock().unwrap() = Some(preview);
417
418        Ok(json!({
419            "config": json_config,
420            "federation_id": federation_id.to_string(),
421        }))
422    }
423
    /// Maps a request to the stream that produces its response items.
    ///
    /// Every arm except `CancelRpc` only builds a stream here; the work is
    /// written inside the stream body. Single-result operations yield exactly
    /// one JSON value on success (`null` for join/open/close), while
    /// `ClientRpc` forwards whatever the addressed client or module yields.
    ///
    /// `CancelRpc` acts immediately: it removes the abort handle registered
    /// for `cancel_request_id`, aborts it if one was found, and returns
    /// `None` because the cancel request has no response stream of its own.
    fn handle_rpc_inner(
        self: Arc<Self>,
        request: RpcRequest,
    ) -> Option<BoxStream<'static, anyhow::Result<serde_json::Value>>> {
        match request.kind {
            RpcRequestKind::SetMnemonic { words } => Some(Box::pin(try_stream! {
                self.set_mnemonic(words).await?;
                yield serde_json::json!({ "success": true });
            })),
            RpcRequestKind::GenerateMnemonic => Some(Box::pin(try_stream! {
                let words = self.generate_mnemonic().await?;
                yield serde_json::json!({ "mnemonic": words });
            })),
            RpcRequestKind::GetMnemonic => Some(Box::pin(try_stream! {
                // `words` is an `Option`, so an unset mnemonic serializes as `null`.
                let words = self.get_mnemonic_words().await?;
                yield serde_json::json!({ "mnemonic": words });
            })),
            RpcRequestKind::HasMnemonicSet => Some(Box::pin(try_stream! {
                let is_set = self.has_mnemonic_set().await?;
                yield serde_json::json!(is_set);
            })),
            RpcRequestKind::JoinFederation {
                invite_code,
                client_name,
                force_recover,
            } => Some(Box::pin(try_stream! {
                self.handle_join_federation(invite_code, client_name, force_recover)
                    .await?;
                yield serde_json::json!(null);
            })),
            RpcRequestKind::OpenClient { client_name } => Some(Box::pin(try_stream! {
                self.handle_open_client(client_name).await?;
                yield serde_json::json!(null);
            })),
            RpcRequestKind::CloseClient { client_name } => Some(Box::pin(try_stream! {
                self.handle_close_client(client_name).await?;
                yield serde_json::json!(null);
            })),
            RpcRequestKind::ClientRpc {
                client_name,
                module,
                method,
                payload,
            } => Some(self.handle_client_rpc(client_name, module, method, payload)),
            RpcRequestKind::ParseInviteCode { invite_code } => Some(Box::pin(try_stream! {
                let result = self.parse_invite_code(invite_code)?;
                yield result;
            })),
            RpcRequestKind::ParseBolt11Invoice { invoice } => Some(Box::pin(try_stream! {
                let result = self.parse_bolt11_invoice(invoice)?;
                yield result;
            })),
            RpcRequestKind::PreviewFederation { invite_code } => Some(Box::pin(try_stream! {
                let result = self.preview_federation(invite_code).await?;
                yield result;
            })),
            RpcRequestKind::ParseOobNotes { oob_notes } => Some(Box::pin(try_stream! {
                let parsed = parse_oob_notes(&oob_notes)?;
                yield serde_json::to_value(parsed)?;
            })),
            RpcRequestKind::CancelRpc { cancel_request_id } => {
                // An unknown or already finished request id is silently ignored.
                if let Some(handle) = self.remove_rpc_handle(cancel_request_id) {
                    handle.abort();
                }
                None
            }
        }
    }
492
    /// Dispatches `request` and returns the task that delivers its responses
    /// to `handler`.
    ///
    /// For requests with a response stream, an abort handle is registered
    /// under the request id right away (before the returned task runs), so a
    /// later `CancelRpc` can abort the request. The task forwards every
    /// stream item as a `Data` or `Error` response, then removes the abort
    /// handle and sends one final `Aborted` or `End` response.
    ///
    /// For `CancelRpc` the cancellation has already happened when this
    /// returns; `task` is `None` and `handler` is never called.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the abort handles is poisoned.
    pub fn handle_rpc(
        self: Arc<Self>,
        request: RpcRequest,
        handler: impl RpcResponseHandler + 'static,
    ) -> HandledRpc<'static> {
        let request_id = request.request_id;

        let Some(stream) = self.clone().handle_rpc_inner(request) else {
            return HandledRpc { task: None };
        };

        let (abort_handle, abort_registration) = AbortHandle::new_pair();
        // NOTE(review): the handle is only removed by the task below or by a
        // `CancelRpc`, so it stays registered if the task is never run to
        // completion.
        self.add_rpc_handle(request_id, abort_handle);

        let task = Box::pin(async move {
            let mut stream = Abortable::new(stream, abort_registration);

            while let Some(result) = stream.next().await {
                let response = match result {
                    Ok(value) => RpcResponse {
                        request_id,
                        kind: RpcResponseKind::Data { data: value },
                    },
                    Err(e) => RpcResponse {
                        request_id,
                        kind: RpcResponseKind::Error {
                            error: e.to_string(),
                        },
                    },
                };
                handler.handle_response(response);
            }

            // Clean up abort handle and send end message
            // After a `CancelRpc` the handle is already gone, hence the
            // ignored result.
            let _ = self.remove_rpc_handle(request_id);
            handler.handle_response(RpcResponse {
                request_id,
                kind: if stream.is_aborted() {
                    RpcResponseKind::Aborted {}
                } else {
                    RpcResponseKind::End {}
                },
            });
        });

        HandledRpc { task: Some(task) }
    }
540
541    /// Retrieve the wallet-level mnemonic words.
542    /// Returns the mnemonic as a vector of words, or None if no mnemonic is
543    /// set.
544    async fn get_mnemonic_words(&self) -> anyhow::Result<Option<Vec<String>>> {
545        let mnemonic = self.get_mnemonic_from_db().await?;
546
547        if let Some(mnemonic) = mnemonic {
548            let words = mnemonic.words().map(|w| w.to_string()).collect();
549            Ok(Some(words))
550        } else {
551            Ok(None)
552        }
553    }
554    /// Set a mnemonic from user-provided words
555    /// Returns an error if a mnemonic is already set
556    async fn set_mnemonic(&self, words: Vec<String>) -> anyhow::Result<()> {
557        let all_words = words.join(" ");
558        let mnemonic =
559            Mnemonic::parse_in_normalized(fedimint_bip39::Language::English, &all_words)?;
560
561        let mut dbtx = self.unified_database.begin_transaction().await;
562
563        if dbtx.get_value(&MnemonicKey).await.is_some() {
564            anyhow::bail!(
565                "Wallet mnemonic already exists. Please clear existing data before setting a new mnemonic."
566            );
567        }
568
569        dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
570            .await;
571
572        dbtx.commit_tx().await;
573
574        Ok(())
575    }
576
577    /// Generate a new random mnemonic and set it
578    /// Returns an error if a mnemonic is already set
579    async fn generate_mnemonic(&self) -> anyhow::Result<Vec<String>> {
580        let mnemonic = Bip39RootSecretStrategy::<12>::random(&mut thread_rng());
581        let words: Vec<String> = mnemonic.words().map(|w| w.to_string()).collect();
582
583        let mut dbtx = self.unified_database.begin_transaction().await;
584
585        if dbtx.get_value(&MnemonicKey).await.is_some() {
586            anyhow::bail!(
587                "Wallet mnemonic already exists. Please clear existing data before generating a new mnemonic."
588            );
589        }
590
591        dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
592            .await;
593
594        dbtx.commit_tx().await;
595
596        Ok(words)
597    }
598
599    /// Derive federation-specific secret from wallet mnemonic
600    fn derive_federation_secret(
601        &self,
602        mnemonic: &Mnemonic,
603        federation_id: &FederationId,
604    ) -> DerivableSecret {
605        let global_root_secret = Bip39RootSecretStrategy::<12>::to_root_secret(mnemonic);
606        let multi_federation_root_secret = global_root_secret.child_key(ChildId(0));
607        let federation_root_secret = multi_federation_root_secret.federation_key(federation_id);
608        let federation_wallet_root_secret = federation_root_secret.child_key(ChildId(0));
609        federation_wallet_root_secret.child_key(ChildId(0))
610    }
611
612    /// Fetch mnemonic from database
613    async fn get_mnemonic_from_db(&self) -> anyhow::Result<Option<Mnemonic>> {
614        let mut dbtx = self.unified_database.begin_transaction_nc().await;
615
616        if let Some(mnemonic_entropy) = dbtx.get_value(&MnemonicKey).await {
617            let mnemonic = Mnemonic::from_entropy(&mnemonic_entropy)?;
618            Ok(Some(mnemonic))
619        } else {
620            Ok(None)
621        }
622    }
623
624    /// Check if mnemonic is set
625    async fn has_mnemonic_set(&self) -> anyhow::Result<bool> {
626        let mnemonic = self.get_mnemonic_from_db().await?;
627        Ok(mnemonic.is_some())
628    }
629}
630
631pub fn parse_oob_notes(oob_notes_str: &str) -> anyhow::Result<ParsedNoteDetails> {
632    let oob_notes =
633        OOBNotes::from_str(oob_notes_str).context("Failed to parse OOB notes string")?;
634
635    let total_amount = oob_notes.total_amount();
636    let federation_id_prefix = oob_notes.federation_id_prefix();
637    let invite_code = oob_notes.federation_invite();
638    let federation_id = invite_code.as_ref().map(|inv| inv.federation_id());
639
640    // Get note counts by denomination
641    let notes = oob_notes.notes();
642    let mut note_counts = TieredCounts::default();
643    for (amount, _note) in notes.iter_items() {
644        note_counts.inc(amount, 1);
645    }
646
647    Ok(ParsedNoteDetails {
648        total_amount,
649        federation_id_prefix,
650        federation_id,
651        invite_code,
652        note_counts,
653    })
654}