fedimint_lnv2_server/
db.rs

1use std::collections::BTreeMap;
2
3use fedimint_core::db::IDatabaseTransactionOpsCoreTyped;
4use fedimint_core::encoding::{Decodable, Encodable};
5use fedimint_core::util::SafeUrl;
6use fedimint_core::{OutPoint, PeerId, impl_db_lookup, impl_db_record};
7use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract};
8use fedimint_lnv2_common::{ContractId, LightningInputV0, LightningOutputV0};
9use fedimint_server_core::migration::{
10    ModuleHistoryItem, ServerModuleDbMigrationFnContext, ServerModuleDbMigrationFnContextExt,
11};
12use futures::StreamExt;
13use serde::{Deserialize, Serialize};
14use strum_macros::EnumIter;
15use tpe::DecryptionKeyShare;
16
17use crate::Lightning;
18
19#[repr(u8)]
20#[derive(Clone, EnumIter, Debug)]
21pub enum DbKeyPrefix {
22    BlockCountVote = 0x01,
23    UnixTimeVote = 0x02,
24    IncomingContract = 0x03,
25    IncomingContractOutpoint = 0x04,
26    OutgoingContract = 0x05,
27    DecryptionKeyShare = 0x06,
28    Preimage = 0x07,
29    Gateway = 0x08,
30    IncomingContractStreamIndex = 0x09,
31    IncomingContractStream = 0x10,
32    IncomingContractIndex = 0x11,
33}
34
35impl std::fmt::Display for DbKeyPrefix {
36    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
37        write!(f, "{self:?}")
38    }
39}
40
41#[derive(Debug, Encodable, Decodable, Serialize)]
42pub struct BlockCountVoteKey(pub PeerId);
43
44#[derive(Clone, Debug, Encodable, Decodable)]
45pub struct BlockCountVotePrefix;
46
47impl_db_record!(
48    key = BlockCountVoteKey,
49    value = u64,
50    db_prefix = DbKeyPrefix::BlockCountVote,
51);
52
53impl_db_lookup!(key = BlockCountVoteKey, query_prefix = BlockCountVotePrefix);
54
55#[derive(Debug, Encodable, Decodable, Serialize)]
56pub struct UnixTimeVoteKey(pub PeerId);
57
58#[derive(Clone, Debug, Encodable, Decodable)]
59pub struct UnixTimeVotePrefix;
60
61impl_db_record!(
62    key = UnixTimeVoteKey,
63    value = u64,
64    db_prefix = DbKeyPrefix::UnixTimeVote,
65);
66
67impl_db_lookup!(key = UnixTimeVoteKey, query_prefix = UnixTimeVotePrefix);
68
69#[derive(Debug, Clone, Eq, PartialEq, Hash, Deserialize, Serialize, Encodable, Decodable)]
70pub struct IncomingContractKey(pub OutPoint);
71
72#[derive(Clone, Debug, Encodable, Decodable)]
73pub struct IncomingContractPrefix;
74
75impl_db_record!(
76    key = IncomingContractKey,
77    value = IncomingContract,
78    db_prefix = DbKeyPrefix::IncomingContract,
79    notify_on_modify = true
80);
81
82impl_db_lookup!(
83    key = IncomingContractKey,
84    query_prefix = IncomingContractPrefix
85);
86
87#[derive(Debug, Clone, Eq, PartialEq, Hash, Deserialize, Serialize, Encodable, Decodable)]
88pub struct IncomingContractOutpointKey(pub ContractId);
89
90#[derive(Clone, Debug, Encodable, Decodable)]
91pub struct IncomingContractOutpointPrefix;
92
93impl_db_record!(
94    key = IncomingContractOutpointKey,
95    value = OutPoint,
96    db_prefix = DbKeyPrefix::IncomingContractOutpoint,
97    notify_on_modify = true
98);
99
100impl_db_lookup!(
101    key = IncomingContractOutpointKey,
102    query_prefix = IncomingContractOutpointPrefix
103);
104
105#[derive(Debug, Clone, Eq, PartialEq, Hash, Deserialize, Serialize, Encodable, Decodable)]
106pub struct OutgoingContractKey(pub OutPoint);
107
108#[derive(Clone, Debug, Encodable, Decodable)]
109pub struct OutgoingContractPrefix;
110
111impl_db_record!(
112    key = OutgoingContractKey,
113    value = OutgoingContract,
114    db_prefix = DbKeyPrefix::OutgoingContract,
115    notify_on_modify = true
116);
117
118impl_db_lookup!(
119    key = OutgoingContractKey,
120    query_prefix = OutgoingContractPrefix
121);
122
123#[derive(Debug, Encodable, Decodable, Serialize)]
124pub struct DecryptionKeyShareKey(pub OutPoint);
125
126#[derive(Clone, Debug, Encodable, Decodable)]
127pub struct DecryptionKeySharePrefix;
128
129impl_db_record!(
130    key = DecryptionKeyShareKey,
131    value = DecryptionKeyShare,
132    db_prefix = DbKeyPrefix::DecryptionKeyShare,
133    notify_on_modify = true
134);
135
136impl_db_lookup!(
137    key = DecryptionKeyShareKey,
138    query_prefix = DecryptionKeySharePrefix
139);
140
141#[derive(Debug, Clone, Eq, PartialEq, Hash, Deserialize, Serialize, Encodable, Decodable)]
142pub struct PreimageKey(pub OutPoint);
143
144#[derive(Debug, Encodable, Decodable)]
145pub struct PreimagePrefix;
146
147impl_db_record!(
148    key = PreimageKey,
149    value = [u8; 32],
150    db_prefix = DbKeyPrefix::Preimage,
151    notify_on_modify = true
152);
153
154impl_db_lookup!(key = PreimageKey, query_prefix = PreimagePrefix);
155
156#[derive(Debug, Clone, Eq, PartialEq, Hash, Deserialize, Serialize, Encodable, Decodable)]
157pub struct GatewayKey(pub SafeUrl);
158
159#[derive(Debug, Encodable, Decodable)]
160pub struct GatewayPrefix;
161
162impl_db_record!(
163    key = GatewayKey,
164    value = (),
165    db_prefix = DbKeyPrefix::Gateway,
166);
167
168impl_db_lookup!(key = GatewayKey, query_prefix = GatewayPrefix);
169
170/// Incoming contracts are indexed in three ways:
171/// 1) A sequential stream mapping: `stream_index (u64)` -> `IncomingContract`
172///    This enables efficient streaming reads using
173///    `IncomingContractStreamPrefix(start)`.
174/// 2) A global monotonically increasing index: `IncomingContractStreamIndexKey`
175///    -> `u64` This stores the next stream index to be assigned and is used to
176///    wait for new icoming contracts to arrive.
177/// 3) A reverse lookup from `OutPoint` -> `stream_index` (via
178///    `IncomingContractIndexKey`) This allows finding a specific incoming
179///    contract's stream position by its `OutPoint`, while still supporting
180///    sequential reads via the stream prefix. This is used to remove the
181///    contract from the stream once it has been spent.
182///
183/// The combination allows both random access (by `OutPoint`) and ordered
184/// iteration over all unspent incoming contracts (by `stream_index`).
185
186#[derive(Debug, Clone, Eq, PartialEq, Hash, Deserialize, Serialize, Encodable, Decodable)]
187pub struct IncomingContractStreamIndexKey;
188
189impl_db_record!(
190    key = IncomingContractStreamIndexKey,
191    value = u64,
192    db_prefix = DbKeyPrefix::IncomingContractStreamIndex,
193    notify_on_modify = true
194);
195
196#[derive(Debug, Clone, Eq, PartialEq, Hash, Deserialize, Serialize, Encodable, Decodable)]
197pub struct IncomingContractStreamKey(pub u64);
198
199#[derive(Debug, Encodable, Decodable)]
200pub struct IncomingContractStreamPrefix(pub u64);
201
202impl_db_record!(
203    key = IncomingContractStreamKey,
204    value = IncomingContract,
205    db_prefix = DbKeyPrefix::IncomingContractStream,
206);
207
208impl_db_lookup!(
209    key = IncomingContractStreamKey,
210    query_prefix = IncomingContractStreamPrefix
211);
212
213#[derive(Debug, Clone, Eq, PartialEq, Hash, Deserialize, Serialize, Encodable, Decodable)]
214pub struct IncomingContractIndexKey(pub OutPoint);
215
216#[derive(Debug, Encodable, Decodable)]
217pub struct IncomingContractIndexPrefix;
218
219impl_db_record!(
220    key = IncomingContractIndexKey,
221    value = u64,
222    db_prefix = DbKeyPrefix::IncomingContractIndex,
223);
224
225impl_db_lookup!(
226    key = IncomingContractIndexKey,
227    query_prefix = IncomingContractIndexPrefix
228);
229
230pub async fn migrate_to_v1(
231    mut ctx: ServerModuleDbMigrationFnContext<'_, Lightning>,
232) -> Result<(), anyhow::Error> {
233    let mut contracts = BTreeMap::new();
234    let mut stream_index = 0;
235
236    let mut stream = ctx.get_typed_module_history_stream().await;
237
238    while let Some(item) = stream.next().await {
239        match item {
240            ModuleHistoryItem::Output(output, outpoint) => {
241                if let Some(LightningOutputV0::Incoming(contract)) = output.maybe_v0_ref() {
242                    contracts.insert(outpoint, (stream_index, contract.clone()));
243                    stream_index += 1;
244                }
245            }
246            ModuleHistoryItem::Input(input) => {
247                if let Some(LightningInputV0::Incoming(outpoint, _)) = input.maybe_v0_ref() {
248                    contracts.remove(outpoint);
249                }
250            }
251            ModuleHistoryItem::ConsensusItem(_) => {}
252        }
253    }
254
255    drop(stream);
256
257    for (outpoint, (index, contract)) in contracts {
258        ctx.dbtx()
259            .insert_new_entry(&IncomingContractStreamKey(index), &contract)
260            .await;
261
262        ctx.dbtx()
263            .insert_new_entry(&IncomingContractIndexKey(outpoint), &index)
264            .await;
265    }
266
267    ctx.dbtx()
268        .insert_new_entry(&IncomingContractStreamIndexKey, &stream_index)
269        .await;
270
271    Ok(())
272}