fedimint_recurringd/
db.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2
3use fedimint_core::config::FederationId;
4use fedimint_core::core::OperationId;
5use fedimint_core::db::{
6    AutocommitError, Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped,
7};
8use fedimint_core::encoding::{Decodable, Encodable};
9use fedimint_core::secp256k1::SECP256K1;
10use fedimint_core::secp256k1::rand::thread_rng;
11use fedimint_core::util::BoxFuture;
12use fedimint_core::{Amount, impl_db_lookup, impl_db_record};
13use fedimint_ln_client::recurring::{PaymentCodeId, PaymentCodeRootKey, RecurringPaymentProtocol};
14use fedimint_ln_client::tweak_user_key;
15use futures::stream::StreamExt;
16use lightning_invoice::{Bolt11InvoiceDescription, Description};
17use rand::Rng;
18
19use crate::{
20    LnClientContextExt, PaymentCodeInvoice, RecurringInvoiceServer, operation_id_from_user_key,
21};
22
23#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, Ord, PartialOrd, Encodable, Decodable)]
24pub struct FederationDbPrefix([u8; 16]);
25
26impl FederationDbPrefix {
27    pub fn random() -> FederationDbPrefix {
28        FederationDbPrefix(thread_rng().r#gen())
29    }
30
31    fn prepend(&self, byte: u8) -> Vec<u8> {
32        let mut full_prefix = Vec::with_capacity(17);
33        full_prefix.push(byte);
34        full_prefix.extend(&self.0);
35        full_prefix
36    }
37}
38
39async fn load_federation_clients(db: &Database) -> Vec<(FederationId, FederationDbPrefix)> {
40    let mut dbtx = db.begin_transaction_nc().await;
41    dbtx.find_by_prefix(&FederationClientPrefix)
42        .await
43        .map(|(k, v)| (k.federation_id, v.db_prefix))
44        .collect::<Vec<_>>()
45        .await
46}
47
48pub fn open_client_db(db: &Database, db_prefix: FederationDbPrefix) -> Database {
49    db.with_prefix(db_prefix.prepend(DbKeyPrefix::ClientDB as u8))
50}
51
52pub async fn try_add_federation_database(
53    db: &Database,
54    federation_id: FederationId,
55    db_prefix: FederationDbPrefix,
56) -> Result<(), FederationDbPrefix> {
57    db.autocommit(
58        |dbtx, _| {
59            Box::pin(async move {
60                if let Some(federation_db_entry) =
61                    dbtx.get_value(&FederationClientKey { federation_id }).await
62                {
63                    return Err(federation_db_entry.db_prefix);
64                }
65
66                dbtx.insert_new_entry(
67                    &FederationClientKey { federation_id },
68                    &FederationClientEntry { db_prefix },
69                )
70                .await;
71
72                Ok(())
73            })
74        },
75        None,
76    )
77    .await
78    .map_err(|e| match e {
79        AutocommitError::CommitFailed { .. } => unreachable!("will keep retrying"),
80        AutocommitError::ClosureError { error, .. } => {
81            // TODO: clean up DB once parallel joins are enabled
82            error
83        }
84    })
85}
86
87pub async fn load_federation_client_databases(db: &Database) -> HashMap<FederationId, Database> {
88    load_federation_clients(db)
89        .await
90        .into_iter()
91        .map(|(federation_id, db_prefix)| (federation_id, open_client_db(db, db_prefix)))
92        .collect()
93}
94
/// First byte of every database key, separating the different record types.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
enum DbKeyPrefix {
    /// Federation client list (`FederationClientKey` records).
    ClientList = 0x00,
    /// Start of each per-federation client database key space.
    ClientDB = 0x01,
    /// Payment codes (`PaymentCodeKey` records).
    PaymentCodes = 0x02,
    /// Per-payment-code invoice index counter.
    PaymentCodeNextInvoiceIndex = 0x03,
    /// Invoices generated for payment codes.
    PaymentCodeInvoices = 0x04,

    /// Schema version record.
    SchemaVersion = 0xff,
}
105
106#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
107pub struct FederationClientKey {
108    pub federation_id: FederationId,
109}
110
111#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
112pub struct FederationClientPrefix;
113
114impl_db_record!(
115    key = FederationClientKey,
116    value = FederationClientEntry,
117    db_prefix = DbKeyPrefix::ClientList,
118);
119impl_db_lookup!(
120    key = FederationClientKey,
121    query_prefix = FederationClientPrefix
122);
123
124#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
125pub struct FederationClientEntry {
126    pub db_prefix: FederationDbPrefix,
127}
128
129#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
130pub struct PaymentCodeKey {
131    pub payment_code_id: PaymentCodeId,
132}
133
134#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
135pub struct PaymentCodePrefix;
136
137#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
138pub enum PaymentCodeVariant {
139    Lnurl { meta: String },
140}
141
142#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
143pub struct PaymentCodeEntry {
144    pub root_key: PaymentCodeRootKey,
145    pub federation_id: FederationId,
146    pub protocol: RecurringPaymentProtocol,
147    pub payment_code: String,
148    pub variant: PaymentCodeVariant,
149}
150
151impl_db_record!(
152    key = PaymentCodeKey,
153    value = PaymentCodeEntry,
154    db_prefix = DbKeyPrefix::PaymentCodes,
155);
156impl_db_lookup!(key = PaymentCodeKey, query_prefix = PaymentCodePrefix);
157
158#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
159pub struct PaymentCodeNextInvoiceIndexKey {
160    pub payment_code_id: PaymentCodeId,
161}
162
163#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
164pub struct PaymentCodeNextInvoiceIndexKeyPrefix;
165
166impl_db_record!(
167    key = PaymentCodeNextInvoiceIndexKey,
168    value = u64,
169    db_prefix = DbKeyPrefix::PaymentCodeNextInvoiceIndex,
170);
171impl_db_lookup!(
172    key = PaymentCodeNextInvoiceIndexKey,
173    query_prefix = PaymentCodeNextInvoiceIndexKeyPrefix
174);
175
176#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
177pub struct PaymentCodeInvoiceKey {
178    pub payment_code_id: PaymentCodeId,
179    pub index: u64,
180}
181
182#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
183pub struct PaymentCodeInvoicePrefix {
184    payment_code_id: PaymentCodeId,
185}
186
187#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
188pub struct PaymentCodeInvoiceEntry {
189    pub operation_id: OperationId,
190    pub invoice: PaymentCodeInvoice,
191}
192
193impl_db_record!(
194    key = PaymentCodeInvoiceKey,
195    value = PaymentCodeInvoiceEntry,
196    db_prefix = DbKeyPrefix::PaymentCodeInvoices,
197);
198impl_db_lookup!(
199    key = PaymentCodeInvoiceKey,
200    query_prefix = PaymentCodeInvoicePrefix
201);
202
203#[derive(Debug, Clone, Eq, PartialEq, Hash, Encodable, Decodable)]
204pub struct SchemaVersionKey;
205
206impl_db_record!(
207    key = SchemaVersionKey,
208    value = u64,
209    db_prefix = DbKeyPrefix::SchemaVersion,
210);
211
212type DbMigration =
213    for<'a> fn(&'a RecurringInvoiceServer, DatabaseTransaction<'a>) -> BoxFuture<'a, ()>;
214
215impl RecurringInvoiceServer {
216    pub(crate) fn migrations() -> BTreeMap<u64, DbMigration> {
217        vec![(
218            1,
219            (|server: &RecurringInvoiceServer, dbtx| Box::pin(server.db_migration_v1(dbtx)))
220                as DbMigration,
221        )]
222        .into_iter()
223        .collect()
224    }
225
226    /// Backfill DB fix for bug that caused "holes" in invoice indices keeping
227    /// the client from syncing. See <https://github.com/fedimint/fedimint/pull/7653>.
228    async fn db_migration_v1(&self, mut dbtx: DatabaseTransaction<'_>) {
229        const BACKFILL_AMOUNT: Amount = Amount::from_msats(111111);
230
231        let mut payment_codes = dbtx
232            .find_by_prefix(&PaymentCodePrefix)
233            .await
234            .map(|(k, v)| (k.payment_code_id, v))
235            .collect::<HashMap<PaymentCodeId, PaymentCodeEntry>>()
236            .await;
237
238        let payment_code_indices = dbtx
239            .find_by_prefix(&PaymentCodeNextInvoiceIndexKeyPrefix)
240            .await
241            .map(|(payment_code_key, invoice_idx)| (payment_code_key.payment_code_id, invoice_idx))
242            .collect::<HashMap<PaymentCodeId, u64>>()
243            .await;
244
245        for (payment_code_id, current_invoice_index) in payment_code_indices {
246            let payment_code_entry = payment_codes
247                .remove(&payment_code_id)
248                .expect("If there's an index, there's a payment code entry");
249
250            let payment_code_invoice_indices = dbtx
251                .find_by_prefix(&PaymentCodeInvoicePrefix { payment_code_id })
252                .await
253                .map(|(invoice_key, _)| invoice_key.index)
254                .collect::<HashSet<_>>()
255                .await;
256
257            let client = self
258                .get_federation_client(payment_code_entry.federation_id)
259                .await
260                .expect("Federation client exists if we have the code in our DB");
261            let ln_client_module = client.get_ln_module().expect("LN module is present");
262
263            let missing_indices = (1..=current_invoice_index)
264                .filter(|idx| !payment_code_invoice_indices.contains(idx));
265            for missing_index in missing_indices {
266                let initial_operation_id =
267                    operation_id_from_user_key(payment_code_entry.root_key, missing_index);
268                let invoice = if let Some(invoice) =
269                    Self::check_if_invoice_exists(&client, initial_operation_id).await
270                {
271                    invoice
272                } else {
273                    // Generate fake invoice to backfill "holes" in invoice indices
274                    let (_, invoice, _) = ln_client_module
275                        .create_bolt11_invoice_for_user(
276                            BACKFILL_AMOUNT,
277                            Bolt11InvoiceDescription::Direct(
278                                Description::new("Backfill".to_string()).unwrap(),
279                            ),
280                            Some(3600),
281                            tweak_user_key(SECP256K1, payment_code_entry.root_key.0, missing_index),
282                            (),
283                            None,
284                        )
285                        .await
286                        .expect("We checked that there is no invoice for that index already");
287                    invoice
288                };
289
290                self.save_bolt11_invoice(
291                    &mut dbtx,
292                    initial_operation_id,
293                    payment_code_id,
294                    missing_index,
295                    invoice,
296                )
297                .await;
298            }
299        }
300    }
301}