1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_wrap)]
3#![allow(clippy::module_name_repetitions)]
4
5pub use fedimint_lnv2_common as common;
6
7mod db;
8
9use std::collections::{BTreeMap, BTreeSet};
10use std::time::Duration;
11
12use anyhow::{Context, anyhow, ensure};
13use bls12_381::{G1Projective, Scalar};
14use fedimint_core::bitcoin::hashes::sha256;
15use fedimint_core::config::{
16 ServerModuleConfig, ServerModuleConsensusConfig, TypedServerModuleConfig,
17 TypedServerModuleConsensusConfig,
18};
19use fedimint_core::core::ModuleInstanceId;
20use fedimint_core::db::{
21 Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
22};
23use fedimint_core::encoding::Encodable;
24use fedimint_core::envs::{FM_ENABLE_MODULE_LNV2_ENV, is_env_var_set_opt};
25use fedimint_core::module::audit::Audit;
26use fedimint_core::module::{
27 Amounts, ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion,
28 InputMeta, ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions,
29 TransactionItemAmounts, api_endpoint,
30};
31use fedimint_core::net::auth::check_auth;
32use fedimint_core::task::timeout;
33use fedimint_core::time::duration_since_epoch;
34use fedimint_core::util::SafeUrl;
35use fedimint_core::{
36 BitcoinHash, InPoint, NumPeers, NumPeersExt, OutPoint, PeerId, apply, async_trait_maybe_send,
37 push_db_pair_items,
38};
39use fedimint_lnv2_common::config::{
40 FeeConsensus, LightningClientConfig, LightningConfig, LightningConfigConsensus,
41 LightningConfigPrivate,
42};
43use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract};
44use fedimint_lnv2_common::endpoint_constants::{
45 ADD_GATEWAY_ENDPOINT, AWAIT_INCOMING_CONTRACT_ENDPOINT, AWAIT_INCOMING_CONTRACTS_ENDPOINT,
46 AWAIT_PREIMAGE_ENDPOINT, CONSENSUS_BLOCK_COUNT_ENDPOINT, DECRYPTION_KEY_SHARE_ENDPOINT,
47 GATEWAYS_ENDPOINT, OUTGOING_CONTRACT_EXPIRATION_ENDPOINT, REMOVE_GATEWAY_ENDPOINT,
48};
49use fedimint_lnv2_common::{
50 ContractId, LightningCommonInit, LightningConsensusItem, LightningInput, LightningInputError,
51 LightningInputV0, LightningModuleTypes, LightningOutput, LightningOutputError,
52 LightningOutputOutcome, LightningOutputV0, MODULE_CONSENSUS_VERSION, OutgoingWitness,
53};
54use fedimint_logging::LOG_MODULE_LNV2;
55use fedimint_server_core::bitcoin_rpc::ServerBitcoinRpcMonitor;
56use fedimint_server_core::config::{PeerHandleOps, eval_poly_g1};
57use fedimint_server_core::migration::ServerModuleDbMigrationFn;
58use fedimint_server_core::{
59 ConfigGenModuleArgs, EnvVarDoc, ServerModule, ServerModuleInit, ServerModuleInitArgs,
60};
61use futures::StreamExt;
62use group::Curve;
63use group::ff::Field;
64use rand::SeedableRng;
65use rand_chacha::ChaChaRng;
66use strum::IntoEnumIterator;
67use tpe::{
68 AggregatePublicKey, DecryptionKeyShare, PublicKeyShare, SecretKeyShare, derive_pk_share,
69};
70use tracing::trace;
71
72use crate::db::{
73 BlockCountVoteKey, BlockCountVotePrefix, DbKeyPrefix, DecryptionKeyShareKey,
74 DecryptionKeySharePrefix, GatewayKey, GatewayPrefix, IncomingContractIndexKey,
75 IncomingContractIndexPrefix, IncomingContractKey, IncomingContractOutpointKey,
76 IncomingContractOutpointPrefix, IncomingContractPrefix, IncomingContractStreamIndexKey,
77 IncomingContractStreamKey, IncomingContractStreamPrefix, OutgoingContractKey,
78 OutgoingContractPrefix, PreimageKey, PreimagePrefix, UnixTimeVoteKey, UnixTimeVotePrefix,
79};
80
/// Zero-sized initializer for the LNv2 server module.
///
/// It carries no state; it exists so the `ModuleInit` and `ServerModuleInit`
/// traits can be implemented on it.
#[derive(Clone, Debug)]
pub struct LightningInit;
83
84impl ModuleInit for LightningInit {
85 type Common = LightningCommonInit;
86
87 #[allow(clippy::too_many_lines)]
88 async fn dump_database(
89 &self,
90 dbtx: &mut DatabaseTransaction<'_>,
91 prefix_names: Vec<String>,
92 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
93 let mut lightning: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
94 BTreeMap::new();
95
96 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
97 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
98 });
99
100 for table in filtered_prefixes {
101 match table {
102 DbKeyPrefix::BlockCountVote => {
103 push_db_pair_items!(
104 dbtx,
105 BlockCountVotePrefix,
106 BlockCountVoteKey,
107 u64,
108 lightning,
109 "Lightning Block Count Votes"
110 );
111 }
112 DbKeyPrefix::UnixTimeVote => {
113 push_db_pair_items!(
114 dbtx,
115 UnixTimeVotePrefix,
116 UnixTimeVoteKey,
117 u64,
118 lightning,
119 "Lightning Unix Time Votes"
120 );
121 }
122 DbKeyPrefix::OutgoingContract => {
123 push_db_pair_items!(
124 dbtx,
125 OutgoingContractPrefix,
126 LightningOutgoingContractKey,
127 OutgoingContract,
128 lightning,
129 "Lightning Outgoing Contracts"
130 );
131 }
132 DbKeyPrefix::IncomingContract => {
133 push_db_pair_items!(
134 dbtx,
135 IncomingContractPrefix,
136 LightningIncomingContractKey,
137 IncomingContract,
138 lightning,
139 "Lightning Incoming Contracts"
140 );
141 }
142 DbKeyPrefix::IncomingContractOutpoint => {
143 push_db_pair_items!(
144 dbtx,
145 IncomingContractOutpointPrefix,
146 LightningIncomingContractOutpointKey,
147 OutPoint,
148 lightning,
149 "Lightning Incoming Contracts Outpoints"
150 );
151 }
152 DbKeyPrefix::DecryptionKeyShare => {
153 push_db_pair_items!(
154 dbtx,
155 DecryptionKeySharePrefix,
156 DecryptionKeyShareKey,
157 DecryptionKeyShare,
158 lightning,
159 "Lightning Decryption Key Share"
160 );
161 }
162 DbKeyPrefix::Preimage => {
163 push_db_pair_items!(
164 dbtx,
165 PreimagePrefix,
166 LightningPreimageKey,
167 [u8; 32],
168 lightning,
169 "Lightning Preimages"
170 );
171 }
172 DbKeyPrefix::Gateway => {
173 push_db_pair_items!(
174 dbtx,
175 GatewayPrefix,
176 GatewayKey,
177 (),
178 lightning,
179 "Lightning Gateways"
180 );
181 }
182 DbKeyPrefix::IncomingContractStreamIndex => {
183 push_db_pair_items!(
184 dbtx,
185 IncomingContractStreamIndexKey,
186 IncomingContractStreamIndexKey,
187 u64,
188 lightning,
189 "Lightning Incoming Contract Stream Index"
190 );
191 }
192 DbKeyPrefix::IncomingContractStream => {
193 push_db_pair_items!(
194 dbtx,
195 IncomingContractStreamPrefix(0),
196 IncomingContractStreamKey,
197 IncomingContract,
198 lightning,
199 "Lightning Incoming Contract Stream"
200 );
201 }
202 DbKeyPrefix::IncomingContractIndex => {
203 push_db_pair_items!(
204 dbtx,
205 IncomingContractIndexPrefix,
206 IncomingContractIndexKey,
207 u64,
208 lightning,
209 "Lightning Incoming Contract Index"
210 );
211 }
212 }
213 }
214
215 Box::new(lightning.into_iter())
216 }
217}
218
219#[apply(async_trait_maybe_send!)]
220impl ServerModuleInit for LightningInit {
221 type Module = Lightning;
222
223 fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
224 &[MODULE_CONSENSUS_VERSION]
225 }
226
227 fn supported_api_versions(&self) -> SupportedModuleApiVersions {
228 SupportedModuleApiVersions::from_raw(
229 (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
230 (
231 MODULE_CONSENSUS_VERSION.major,
232 MODULE_CONSENSUS_VERSION.minor,
233 ),
234 &[(0, 0)],
235 )
236 }
237
238 fn is_enabled_by_default(&self) -> bool {
239 is_env_var_set_opt(FM_ENABLE_MODULE_LNV2_ENV).unwrap_or(true)
240 }
241
242 fn get_documented_env_vars(&self) -> Vec<EnvVarDoc> {
243 vec![EnvVarDoc {
244 name: FM_ENABLE_MODULE_LNV2_ENV,
245 description: "Set to 0/false to disable the LNv2 Lightning module. Enabled by default.",
246 }]
247 }
248
249 async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
250 Ok(Lightning {
251 cfg: args.cfg().to_typed()?,
252 db: args.db().clone(),
253 server_bitcoin_rpc_monitor: args.server_bitcoin_rpc_monitor(),
254 })
255 }
256
257 fn trusted_dealer_gen(
258 &self,
259 peers: &[PeerId],
260 args: &ConfigGenModuleArgs,
261 ) -> BTreeMap<PeerId, ServerModuleConfig> {
262 let tpe_pks = peers
263 .iter()
264 .map(|peer| (*peer, dealer_pk(peers.to_num_peers(), *peer)))
265 .collect::<BTreeMap<PeerId, PublicKeyShare>>();
266
267 peers
268 .iter()
269 .map(|peer| {
270 let cfg = LightningConfig {
271 consensus: LightningConfigConsensus {
272 tpe_agg_pk: dealer_agg_pk(),
273 tpe_pks: tpe_pks.clone(),
274 fee_consensus: if args.disable_base_fees {
275 FeeConsensus::zero()
276 } else {
277 FeeConsensus::new(0).expect("Relative fee is within range")
278 },
279 network: args.network,
280 },
281 private: LightningConfigPrivate {
282 sk: dealer_sk(peers.to_num_peers(), *peer),
283 },
284 };
285
286 (*peer, cfg.to_erased())
287 })
288 .collect()
289 }
290
291 async fn distributed_gen(
292 &self,
293 peers: &(dyn PeerHandleOps + Send + Sync),
294 args: &ConfigGenModuleArgs,
295 ) -> anyhow::Result<ServerModuleConfig> {
296 let (polynomial, sks) = peers.run_dkg_g1().await?;
297
298 let server = LightningConfig {
299 consensus: LightningConfigConsensus {
300 tpe_agg_pk: tpe::AggregatePublicKey(polynomial[0].to_affine()),
301 tpe_pks: peers
302 .num_peers()
303 .peer_ids()
304 .map(|peer| (peer, PublicKeyShare(eval_poly_g1(&polynomial, &peer))))
305 .collect(),
306 fee_consensus: if args.disable_base_fees {
307 FeeConsensus::zero()
308 } else {
309 FeeConsensus::new(0).expect("Relative fee is within range")
310 },
311 network: args.network,
312 },
313 private: LightningConfigPrivate {
314 sk: SecretKeyShare(sks),
315 },
316 };
317
318 Ok(server.to_erased())
319 }
320
321 fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
322 let config = config.to_typed::<LightningConfig>()?;
323
324 ensure!(
325 tpe::derive_pk_share(&config.private.sk)
326 == *config
327 .consensus
328 .tpe_pks
329 .get(identity)
330 .context("Public key set has no key for our identity")?,
331 "Preimge encryption secret key share does not match our public key share"
332 );
333
334 Ok(())
335 }
336
337 fn get_client_config(
338 &self,
339 config: &ServerModuleConsensusConfig,
340 ) -> anyhow::Result<LightningClientConfig> {
341 let config = LightningConfigConsensus::from_erased(config)?;
342 Ok(LightningClientConfig {
343 tpe_agg_pk: config.tpe_agg_pk,
344 tpe_pks: config.tpe_pks,
345 fee_consensus: config.fee_consensus,
346 network: config.network,
347 })
348 }
349
350 fn get_database_migrations(
351 &self,
352 ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Lightning>> {
353 let mut migrations: BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Lightning>> =
354 BTreeMap::new();
355
356 migrations.insert(
357 DatabaseVersion(0),
358 Box::new(move |ctx| Box::pin(crate::db::migrate_to_v1(ctx))),
359 );
360
361 migrations
362 }
363
364 fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
365 Some(DbKeyPrefix::iter().map(|p| p as u8).collect())
366 }
367}
368
369fn dealer_agg_pk() -> AggregatePublicKey {
370 AggregatePublicKey((G1Projective::generator() * coefficient(0)).to_affine())
371}
372
373fn dealer_pk(num_peers: NumPeers, peer: PeerId) -> PublicKeyShare {
374 derive_pk_share(&dealer_sk(num_peers, peer))
375}
376
377fn dealer_sk(num_peers: NumPeers, peer: PeerId) -> SecretKeyShare {
378 let x = Scalar::from(peer.to_usize() as u64 + 1);
379
380 let y = (0..num_peers.threshold())
384 .map(|index| coefficient(index as u64))
385 .rev()
386 .reduce(|accumulator, c| accumulator * x + c)
387 .expect("We have at least one coefficient");
388
389 SecretKeyShare(y)
390}
391
392fn coefficient(index: u64) -> Scalar {
393 Scalar::random(&mut ChaChaRng::from_seed(
394 *index.consensus_hash::<sha256::Hash>().as_byte_array(),
395 ))
396}
397
/// Server-side state of the LNv2 Lightning module.
#[derive(Debug)]
pub struct Lightning {
    /// Typed module configuration; both its `consensus` and `private` parts
    /// are read by the module.
    cfg: LightningConfig,
    /// Database handle used by the public `*_ui` helper methods.
    db: Database,
    /// Monitor whose status provides the locally observed block count.
    server_bitcoin_rpc_monitor: ServerBitcoinRpcMonitor,
}
404
#[apply(async_trait_maybe_send!)]
impl ServerModule for Lightning {
    type Common = LightningModuleTypes;
    type Init = LightningInit;

