fedimint_client_rpc/
lib.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use async_stream::try_stream;
8use fedimint_bip39::{Bip39RootSecretStrategy, Mnemonic};
9use fedimint_client::module::ClientModule;
10use fedimint_client::secret::RootSecretStrategy;
11use fedimint_client::{ClientHandleArc, ClientPreview, RootSecret};
12use fedimint_connectors::ConnectorRegistry;
13use fedimint_core::config::{FederationId, FederationIdPrefix};
14use fedimint_core::db::{Database, IDatabaseTransactionOpsCoreTyped};
15use fedimint_core::encoding::{Decodable, Encodable};
16use fedimint_core::invite_code::InviteCode;
17use fedimint_core::task::{MaybeSend, MaybeSync};
18use fedimint_core::util::{BoxFuture, BoxStream};
19use fedimint_core::{Amount, TieredCounts, impl_db_record};
20use fedimint_derive_secret::{ChildId, DerivableSecret};
21use fedimint_ln_client::{LightningClientInit, LightningClientModule};
22use fedimint_meta_client::MetaClientInit;
23use fedimint_mint_client::{MintClientInit, MintClientModule, OOBNotes};
24use fedimint_wallet_client::{WalletClientInit, WalletClientModule};
25use futures::StreamExt;
26use futures::future::{AbortHandle, Abortable};
27use lightning_invoice::Bolt11InvoiceDescriptionRef;
28use rand::thread_rng;
29use serde::{Deserialize, Serialize};
30use serde_json::json;
31use tokio::sync::Mutex;
32use tracing::info;
33
/// Key prefixes partitioning the unified database.
///
/// The discriminant of each variant is used as a leading key byte.
#[repr(u8)]
#[derive(Debug, Clone, Copy)]
pub enum DbKeyPrefix {
    /// Leading byte of every per-client database prefix (see `client_db`).
    ClientDatabase = 0x00,
    /// Prefix of the record that stores the wallet mnemonic entropy.
    Mnemonic = 0x01,
}
41
42#[derive(Debug, Clone, Encodable, Decodable, Eq, PartialEq, Hash)]
43pub struct MnemonicKey;
44
45impl_db_record!(
46    key = MnemonicKey,
47    value = Vec<u8>,
48    db_prefix = DbKeyPrefix::Mnemonic,
49);
50
51/// Parsed details from an OOB note.
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct ParsedNoteDetails {
54    /// Total amount of all notes in the OOB notes
55    pub total_amount: Amount,
56    /// Federation ID prefix (always present)
57    pub federation_id_prefix: FederationIdPrefix,
58    /// Full federation ID (if invite is present)
59    pub federation_id: Option<FederationId>,
60    /// Invite code to join the federation (if present)
61    pub invite_code: Option<InviteCode>,
62    /// Number of notes per denomination
63    pub note_counts: TieredCounts,
64}
65
66#[derive(Serialize, Deserialize)]
67#[serde(rename_all = "snake_case")]
68pub struct RpcRequest {
69    pub request_id: u64,
70    #[serde(flatten)]
71    pub kind: RpcRequestKind,
72}
73
74#[derive(Serialize, Deserialize)]
75#[serde(tag = "type", rename_all = "snake_case")]
76pub enum RpcRequestKind {
77    SetMnemonic {
78        words: Vec<String>,
79    },
80    GenerateMnemonic,
81    GetMnemonic,
82    HasMnemonicSet,
83    /// Join federation (requires mnemonic to be set first)
84    JoinFederation {
85        invite_code: String,
86        force_recover: bool,
87        client_name: String,
88    },
89    OpenClient {
90        client_name: String,
91    },
92    CloseClient {
93        client_name: String,
94    },
95    ClientRpc {
96        client_name: String,
97        module: String,
98        method: String,
99        payload: serde_json::Value,
100    },
101    CancelRpc {
102        cancel_request_id: u64,
103    },
104    ParseInviteCode {
105        invite_code: String,
106    },
107    ParseBolt11Invoice {
108        invoice: String,
109    },
110    PreviewFederation {
111        invite_code: String,
112    },
113    ParseOobNotes {
114        oob_notes: String,
115    },
116}
117
118#[derive(Serialize, Deserialize, Clone, Debug)]
119pub struct RpcResponse {
120    pub request_id: u64,
121    #[serde(flatten)]
122    pub kind: RpcResponseKind,
123}
124
125#[derive(Serialize, Deserialize, Clone, Debug)]
126#[serde(tag = "type", rename_all = "snake_case")]
127pub enum RpcResponseKind {
128    Data { data: serde_json::Value },
129    Error { error: String },
130    Aborted {},
131    End {},
132}
133
134pub trait RpcResponseHandler: MaybeSend + MaybeSync {
135    fn handle_response(&self, response: RpcResponse);
136}
137
138pub struct RpcGlobalState {
139    /// Endpoints used for all global-state functionality
140    connectors: ConnectorRegistry,
141    clients: Mutex<HashMap<String, ClientHandleArc>>,
142    rpc_handles: std::sync::Mutex<HashMap<u64, AbortHandle>>,
143    unified_database: Database,
144    preview_cache: std::sync::Mutex<Option<ClientPreview>>,
145}
146
147pub struct HandledRpc<'a> {
148    pub task: Option<BoxFuture<'a, ()>>,
149}
150
151impl RpcGlobalState {
152    pub fn new(connectors: ConnectorRegistry, unified_database: Database) -> Self {
153        Self {
154            connectors,
155            clients: Mutex::new(HashMap::new()),
156            rpc_handles: std::sync::Mutex::new(HashMap::new()),
157            unified_database,
158            preview_cache: std::sync::Mutex::new(None),
159        }
160    }
161
162    async fn add_client(&self, client_name: String, client: ClientHandleArc) {
163        let mut clients = self.clients.lock().await;
164        clients.insert(client_name, client);
165    }
166
167    async fn get_client(&self, client_name: &str) -> Option<ClientHandleArc> {
168        let clients = self.clients.lock().await;
169        clients.get(client_name).cloned()
170    }
171
172    fn add_rpc_handle(&self, request_id: u64, handle: AbortHandle) {
173        let mut handles = self.rpc_handles.lock().unwrap();
174        if handles.insert(request_id, handle).is_some() {
175            tracing::error!("RPC CLIENT ERROR: request id reuse detected");
176        }
177    }
178
179    fn remove_rpc_handle(&self, request_id: u64) -> Option<AbortHandle> {
180        let mut handles = self.rpc_handles.lock().unwrap();
181        handles.remove(&request_id)
182    }
183
184    async fn client_builder() -> Result<fedimint_client::ClientBuilder, anyhow::Error> {
185        let mut builder = fedimint_client::Client::builder().await?;
186        builder.with_module(MintClientInit);
187        builder.with_module(LightningClientInit::default());
188        builder.with_module(WalletClientInit(None));
189        builder.with_module(MetaClientInit);
190        Ok(builder)
191    }
192
193    /// Get client-specific database with proper prefix
194    async fn client_db(&self, client_name: String) -> anyhow::Result<Database> {
195        assert_eq!(client_name.len(), 36);
196
197        let unified_db = &self.unified_database;
198        let mut client_prefix = vec![DbKeyPrefix::ClientDatabase as u8];
199        client_prefix.extend_from_slice(client_name.as_bytes());
200        Ok(unified_db.with_prefix(client_prefix))
201    }
202
203    /// Handle joining federation using unified database
204    async fn handle_join_federation(
205        &self,
206
207        invite_code: String,
208        client_name: String,
209        force_recover: bool,
210    ) -> anyhow::Result<()> {
211        // Check if wallet mnemonic is set
212        let mnemonic = self
213            .get_mnemonic_from_db()
214            .await?
215            .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
216
217        let client_db = self.client_db(client_name.clone()).await?;
218
219        let invite_code = InviteCode::from_str(&invite_code)?;
220        let federation_id = invite_code.federation_id();
221
222        // Derive federation-specific secret from wallet mnemonic
223        let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
224
225        // Try to consume cached preview, otherwise create new one
226        let cached_preview = self.preview_cache.lock().unwrap().take();
227        let preview = match cached_preview {
228            Some(preview) if preview.config().calculate_federation_id() == federation_id => preview,
229            _ => {
230                let builder = Self::client_builder().await?;
231                builder
232                    .preview(self.connectors.clone(), &invite_code)
233                    .await?
234            }
235        };
236
237        // Check if backup exists
238        let backup = preview
239            .download_backup_from_federation(RootSecret::StandardDoubleDerive(
240                federation_secret.clone(),
241            ))
242            .await?;
243
244        let client = if force_recover || backup.is_some() {
245            Arc::new(
246                preview
247                    .recover(
248                        client_db,
249                        RootSecret::StandardDoubleDerive(federation_secret),
250                        backup,
251                    )
252                    .await?,
253            )
254        } else {
255            Arc::new(
256                preview
257                    .join(
258                        client_db,
259                        RootSecret::StandardDoubleDerive(federation_secret),
260                    )
261                    .await?,
262            )
263        };
264
265        self.add_client(client_name, client).await;
266        Ok(())
267    }
268
269    async fn handle_open_client(&self, client_name: String) -> anyhow::Result<()> {
270        // Check if wallet mnemonic is set
271        let mnemonic = self
272            .get_mnemonic_from_db()
273            .await?
274            .context("No wallet mnemonic set. Please set or generate a mnemonic first.")?;
275
276        let client_db = self.client_db(client_name.clone()).await?;
277
278        if !fedimint_client::Client::is_initialized(&client_db).await {
279            anyhow::bail!("client is not initialized for this database");
280        }
281
282        // Get the client config to retrieve the federation ID
283        let client_config = fedimint_client::Client::get_config_from_db(&client_db)
284            .await
285            .context("Client config not found in database")?;
286
287        let federation_id = client_config.calculate_federation_id();
288
289        // Derive federation-specific secret from wallet mnemonic
290        let federation_secret = self.derive_federation_secret(&mnemonic, &federation_id);
291
292        let builder = Self::client_builder().await?;
293        let client = Arc::new(
294            builder
295                .open(
296                    self.connectors.clone(),
297                    client_db,
298                    RootSecret::StandardDoubleDerive(federation_secret),
299                )
300                .await?,
301        );
302
303        self.add_client(client_name, client).await;
304        Ok(())
305    }
306
307    async fn handle_close_client(&self, client_name: String) -> anyhow::Result<()> {
308        let mut clients = self.clients.lock().await;
309        let mut client = clients.remove(&client_name).context("client not found")?;
310
311        // RPC calls might have cloned the client Arc before we remove the client.
312        for attempt in 0.. {
313            info!(attempt, "waiting for RPCs to drop the federation object");
314            match Arc::try_unwrap(client) {
315                Ok(client) => {
316                    client.shutdown().await;
317                    break;
318                }
319                Err(client_val) => client = client_val,
320            }
321            fedimint_core::task::sleep(Duration::from_millis(100)).await;
322        }
323        Ok(())
324    }
325
326    fn handle_client_rpc(
327        self: Arc<Self>,
328        client_name: String,
329        module: String,
330        method: String,
331        payload: serde_json::Value,
332    ) -> BoxStream<'static, anyhow::Result<serde_json::Value>> {
333        Box::pin(try_stream! {
334            let client = self
335                .get_client(&client_name)
336                .await
337                .with_context(|| format!("Client not found: {client_name}"))?;
338            match module.as_str() {
339                "" => {
340                    let mut stream = client.handle_global_rpc(method, payload);
341                    while let Some(item) = stream.next().await {
342                        yield item?;
343                    }
344                }
345                "ln" => {
346                    let ln = client.get_first_module::<LightningClientModule>()?.inner();
347                    let mut stream = ln.handle_rpc(method, payload).await;
348                    while let Some(item) = stream.next().await {
349                        yield item?;
350                    }
351                }
352                "mint" => {
353                    let mint = client.get_first_module::<MintClientModule>()?.inner();
354                    let mut stream = mint.handle_rpc(method, payload).await;
355                    while let Some(item) = stream.next().await {
356                        yield item?;
357                    }
358                }
359                "wallet" => {
360                    let wallet = client
361                        .get_first_module::<WalletClientModule>()?
362                        .inner();
363                    let mut stream = wallet.handle_rpc(method, payload).await;
364                    while let Some(item) = stream.next().await {
365                        yield item?;
366                    }
367                }
368                _ => {
369                    Err(anyhow::format_err!("module not found: {module}"))?;
370                },
371            };
372        })
373    }
374
375    fn parse_invite_code(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
376        let invite_code = InviteCode::from_str(&invite_code)?;
377
378        Ok(json!({
379            "url": invite_code.url(),
380            "federation_id": invite_code.federation_id(),
381        }))
382    }
383
384    fn parse_bolt11_invoice(&self, invoice_str: String) -> anyhow::Result<serde_json::Value> {
385        let invoice = lightning_invoice::Bolt11Invoice::from_str(&invoice_str)
386            .map_err(|e| anyhow::anyhow!("Failed to parse Lightning invoice: {}", e))?;
387
388        let amount_msat = invoice.amount_milli_satoshis().unwrap_or(0);
389        let amount_sat = amount_msat as f64 / 1000.0;
390
391        let expiry_seconds = invoice.expiry_time().as_secs();
392
393        // memo
394        let description = match invoice.description() {
395            Bolt11InvoiceDescriptionRef::Direct(desc) => desc.to_string(),
396            Bolt11InvoiceDescriptionRef::Hash(_) => "Description hash only".to_string(),
397        };
398
399        Ok(json!({
400            "amount": amount_sat,
401            "expiry": expiry_seconds,
402            "memo": description,
403        }))
404    }
405
406    async fn preview_federation(&self, invite_code: String) -> anyhow::Result<serde_json::Value> {
407        let invite = InviteCode::from_str(&invite_code)?;
408        let federation_id = invite.federation_id();
409
410        let builder = Self::client_builder().await?;
411        let preview = builder.preview(self.connectors.clone(), &invite).await?;
412
413        let json_config = preview.config().to_json();
414        // Store in cache
415        *self.preview_cache.lock().unwrap() = Some(preview);
416
417        Ok(json!({
418            "config": json_config,
419            "federation_id": federation_id.to_string(),
420        }))
421    }
422
423    fn handle_rpc_inner(
424        self: Arc<Self>,
425        request: RpcRequest,
426    ) -> Option<BoxStream<'static, anyhow::Result<serde_json::Value>>> {
427        match request.kind {
428            RpcRequestKind::SetMnemonic { words } => Some(Box::pin(try_stream! {
429                self.set_mnemonic(words).await?;
430                yield serde_json::json!({ "success": true });
431            })),
432            RpcRequestKind::GenerateMnemonic => Some(Box::pin(try_stream! {
433                let words = self.generate_mnemonic().await?;
434                yield serde_json::json!({ "mnemonic": words });
435            })),
436            RpcRequestKind::GetMnemonic => Some(Box::pin(try_stream! {
437                let words = self.get_mnemonic_words().await?;
438                yield serde_json::json!({ "mnemonic": words });
439            })),
440            RpcRequestKind::HasMnemonicSet => Some(Box::pin(try_stream! {
441                let is_set = self.has_mnemonic_set().await?;
442                yield serde_json::json!(is_set);
443            })),
444            RpcRequestKind::JoinFederation {
445                invite_code,
446                client_name,
447                force_recover,
448            } => Some(Box::pin(try_stream! {
449                self.handle_join_federation(invite_code, client_name, force_recover)
450                    .await?;
451                yield serde_json::json!(null);
452            })),
453            RpcRequestKind::OpenClient { client_name } => Some(Box::pin(try_stream! {
454                self.handle_open_client(client_name).await?;
455                yield serde_json::json!(null);
456            })),
457            RpcRequestKind::CloseClient { client_name } => Some(Box::pin(try_stream! {
458                self.handle_close_client(client_name).await?;
459                yield serde_json::json!(null);
460            })),
461            RpcRequestKind::ClientRpc {
462                client_name,
463                module,
464                method,
465                payload,
466            } => Some(self.handle_client_rpc(client_name, module, method, payload)),
467            RpcRequestKind::ParseInviteCode { invite_code } => Some(Box::pin(try_stream! {
468                let result = self.parse_invite_code(invite_code)?;
469                yield result;
470            })),
471            RpcRequestKind::ParseBolt11Invoice { invoice } => Some(Box::pin(try_stream! {
472                let result = self.parse_bolt11_invoice(invoice)?;
473                yield result;
474            })),
475            RpcRequestKind::PreviewFederation { invite_code } => Some(Box::pin(try_stream! {
476                let result = self.preview_federation(invite_code).await?;
477                yield result;
478            })),
479            RpcRequestKind::ParseOobNotes { oob_notes } => Some(Box::pin(try_stream! {
480                let parsed = parse_oob_notes(&oob_notes)?;
481                yield serde_json::to_value(parsed)?;
482            })),
483            RpcRequestKind::CancelRpc { cancel_request_id } => {
484                if let Some(handle) = self.remove_rpc_handle(cancel_request_id) {
485                    handle.abort();
486                }
487                None
488            }
489        }
490    }
491
492    pub fn handle_rpc(
493        self: Arc<Self>,
494        request: RpcRequest,
495        handler: impl RpcResponseHandler + 'static,
496    ) -> HandledRpc<'static> {
497        let request_id = request.request_id;
498
499        let Some(stream) = self.clone().handle_rpc_inner(request) else {
500            return HandledRpc { task: None };
501        };
502
503        let (abort_handle, abort_registration) = AbortHandle::new_pair();
504        self.add_rpc_handle(request_id, abort_handle);
505
506        let task = Box::pin(async move {
507            let mut stream = Abortable::new(stream, abort_registration);
508
509            while let Some(result) = stream.next().await {
510                let response = match result {
511                    Ok(value) => RpcResponse {
512                        request_id,
513                        kind: RpcResponseKind::Data { data: value },
514                    },
515                    Err(e) => RpcResponse {
516                        request_id,
517                        kind: RpcResponseKind::Error {
518                            error: e.to_string(),
519                        },
520                    },
521                };
522                handler.handle_response(response);
523            }
524
525            // Clean up abort handle and send end message
526            let _ = self.remove_rpc_handle(request_id);
527            handler.handle_response(RpcResponse {
528                request_id,
529                kind: if stream.is_aborted() {
530                    RpcResponseKind::Aborted {}
531                } else {
532                    RpcResponseKind::End {}
533                },
534            });
535        });
536
537        HandledRpc { task: Some(task) }
538    }
539
540    /// Retrieve the wallet-level mnemonic words.
541    /// Returns the mnemonic as a vector of words, or None if no mnemonic is
542    /// set.
543    async fn get_mnemonic_words(&self) -> anyhow::Result<Option<Vec<String>>> {
544        let mnemonic = self.get_mnemonic_from_db().await?;
545
546        if let Some(mnemonic) = mnemonic {
547            let words = mnemonic.words().map(|w| w.to_string()).collect();
548            Ok(Some(words))
549        } else {
550            Ok(None)
551        }
552    }
553    /// Set a mnemonic from user-provided words
554    /// Returns an error if a mnemonic is already set
555    async fn set_mnemonic(&self, words: Vec<String>) -> anyhow::Result<()> {
556        let all_words = words.join(" ");
557        let mnemonic =
558            Mnemonic::parse_in_normalized(fedimint_bip39::Language::English, &all_words)?;
559
560        let mut dbtx = self.unified_database.begin_transaction().await;
561
562        if dbtx.get_value(&MnemonicKey).await.is_some() {
563            anyhow::bail!(
564                "Wallet mnemonic already exists. Please clear existing data before setting a new mnemonic."
565            );
566        }
567
568        dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
569            .await;
570
571        dbtx.commit_tx().await;
572
573        Ok(())
574    }
575
576    /// Generate a new random mnemonic and set it
577    /// Returns an error if a mnemonic is already set
578    async fn generate_mnemonic(&self) -> anyhow::Result<Vec<String>> {
579        let mnemonic = Bip39RootSecretStrategy::<12>::random(&mut thread_rng());
580        let words: Vec<String> = mnemonic.words().map(|w| w.to_string()).collect();
581
582        let mut dbtx = self.unified_database.begin_transaction().await;
583
584        if dbtx.get_value(&MnemonicKey).await.is_some() {
585            anyhow::bail!(
586                "Wallet mnemonic already exists. Please clear existing data before generating a new mnemonic."
587            );
588        }
589
590        dbtx.insert_new_entry(&MnemonicKey, &mnemonic.to_entropy())
591            .await;
592
593        dbtx.commit_tx().await;
594
595        Ok(words)
596    }
597
598    /// Derive federation-specific secret from wallet mnemonic
599    fn derive_federation_secret(
600        &self,
601        mnemonic: &Mnemonic,
602        federation_id: &FederationId,
603    ) -> DerivableSecret {
604        let global_root_secret = Bip39RootSecretStrategy::<12>::to_root_secret(mnemonic);
605        let multi_federation_root_secret = global_root_secret.child_key(ChildId(0));
606        let federation_root_secret = multi_federation_root_secret.federation_key(federation_id);
607        let federation_wallet_root_secret = federation_root_secret.child_key(ChildId(0));
608        federation_wallet_root_secret.child_key(ChildId(0))
609    }
610
611    /// Fetch mnemonic from database
612    async fn get_mnemonic_from_db(&self) -> anyhow::Result<Option<Mnemonic>> {
613        let mut dbtx = self.unified_database.begin_transaction_nc().await;
614
615        if let Some(mnemonic_entropy) = dbtx.get_value(&MnemonicKey).await {
616            let mnemonic = Mnemonic::from_entropy(&mnemonic_entropy)?;
617            Ok(Some(mnemonic))
618        } else {
619            Ok(None)
620        }
621    }
622
623    /// Check if mnemonic is set
624    async fn has_mnemonic_set(&self) -> anyhow::Result<bool> {
625        let mnemonic = self.get_mnemonic_from_db().await?;
626        Ok(mnemonic.is_some())
627    }
628}
629
630pub fn parse_oob_notes(oob_notes_str: &str) -> anyhow::Result<ParsedNoteDetails> {
631    let oob_notes =
632        OOBNotes::from_str(oob_notes_str).context("Failed to parse OOB notes string")?;
633
634    let total_amount = oob_notes.total_amount();
635    let federation_id_prefix = oob_notes.federation_id_prefix();
636    let invite_code = oob_notes.federation_invite();
637    let federation_id = invite_code.as_ref().map(|inv| inv.federation_id());
638
639    // Get note counts by denomination
640    let notes = oob_notes.notes();
641    let mut note_counts = TieredCounts::default();
642    for (amount, _note) in notes.iter_items() {
643        note_counts.inc(amount, 1);
644    }
645
646    Ok(ParsedNoteDetails {
647        total_amount,
648        federation_id_prefix,
649        federation_id,
650        invite_code,
651        note_counts,
652    })
653}