    /// Builds this peer's consensus proposal.
    ///
    /// Always contains a unix time vote; a block count vote is added only
    /// when the local block count is available.
    async fn consensus_proposal(
        &self,
        _dbtx: &mut DatabaseTransaction<'_>,
    ) -> Vec<LightningConsensusItem> {
        // The timestamp is rounded down to a full minute, so the proposed
        // value only changes once per minute.
        let mut items = vec![LightningConsensusItem::UnixTimeVote(
            60 * (duration_since_epoch().as_secs() / 60),
        )];

        if let Ok(block_count) = self.get_block_count() {
            trace!(target: LOG_MODULE_LNV2, ?block_count, "Proposing block count");
            items.push(LightningConsensusItem::BlockCountVote(block_count));
        }

        items
    }

    /// Records a peer's block count or unix time vote.
    ///
    /// # Errors
    ///
    /// Fails when the vote is not strictly greater than the peer's previously
    /// stored vote (treated as 0 if there is none), or when the item has an
    /// unknown variant.
    async fn process_consensus_item<'a, 'b>(
        &'a self,
        dbtx: &mut DatabaseTransaction<'b>,
        consensus_item: LightningConsensusItem,
        peer: PeerId,
    ) -> anyhow::Result<()> {
        trace!(target: LOG_MODULE_LNV2, ?consensus_item, "Processing consensus item proposal");

        match consensus_item {
            LightningConsensusItem::BlockCountVote(vote) => {
                // The new vote is written first and the previous value is
                // compared afterwards.
                // NOTE(review): this relies on the caller discarding the
                // write when an error is returned — confirm.
                let current_vote = dbtx
                    .insert_entry(&BlockCountVoteKey(peer), &vote)
                    .await
                    .unwrap_or(0);

                ensure!(current_vote < vote, "Block count vote is redundant");

                Ok(())
            }
            LightningConsensusItem::UnixTimeVote(vote) => {
                // Same write-then-compare scheme as for block count votes.
                let current_vote = dbtx
                    .insert_entry(&UnixTimeVoteKey(peer), &vote)
                    .await
                    .unwrap_or(0);

                ensure!(current_vote < vote, "Unix time vote is redundant");

                Ok(())
            }
            LightningConsensusItem::Default { variant, .. } => Err(anyhow!(
                "Received lnv2 consensus item with unknown variant {variant}"
            )),
        }
    }

    /// Spends an outgoing or incoming contract.
    ///
    /// The contract is removed from the database and the public key that may
    /// spend it is determined from the witness. Returns the contract amount,
    /// the fee computed from it and that public key.
    ///
    /// # Errors
    ///
    /// Returns a `LightningInputError` if the input is not a v0 input, the
    /// contract is unknown, or the witness does not satisfy the contract.
    async fn process_input<'a, 'b, 'c>(
        &'a self,
        dbtx: &mut DatabaseTransaction<'c>,
        input: &'b LightningInput,
        _in_point: InPoint,
    ) -> Result<InputMeta, LightningInputError> {
        let (pub_key, amount) = match input.ensure_v0_ref()? {
            LightningInputV0::Outgoing(outpoint, outgoing_witness) => {
                // NOTE(review): the contract is removed before the witness is
                // validated, so the error paths below rely on the caller
                // discarding this transaction's changes — confirm.
                let contract = dbtx
                    .remove_entry(&OutgoingContractKey(*outpoint))
                    .await
                    .ok_or(LightningInputError::UnknownContract)?;

                let pub_key = match outgoing_witness {
                    // Claiming requires an unexpired contract and a valid
                    // preimage; the preimage is stored under the outpoint.
                    OutgoingWitness::Claim(preimage) => {
                        if contract.expiration <= self.consensus_block_count(dbtx).await {
                            return Err(LightningInputError::Expired);
                        }

                        if !contract.verify_preimage(preimage) {
                            return Err(LightningInputError::InvalidPreimage);
                        }

                        dbtx.insert_entry(&PreimageKey(*outpoint), preimage).await;

                        contract.claim_pk
                    }
                    // A refund is only possible once the consensus block
                    // count has reached the contract's expiration.
                    OutgoingWitness::Refund => {
                        if contract.expiration > self.consensus_block_count(dbtx).await {
                            return Err(LightningInputError::NotExpired);
                        }

                        contract.refund_pk
                    }
                    // A valid forfeit signature allows a refund regardless
                    // of the expiration.
                    OutgoingWitness::Cancel(forfeit_signature) => {
                        if !contract.verify_forfeit_signature(forfeit_signature) {
                            return Err(LightningInputError::InvalidForfeitSignature);
                        }

                        contract.refund_pk
                    }
                };

                (pub_key, contract.amount)
            }
            LightningInputV0::Incoming(outpoint, agg_decryption_key) => {
                let contract = dbtx
                    .remove_entry(&IncomingContractKey(*outpoint))
                    .await
                    .ok_or(LightningInputError::UnknownContract)?;

                // Also drop the contract from the stream via its stored
                // stream index.
                let index = dbtx
                    .remove_entry(&IncomingContractIndexKey(*outpoint))
                    .await
                    .expect("Incoming contract index should exist");

                dbtx.remove_entry(&IncomingContractStreamKey(index)).await;

                if !contract
                    .verify_agg_decryption_key(&self.cfg.consensus.tpe_agg_pk, agg_decryption_key)
                {
                    return Err(LightningInputError::InvalidDecryptionKey);
                }

                // If the key decrypts a preimage the claim key may spend,
                // otherwise the refund key.
                let pub_key = match contract.decrypt_preimage(agg_decryption_key) {
                    Some(..) => contract.commitment.claim_pk,
                    None => contract.commitment.refund_pk,
                };

                (pub_key, contract.commitment.amount)
            }
        };

        Ok(InputMeta {
            amount: TransactionItemAmounts {
                amounts: Amounts::new_bitcoin(amount),
                fees: Amounts::new_bitcoin(self.cfg.consensus.fee_consensus.fee(amount)),
            },
            pub_key,
        })
    }

    /// Creates an outgoing or incoming contract at `outpoint`.
    ///
    /// Returns the contract amount and the fee computed from it.
    ///
    /// # Errors
    ///
    /// Returns a `LightningOutputError` if the output is not a v0 output, or
    /// if an incoming contract fails verification or has already expired
    /// according to the consensus unix time.
    async fn process_output<'a, 'b>(
        &'a self,
        dbtx: &mut DatabaseTransaction<'b>,
        output: &'a LightningOutput,
        outpoint: OutPoint,
    ) -> Result<TransactionItemAmounts, LightningOutputError> {
        let amount = match output.ensure_v0_ref()? {
            LightningOutputV0::Outgoing(contract) => {
                dbtx.insert_new_entry(&OutgoingContractKey(outpoint), contract)
                    .await;

                contract.amount
            }
            LightningOutputV0::Incoming(contract) => {
                if !contract.verify() {
                    return Err(LightningOutputError::InvalidContract);
                }

                if contract.commitment.expiration <= self.consensus_unix_time(dbtx).await {
                    return Err(LightningOutputError::ContractExpired);
                }

                dbtx.insert_new_entry(&IncomingContractKey(outpoint), contract)
                    .await;

                // Lookup from contract id to outpoint; this is the key that
                // `await_incoming_contract` waits for.
                dbtx.insert_entry(
                    &IncomingContractOutpointKey(contract.contract_id()),
                    &outpoint,
                )
                .await;

                // Append the contract to the stream at the next free index
                // (0 if no index is stored yet), remember that index for the
                // outpoint, and advance the stored index by one.
                let stream_index = dbtx
                    .get_value(&IncomingContractStreamIndexKey)
                    .await
                    .unwrap_or(0);

                dbtx.insert_entry(&IncomingContractStreamKey(stream_index), contract)
                    .await;

                dbtx.insert_entry(&IncomingContractIndexKey(outpoint), &stream_index)
                    .await;

                dbtx.insert_entry(&IncomingContractStreamIndexKey, &(stream_index + 1))
                    .await;

                // Store the decryption key share created with this peer's
                // secret key; it is served by the decryption key share
                // endpoint.
                let dk_share = contract.create_decryption_key_share(&self.cfg.private.sk);

                dbtx.insert_entry(&DecryptionKeyShareKey(outpoint), &dk_share)
                    .await;

                contract.commitment.amount
            }
        };

        Ok(TransactionItemAmounts {
            amounts: Amounts::new_bitcoin(amount),
            fees: Amounts::new_bitcoin(self.cfg.consensus.fee_consensus.fee(amount)),
        })
    }

    /// Output status lookup; this module always returns `None`.
    async fn output_status(
        &self,
        _dbtx: &mut DatabaseTransaction<'_>,
        _out_point: OutPoint,
    ) -> Option<LightningOutputOutcome> {
        None
    }

    /// Adds all stored outgoing and incoming contracts to the audit.
    ///
    /// Each contract contributes the negated msat amount (presumably because
    /// open contracts are liabilities of the federation — confirm against
    /// `Audit::add_items`).
    async fn audit(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        audit: &mut Audit,
        module_instance_id: ModuleInstanceId,
    ) {
        audit
            .add_items(
                dbtx,
                module_instance_id,
                &OutgoingContractPrefix,
                |_, contract| -(contract.amount.msats as i64),
            )
            .await;

        audit
            .add_items(
                dbtx,
                module_instance_id,
                &IncomingContractPrefix,
                |_, contract| -(contract.commitment.amount.msats as i64),
            )
            .await;
    }

    /// Lists the module's API endpoints, all registered with API version
    /// `(0, 0)`.
    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
        vec![
            // Current consensus block count.
            api_endpoint! {
                CONSENSUS_BLOCK_COUNT_ENDPOINT,
                ApiVersion::new(0, 0),
                async |module: &Lightning, context, _params : () | -> u64 {
                    let db = context.db();
                    let mut dbtx = db.begin_transaction_nc().await;

                    Ok(module.consensus_block_count(&mut dbtx).await)
                }
            },
            // Params: (contract id, expiration). Waits for the contract's
            // outpoint or for the expiration.
            api_endpoint! {
                AWAIT_INCOMING_CONTRACT_ENDPOINT,
                ApiVersion::new(0, 0),
                async |module: &Lightning, context, params: (ContractId, u64) | -> Option<OutPoint> {
                    let db = context.db();

                    Ok(module.await_incoming_contract(db, params.0, params.1).await)
                }
            },
            // Params: (outpoint, expiration). Waits for the preimage or for
            // the expiration.
            api_endpoint! {
                AWAIT_PREIMAGE_ENDPOINT,
                ApiVersion::new(0, 0),
                async |module: &Lightning, context, params: (OutPoint, u64)| -> Option<[u8; 32]> {
                    let db = context.db();

                    Ok(module.await_preimage(db, params.0, params.1).await)
                }
            },
            // Returns this peer's stored decryption key share for the
            // outpoint, or a bad request error if there is none.
            api_endpoint! {
                DECRYPTION_KEY_SHARE_ENDPOINT,
                ApiVersion::new(0, 0),
                async |_module: &Lightning, context, params: OutPoint| -> DecryptionKeyShare {
                    let share = context
                        .db()
                        .begin_transaction_nc()
                        .await
                        .get_value(&DecryptionKeyShareKey(params))
                        .await
                        .ok_or(ApiError::bad_request("No decryption key share found".to_string()))?;

                    Ok(share)
                }
            },
            // Contract id and remaining blocks until expiration of an
            // outgoing contract, if it exists.
            api_endpoint! {
                OUTGOING_CONTRACT_EXPIRATION_ENDPOINT,
                ApiVersion::new(0, 0),
                async |module: &Lightning, context, outpoint: OutPoint| -> Option<(ContractId, u64)> {
                    let db = context.db();

                    Ok(module.outgoing_contract_expiration(db, outpoint).await)
                }
            },
            // Params: (start index, batch size). A batch size of zero is
            // rejected.
            api_endpoint! {
                AWAIT_INCOMING_CONTRACTS_ENDPOINT,
                ApiVersion::new(0, 0),
                async |module: &Lightning, context, params: (u64, usize)| -> (Vec<IncomingContract>, u64) {
                    let db = context.db();

                    if params.1 == 0 {
                        return Err(ApiError::bad_request("Batch size must be greater than 0".to_string()));
                    }

                    Ok(module.await_incoming_contracts(db, params.0, params.1).await)
                }
            },
            // Authenticated: registers a gateway URL.
            api_endpoint! {
                ADD_GATEWAY_ENDPOINT,
                ApiVersion::new(0, 0),
                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
                    check_auth(context)?;

                    let db = context.db();

                    Ok(Lightning::add_gateway(db, gateway).await)
                }
            },
            // Authenticated: removes a gateway URL.
            api_endpoint! {
                REMOVE_GATEWAY_ENDPOINT,
                ApiVersion::new(0, 0),
                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
                    check_auth(context)?;

                    let db = context.db();

                    Ok(Lightning::remove_gateway(db, gateway).await)
                }
            },
            // Lists all registered gateway URLs.
            api_endpoint! {
                GATEWAYS_ENDPOINT,
                ApiVersion::new(0, 0),
                async |_module: &Lightning, context, _params : () | -> Vec<SafeUrl> {
                    let db = context.db();

                    Ok(Lightning::gateways(db).await)
                }
            },
        ]
    }
}
741
742impl Lightning {
743 fn get_block_count(&self) -> anyhow::Result<u64> {
744 self.server_bitcoin_rpc_monitor
745 .status()
746 .map(|status| status.block_count)
747 .context("Block count not available yet")
748 }
749
750 async fn consensus_block_count(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
751 let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
752
753 let mut counts = dbtx
754 .find_by_prefix(&BlockCountVotePrefix)
755 .await
756 .map(|entry| entry.1)
757 .collect::<Vec<u64>>()
758 .await;
759
760 counts.sort_unstable();
761
762 counts.reverse();
763
764 assert!(counts.last() <= counts.first());
765
766 counts.get(num_peers.threshold() - 1).copied().unwrap_or(0)
771 }
772
773 async fn consensus_unix_time(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
774 let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
775
776 let mut times = dbtx
777 .find_by_prefix(&UnixTimeVotePrefix)
778 .await
779 .map(|entry| entry.1)
780 .collect::<Vec<u64>>()
781 .await;
782
783 times.sort_unstable();
784
785 times.reverse();
786
787 assert!(times.last() <= times.first());
788
789 times.get(num_peers.threshold() - 1).copied().unwrap_or(0)
794 }
795
796 async fn await_incoming_contract(
797 &self,
798 db: Database,
799 contract_id: ContractId,
800 expiration: u64,
801 ) -> Option<OutPoint> {
802 loop {
803 timeout(
804 Duration::from_secs(10),
805 db.wait_key_exists(&IncomingContractOutpointKey(contract_id)),
806 )
807 .await
808 .ok();
809
810 let mut dbtx = db.begin_transaction_nc().await;
813
814 if let Some(outpoint) = dbtx
815 .get_value(&IncomingContractOutpointKey(contract_id))
816 .await
817 {
818 return Some(outpoint);
819 }
820
821 if expiration <= self.consensus_unix_time(&mut dbtx).await {
822 return None;
823 }
824 }
825 }
826
827 async fn await_preimage(
828 &self,
829 db: Database,
830 outpoint: OutPoint,
831 expiration: u64,
832 ) -> Option<[u8; 32]> {
833 loop {
834 timeout(
835 Duration::from_secs(10),
836 db.wait_key_exists(&PreimageKey(outpoint)),
837 )
838 .await
839 .ok();
840
841 let mut dbtx = db.begin_transaction_nc().await;
844
845 if let Some(preimage) = dbtx.get_value(&PreimageKey(outpoint)).await {
846 return Some(preimage);
847 }
848
849 if expiration <= self.consensus_block_count(&mut dbtx).await {
850 return None;
851 }
852 }
853 }
854
855 async fn outgoing_contract_expiration(
856 &self,
857 db: Database,
858 outpoint: OutPoint,
859 ) -> Option<(ContractId, u64)> {
860 let mut dbtx = db.begin_transaction_nc().await;
861
862 let contract = dbtx.get_value(&OutgoingContractKey(outpoint)).await?;
863
864 let consensus_block_count = self.consensus_block_count(&mut dbtx).await;
865
866 let expiration = contract.expiration.saturating_sub(consensus_block_count);
867
868 Some((contract.contract_id(), expiration))
869 }
870
871 async fn await_incoming_contracts(
872 &self,
873 db: Database,
874 start: u64,
875 n: usize,
876 ) -> (Vec<IncomingContract>, u64) {
877 let filter = |next_index: Option<u64>| next_index.filter(|i| *i > start);
878
879 let (mut next_index, mut dbtx) = db
880 .wait_key_check(&IncomingContractStreamIndexKey, filter)
881 .await;
882
883 let mut contracts = Vec::with_capacity(n);
884
885 let range = IncomingContractStreamKey(start)..IncomingContractStreamKey(u64::MAX);
886
887 for (key, contract) in dbtx
888 .find_by_range(range)
889 .await
890 .take(n)
891 .collect::<Vec<(IncomingContractStreamKey, IncomingContract)>>()
892 .await
893 {
894 contracts.push(contract.clone());
895 next_index = key.0 + 1;
896 }
897
898 (contracts, next_index)
899 }
900
901 async fn add_gateway(db: Database, gateway: SafeUrl) -> bool {
902 let mut dbtx = db.begin_transaction().await;
903
904 let is_new_entry = dbtx.insert_entry(&GatewayKey(gateway), &()).await.is_none();
905
906 dbtx.commit_tx().await;
907
908 is_new_entry
909 }
910
911 async fn remove_gateway(db: Database, gateway: SafeUrl) -> bool {
912 let mut dbtx = db.begin_transaction().await;
913
914 let entry_existed = dbtx.remove_entry(&GatewayKey(gateway)).await.is_some();
915
916 dbtx.commit_tx().await;
917
918 entry_existed
919 }
920
921 async fn gateways(db: Database) -> Vec<SafeUrl> {
922 db.begin_transaction_nc()
923 .await
924 .find_by_prefix(&GatewayPrefix)
925 .await
926 .map(|entry| entry.0.0)
927 .collect()
928 .await
929 }
930
931 pub async fn consensus_block_count_ui(&self) -> u64 {
932 self.consensus_block_count(&mut self.db.begin_transaction_nc().await)
933 .await
934 }
935
936 pub async fn consensus_unix_time_ui(&self) -> u64 {
937 self.consensus_unix_time(&mut self.db.begin_transaction_nc().await)
938 .await
939 }
940
941 pub async fn add_gateway_ui(&self, gateway: SafeUrl) -> bool {
942 Self::add_gateway(self.db.clone(), gateway).await
943 }
944
945 pub async fn remove_gateway_ui(&self, gateway: SafeUrl) -> bool {
946 Self::remove_gateway(self.db.clone(), gateway).await
947 }
948
949 pub async fn gateways_ui(&self) -> Vec<SafeUrl> {
950 Self::gateways(self.db.clone()).await
951 }
952}