1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_wrap)]
3#![allow(clippy::module_name_repetitions)]
4
5mod db;
6
7use std::collections::{BTreeMap, BTreeSet};
8use std::time::Duration;
9
10use anyhow::{Context, anyhow, ensure};
11use bls12_381::{G1Projective, Scalar};
12use fedimint_core::bitcoin::hashes::sha256;
13use fedimint_core::config::{
14 ConfigGenModuleParams, ServerModuleConfig, ServerModuleConsensusConfig,
15 TypedServerModuleConfig, TypedServerModuleConsensusConfig,
16};
17use fedimint_core::core::ModuleInstanceId;
18use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
19use fedimint_core::encoding::Encodable;
20use fedimint_core::module::audit::Audit;
21use fedimint_core::module::{
22 ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion, InputMeta,
23 ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions, TransactionItemAmount,
24 api_endpoint,
25};
26use fedimint_core::task::timeout;
27use fedimint_core::time::duration_since_epoch;
28use fedimint_core::util::SafeUrl;
29use fedimint_core::{
30 BitcoinHash, InPoint, NumPeers, NumPeersExt, OutPoint, PeerId, apply, async_trait_maybe_send,
31 push_db_pair_items,
32};
33use fedimint_lnv2_common::config::{
34 FeeConsensus, LightningClientConfig, LightningConfig, LightningConfigConsensus,
35 LightningConfigPrivate, LightningGenParams,
36};
37use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract};
38use fedimint_lnv2_common::endpoint_constants::{
39 ADD_GATEWAY_ENDPOINT, AWAIT_INCOMING_CONTRACT_ENDPOINT, AWAIT_INCOMING_CONTRACTS_ENDPOINT,
40 AWAIT_PREIMAGE_ENDPOINT, CONSENSUS_BLOCK_COUNT_ENDPOINT, DECRYPTION_KEY_SHARE_ENDPOINT,
41 GATEWAYS_ENDPOINT, OUTGOING_CONTRACT_EXPIRATION_ENDPOINT, REMOVE_GATEWAY_ENDPOINT,
42};
43use fedimint_lnv2_common::{
44 ContractId, LightningCommonInit, LightningConsensusItem, LightningInput, LightningInputError,
45 LightningInputV0, LightningModuleTypes, LightningOutput, LightningOutputError,
46 LightningOutputOutcome, LightningOutputV0, MODULE_CONSENSUS_VERSION, OutgoingWitness,
47};
48use fedimint_logging::LOG_MODULE_LNV2;
49use fedimint_server_core::bitcoin_rpc::ServerBitcoinRpcMonitor;
50use fedimint_server_core::config::{PeerHandleOps, eval_poly_g1};
51use fedimint_server_core::net::check_auth;
52use fedimint_server_core::{ServerModule, ServerModuleInit, ServerModuleInitArgs};
53use futures::StreamExt;
54use group::Curve;
55use group::ff::Field;
56use rand::SeedableRng;
57use rand_chacha::ChaChaRng;
58use strum::IntoEnumIterator;
59use tpe::{
60 AggregatePublicKey, DecryptionKeyShare, PublicKeyShare, SecretKeyShare, derive_pk_share,
61};
62use tracing::trace;
63
64use crate::db::{
65 BlockCountVoteKey, BlockCountVotePrefix, DbKeyPrefix, DecryptionKeyShareKey,
66 DecryptionKeySharePrefix, GatewayKey, GatewayPrefix, IncomingContractIndexKey,
67 IncomingContractIndexPrefix, IncomingContractKey, IncomingContractOutpointKey,
68 IncomingContractOutpointPrefix, IncomingContractPrefix, IncomingContractStreamIndexKey,
69 IncomingContractStreamKey, IncomingContractStreamPrefix, OutgoingContractKey,
70 OutgoingContractPrefix, PreimageKey, PreimagePrefix, UnixTimeVoteKey, UnixTimeVotePrefix,
71};
72
/// Initializer for the lnv2 server module.
///
/// It carries no state of its own; it exists to implement `ModuleInit` and
/// `ServerModuleInit`, which dump the module database, generate and validate
/// configs and construct the running [`Lightning`] module.
#[derive(Debug, Clone)]
pub struct LightningInit;
75
76impl ModuleInit for LightningInit {
77 type Common = LightningCommonInit;
78
79 #[allow(clippy::too_many_lines)]
80 async fn dump_database(
81 &self,
82 dbtx: &mut DatabaseTransaction<'_>,
83 prefix_names: Vec<String>,
84 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
85 let mut lightning: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
86 BTreeMap::new();
87
88 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
89 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
90 });
91
92 for table in filtered_prefixes {
93 match table {
94 DbKeyPrefix::BlockCountVote => {
95 push_db_pair_items!(
96 dbtx,
97 BlockCountVotePrefix,
98 BlockCountVoteKey,
99 u64,
100 lightning,
101 "Lightning Block Count Votes"
102 );
103 }
104 DbKeyPrefix::UnixTimeVote => {
105 push_db_pair_items!(
106 dbtx,
107 UnixTimeVotePrefix,
108 UnixTimeVoteKey,
109 u64,
110 lightning,
111 "Lightning Unix Time Votes"
112 );
113 }
114 DbKeyPrefix::OutgoingContract => {
115 push_db_pair_items!(
116 dbtx,
117 OutgoingContractPrefix,
118 LightningOutgoingContractKey,
119 OutgoingContract,
120 lightning,
121 "Lightning Outgoing Contracts"
122 );
123 }
124 DbKeyPrefix::IncomingContract => {
125 push_db_pair_items!(
126 dbtx,
127 IncomingContractPrefix,
128 LightningIncomingContractKey,
129 IncomingContract,
130 lightning,
131 "Lightning Incoming Contracts"
132 );
133 }
134 DbKeyPrefix::IncomingContractOutpoint => {
135 push_db_pair_items!(
136 dbtx,
137 IncomingContractOutpointPrefix,
138 LightningIncomingContractOutpointKey,
139 OutPoint,
140 lightning,
141 "Lightning Incoming Contracts Outpoints"
142 );
143 }
144 DbKeyPrefix::DecryptionKeyShare => {
145 push_db_pair_items!(
146 dbtx,
147 DecryptionKeySharePrefix,
148 DecryptionKeyShareKey,
149 DecryptionKeyShare,
150 lightning,
151 "Lightning Decryption Key Share"
152 );
153 }
154 DbKeyPrefix::Preimage => {
155 push_db_pair_items!(
156 dbtx,
157 PreimagePrefix,
158 LightningPreimageKey,
159 [u8; 32],
160 lightning,
161 "Lightning Preimages"
162 );
163 }
164 DbKeyPrefix::Gateway => {
165 push_db_pair_items!(
166 dbtx,
167 GatewayPrefix,
168 GatewayKey,
169 (),
170 lightning,
171 "Lightning Gateways"
172 );
173 }
174 DbKeyPrefix::IncomingContractStreamIndex => {
175 push_db_pair_items!(
176 dbtx,
177 IncomingContractStreamIndexKey,
178 IncomingContractStreamIndexKey,
179 u64,
180 lightning,
181 "Lightning Incoming Contract Stream Index"
182 );
183 }
184 DbKeyPrefix::IncomingContractStream => {
185 push_db_pair_items!(
186 dbtx,
187 IncomingContractStreamPrefix(0),
188 IncomingContractStreamKey,
189 IncomingContract,
190 lightning,
191 "Lightning Incoming Contract Stream"
192 );
193 }
194 DbKeyPrefix::IncomingContractIndex => {
195 push_db_pair_items!(
196 dbtx,
197 IncomingContractIndexPrefix,
198 IncomingContractIndexKey,
199 u64,
200 lightning,
201 "Lightning Incoming Contract Index"
202 );
203 }
204 }
205 }
206
207 Box::new(lightning.into_iter())
208 }
209}
210
211#[apply(async_trait_maybe_send!)]
212impl ServerModuleInit for LightningInit {
213 type Module = Lightning;
214 type Params = LightningGenParams;
215
216 fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
217 &[MODULE_CONSENSUS_VERSION]
218 }
219
220 fn supported_api_versions(&self) -> SupportedModuleApiVersions {
221 SupportedModuleApiVersions::from_raw(
222 (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
223 (
224 MODULE_CONSENSUS_VERSION.major,
225 MODULE_CONSENSUS_VERSION.minor,
226 ),
227 &[(0, 0)],
228 )
229 }
230
231 async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
232 Ok(Lightning {
233 cfg: args.cfg().to_typed()?,
234 db: args.db().clone(),
235 server_bitcoin_rpc_monitor: args.server_bitcoin_rpc_monitor(),
236 })
237 }
238
239 fn trusted_dealer_gen(
240 &self,
241 peers: &[PeerId],
242 params: &ConfigGenModuleParams,
243 disable_base_fees: bool,
244 ) -> BTreeMap<PeerId, ServerModuleConfig> {
245 let params = self
246 .parse_params(params)
247 .expect("Failed tp parse lnv2 config gen params");
248
249 let tpe_pks = peers
250 .iter()
251 .map(|peer| (*peer, dealer_pk(peers.to_num_peers(), *peer)))
252 .collect::<BTreeMap<PeerId, PublicKeyShare>>();
253
254 peers
255 .iter()
256 .map(|peer| {
257 let cfg = LightningConfig {
258 consensus: LightningConfigConsensus {
259 tpe_agg_pk: dealer_agg_pk(),
260 tpe_pks: tpe_pks.clone(),
261 fee_consensus: params.consensus.fee_consensus.clone().unwrap_or(
262 if disable_base_fees {
263 FeeConsensus::zero()
264 } else {
265 FeeConsensus::new(0).expect("Relative fee is within range")
266 },
267 ),
268 network: params.consensus.network,
269 },
270 private: LightningConfigPrivate {
271 sk: dealer_sk(peers.to_num_peers(), *peer),
272 },
273 };
274
275 (*peer, cfg.to_erased())
276 })
277 .collect()
278 }
279
280 async fn distributed_gen(
281 &self,
282 peers: &(dyn PeerHandleOps + Send + Sync),
283 params: &ConfigGenModuleParams,
284 disable_base_fees: bool,
285 ) -> anyhow::Result<ServerModuleConfig> {
286 let params = self.parse_params(params).unwrap();
287 let (polynomial, sks) = peers.run_dkg_g1().await?;
288
289 let server = LightningConfig {
290 consensus: LightningConfigConsensus {
291 tpe_agg_pk: tpe::AggregatePublicKey(polynomial[0].to_affine()),
292 tpe_pks: peers
293 .num_peers()
294 .peer_ids()
295 .map(|peer| (peer, PublicKeyShare(eval_poly_g1(&polynomial, &peer))))
296 .collect(),
297 fee_consensus: params.consensus.fee_consensus.clone().unwrap_or(
298 if disable_base_fees {
299 FeeConsensus::zero()
300 } else {
301 FeeConsensus::new(0).expect("Relative fee is within range")
302 },
303 ),
304 network: params.consensus.network,
305 },
306 private: LightningConfigPrivate {
307 sk: SecretKeyShare(sks),
308 },
309 };
310
311 Ok(server.to_erased())
312 }
313
314 fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
315 let config = config.to_typed::<LightningConfig>()?;
316
317 ensure!(
318 tpe::derive_pk_share(&config.private.sk)
319 == *config
320 .consensus
321 .tpe_pks
322 .get(identity)
323 .context("Public key set has no key for our identity")?,
324 "Preimge encryption secret key share does not match our public key share"
325 );
326
327 Ok(())
328 }
329
330 fn get_client_config(
331 &self,
332 config: &ServerModuleConsensusConfig,
333 ) -> anyhow::Result<LightningClientConfig> {
334 let config = LightningConfigConsensus::from_erased(config)?;
335 Ok(LightningClientConfig {
336 tpe_agg_pk: config.tpe_agg_pk,
337 tpe_pks: config.tpe_pks,
338 fee_consensus: config.fee_consensus,
339 network: config.network,
340 })
341 }
342
343 fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
344 Some(DbKeyPrefix::iter().map(|p| p as u8).collect())
345 }
346}
347
348fn dealer_agg_pk() -> AggregatePublicKey {
349 AggregatePublicKey((G1Projective::generator() * coefficient(0)).to_affine())
350}
351
352fn dealer_pk(num_peers: NumPeers, peer: PeerId) -> PublicKeyShare {
353 derive_pk_share(&dealer_sk(num_peers, peer))
354}
355
356fn dealer_sk(num_peers: NumPeers, peer: PeerId) -> SecretKeyShare {
357 let x = Scalar::from(peer.to_usize() as u64 + 1);
358
359 let y = (0..num_peers.threshold())
363 .map(|index| coefficient(index as u64))
364 .rev()
365 .reduce(|accumulator, c| accumulator * x + c)
366 .expect("We have at least one coefficient");
367
368 SecretKeyShare(y)
369}
370
371fn coefficient(index: u64) -> Scalar {
372 Scalar::random(&mut ChaChaRng::from_seed(
373 *index.consensus_hash::<sha256::Hash>().as_byte_array(),
374 ))
375}
376
/// Running instance of the lnv2 server module.
#[derive(Debug)]
pub struct Lightning {
    // Typed module config; the consensus part holds the threshold encryption
    // public keys and fee schedule, the private part our secret key share.
    cfg: LightningConfig,
    // Module database handle, used by the `*_ui` helpers to open their own
    // transactions.
    db: Database,
    // Supplies the block count that this peer proposes as its vote.
    server_bitcoin_rpc_monitor: ServerBitcoinRpcMonitor,
}
383
384#[apply(async_trait_maybe_send!)]
385impl ServerModule for Lightning {
386 type Common = LightningModuleTypes;
387 type Init = LightningInit;
388
389 async fn consensus_proposal(
390 &self,
391 _dbtx: &mut DatabaseTransaction<'_>,
392 ) -> Vec<LightningConsensusItem> {
393 let mut items = vec![LightningConsensusItem::UnixTimeVote(
396 60 * (duration_since_epoch().as_secs() / 60),
397 )];
398
399 if let Ok(block_count) = self.get_block_count() {
400 trace!(target: LOG_MODULE_LNV2, ?block_count, "Proposing block count");
401 items.push(LightningConsensusItem::BlockCountVote(block_count));
402 }
403
404 items
405 }
406
407 async fn process_consensus_item<'a, 'b>(
408 &'a self,
409 dbtx: &mut DatabaseTransaction<'b>,
410 consensus_item: LightningConsensusItem,
411 peer: PeerId,
412 ) -> anyhow::Result<()> {
413 trace!(target: LOG_MODULE_LNV2, ?consensus_item, "Processing consensus item proposal");
414
415 match consensus_item {
416 LightningConsensusItem::BlockCountVote(vote) => {
417 let current_vote = dbtx
418 .insert_entry(&BlockCountVoteKey(peer), &vote)
419 .await
420 .unwrap_or(0);
421
422 ensure!(current_vote < vote, "Block count vote is redundant");
423
424 Ok(())
425 }
426 LightningConsensusItem::UnixTimeVote(vote) => {
427 let current_vote = dbtx
428 .insert_entry(&UnixTimeVoteKey(peer), &vote)
429 .await
430 .unwrap_or(0);
431
432 ensure!(current_vote < vote, "Unix time vote is redundant");
433
434 Ok(())
435 }
436 LightningConsensusItem::Default { variant, .. } => Err(anyhow!(
437 "Received lnv2 consensus item with unknown variant {variant}"
438 )),
439 }
440 }
441
442 async fn process_input<'a, 'b, 'c>(
443 &'a self,
444 dbtx: &mut DatabaseTransaction<'c>,
445 input: &'b LightningInput,
446 _in_point: InPoint,
447 ) -> Result<InputMeta, LightningInputError> {
448 let (pub_key, amount) = match input.ensure_v0_ref()? {
449 LightningInputV0::Outgoing(outpoint, outgoing_witness) => {
450 let contract = dbtx
451 .remove_entry(&OutgoingContractKey(*outpoint))
452 .await
453 .ok_or(LightningInputError::UnknownContract)?;
454
455 let pub_key = match outgoing_witness {
456 OutgoingWitness::Claim(preimage) => {
457 if contract.expiration <= self.consensus_block_count(dbtx).await {
458 return Err(LightningInputError::Expired);
459 }
460
461 if !contract.verify_preimage(preimage) {
462 return Err(LightningInputError::InvalidPreimage);
463 }
464
465 dbtx.insert_entry(&PreimageKey(*outpoint), preimage).await;
466
467 contract.claim_pk
468 }
469 OutgoingWitness::Refund => {
470 if contract.expiration > self.consensus_block_count(dbtx).await {
471 return Err(LightningInputError::NotExpired);
472 }
473
474 contract.refund_pk
475 }
476 OutgoingWitness::Cancel(forfeit_signature) => {
477 if !contract.verify_forfeit_signature(forfeit_signature) {
478 return Err(LightningInputError::InvalidForfeitSignature);
479 }
480
481 contract.refund_pk
482 }
483 };
484
485 (pub_key, contract.amount)
486 }
487 LightningInputV0::Incoming(outpoint, agg_decryption_key) => {
488 let contract = dbtx
489 .remove_entry(&IncomingContractKey(*outpoint))
490 .await
491 .ok_or(LightningInputError::UnknownContract)?;
492
493 if let Some(index) = dbtx
494 .remove_entry(&IncomingContractIndexKey(*outpoint))
495 .await
496 {
497 dbtx.remove_entry(&IncomingContractStreamKey(index)).await;
498 }
499
500 if !contract
501 .verify_agg_decryption_key(&self.cfg.consensus.tpe_agg_pk, agg_decryption_key)
502 {
503 return Err(LightningInputError::InvalidDecryptionKey);
504 }
505
506 let pub_key = match contract.decrypt_preimage(agg_decryption_key) {
507 Some(..) => contract.commitment.claim_pk,
508 None => contract.commitment.refund_pk,
509 };
510
511 (pub_key, contract.commitment.amount)
512 }
513 };
514
515 Ok(InputMeta {
516 amount: TransactionItemAmount {
517 amount,
518 fee: self.cfg.consensus.fee_consensus.fee(amount),
519 },
520 pub_key,
521 })
522 }
523
524 async fn process_output<'a, 'b>(
525 &'a self,
526 dbtx: &mut DatabaseTransaction<'b>,
527 output: &'a LightningOutput,
528 outpoint: OutPoint,
529 ) -> Result<TransactionItemAmount, LightningOutputError> {
530 let amount = match output.ensure_v0_ref()? {
531 LightningOutputV0::Outgoing(contract) => {
532 dbtx.insert_new_entry(&OutgoingContractKey(outpoint), contract)
533 .await;
534
535 contract.amount
536 }
537 LightningOutputV0::Incoming(contract) => {
538 if !contract.verify() {
539 return Err(LightningOutputError::InvalidContract);
540 }
541
542 if contract.commitment.expiration <= self.consensus_unix_time(dbtx).await {
543 return Err(LightningOutputError::ContractExpired);
544 }
545
546 dbtx.insert_new_entry(&IncomingContractKey(outpoint), contract)
547 .await;
548
549 dbtx.insert_entry(
550 &IncomingContractOutpointKey(contract.contract_id()),
551 &outpoint,
552 )
553 .await;
554
555 let stream_index = dbtx
556 .get_value(&IncomingContractStreamIndexKey)
557 .await
558 .unwrap_or(0);
559
560 dbtx.insert_entry(&IncomingContractStreamKey(stream_index), contract)
561 .await;
562
563 dbtx.insert_entry(&IncomingContractIndexKey(outpoint), &stream_index)
564 .await;
565
566 dbtx.insert_entry(&IncomingContractStreamIndexKey, &(stream_index + 1))
567 .await;
568
569 let dk_share = contract.create_decryption_key_share(&self.cfg.private.sk);
570
571 dbtx.insert_entry(&DecryptionKeyShareKey(outpoint), &dk_share)
572 .await;
573
574 contract.commitment.amount
575 }
576 };
577
578 Ok(TransactionItemAmount {
579 amount,
580 fee: self.cfg.consensus.fee_consensus.fee(amount),
581 })
582 }
583
584 async fn output_status(
585 &self,
586 _dbtx: &mut DatabaseTransaction<'_>,
587 _out_point: OutPoint,
588 ) -> Option<LightningOutputOutcome> {
589 None
590 }
591
592 async fn audit(
593 &self,
594 dbtx: &mut DatabaseTransaction<'_>,
595 audit: &mut Audit,
596 module_instance_id: ModuleInstanceId,
597 ) {
598 audit
601 .add_items(
602 dbtx,
603 module_instance_id,
604 &OutgoingContractPrefix,
605 |_, contract| -(contract.amount.msats as i64),
606 )
607 .await;
608
609 audit
610 .add_items(
611 dbtx,
612 module_instance_id,
613 &IncomingContractPrefix,
614 |_, contract| -(contract.commitment.amount.msats as i64),
615 )
616 .await;
617 }
618
619 fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
620 vec![
621 api_endpoint! {
622 CONSENSUS_BLOCK_COUNT_ENDPOINT,
623 ApiVersion::new(0, 0),
624 async |module: &Lightning, context, _params : () | -> u64 {
625 let db = context.db();
626 let mut dbtx = db.begin_transaction_nc().await;
627
628 Ok(module.consensus_block_count(&mut dbtx).await)
629 }
630 },
631 api_endpoint! {
632 AWAIT_INCOMING_CONTRACT_ENDPOINT,
633 ApiVersion::new(0, 0),
634 async |module: &Lightning, context, params: (ContractId, u64) | -> Option<OutPoint> {
635 let db = context.db();
636
637 Ok(module.await_incoming_contract(db, params.0, params.1).await)
638 }
639 },
640 api_endpoint! {
641 AWAIT_PREIMAGE_ENDPOINT,
642 ApiVersion::new(0, 0),
643 async |module: &Lightning, context, params: (OutPoint, u64)| -> Option<[u8; 32]> {
644 let db = context.db();
645
646 Ok(module.await_preimage(db, params.0, params.1).await)
647 }
648 },
649 api_endpoint! {
650 DECRYPTION_KEY_SHARE_ENDPOINT,
651 ApiVersion::new(0, 0),
652 async |_module: &Lightning, context, params: OutPoint| -> DecryptionKeyShare {
653 let share = context
654 .db()
655 .begin_transaction_nc()
656 .await
657 .get_value(&DecryptionKeyShareKey(params))
658 .await
659 .ok_or(ApiError::bad_request("No decryption key share found".to_string()))?;
660
661 Ok(share)
662 }
663 },
664 api_endpoint! {
665 OUTGOING_CONTRACT_EXPIRATION_ENDPOINT,
666 ApiVersion::new(0, 0),
667 async |module: &Lightning, context, outpoint: OutPoint| -> Option<(ContractId, u64)> {
668 let db = context.db();
669
670 Ok(module.outgoing_contract_expiration(db, outpoint).await)
671 }
672 },
673 api_endpoint! {
674 AWAIT_INCOMING_CONTRACTS_ENDPOINT,
675 ApiVersion::new(0, 0),
676 async |module: &Lightning, context, params: (u64, usize)| -> (Vec<IncomingContract>, u64) {
677 let db = context.db();
678
679 if params.1 == 0 {
680 return Err(ApiError::bad_request("Batch size must be greater than 0".to_string()));
681 }
682
683 Ok(module.await_incoming_contracts(db, params.0, params.1).await)
684 }
685 },
686 api_endpoint! {
687 ADD_GATEWAY_ENDPOINT,
688 ApiVersion::new(0, 0),
689 async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
690 check_auth(context)?;
691
692 let db = context.db();
693
694 Ok(Lightning::add_gateway(db, gateway).await)
695 }
696 },
697 api_endpoint! {
698 REMOVE_GATEWAY_ENDPOINT,
699 ApiVersion::new(0, 0),
700 async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
701 check_auth(context)?;
702
703 let db = context.db();
704
705 Ok(Lightning::remove_gateway(db, gateway).await)
706 }
707 },
708 api_endpoint! {
709 GATEWAYS_ENDPOINT,
710 ApiVersion::new(0, 0),
711 async |_module: &Lightning, context, _params : () | -> Vec<SafeUrl> {
712 let db = context.db();
713
714 Ok(Lightning::gateways(db).await)
715 }
716 },
717 ]
718 }
719}
720
721impl Lightning {
722 fn get_block_count(&self) -> anyhow::Result<u64> {
723 self.server_bitcoin_rpc_monitor
724 .status()
725 .map(|status| status.block_count)
726 .context("Block count not available yet")
727 }
728
729 async fn consensus_block_count(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
730 let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
731
732 let mut counts = dbtx
733 .find_by_prefix(&BlockCountVotePrefix)
734 .await
735 .map(|entry| entry.1)
736 .collect::<Vec<u64>>()
737 .await;
738
739 counts.sort_unstable();
740
741 counts.reverse();
742
743 assert!(counts.last() <= counts.first());
744
745 counts.get(num_peers.threshold() - 1).copied().unwrap_or(0)
750 }
751
752 async fn consensus_unix_time(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
753 let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
754
755 let mut times = dbtx
756 .find_by_prefix(&UnixTimeVotePrefix)
757 .await
758 .map(|entry| entry.1)
759 .collect::<Vec<u64>>()
760 .await;
761
762 times.sort_unstable();
763
764 times.reverse();
765
766 assert!(times.last() <= times.first());
767
768 times.get(num_peers.threshold() - 1).copied().unwrap_or(0)
773 }
774
775 async fn await_incoming_contract(
776 &self,
777 db: Database,
778 contract_id: ContractId,
779 expiration: u64,
780 ) -> Option<OutPoint> {
781 loop {
782 timeout(
783 Duration::from_secs(10),
784 db.wait_key_exists(&IncomingContractOutpointKey(contract_id)),
785 )
786 .await
787 .ok();
788
789 let mut dbtx = db.begin_transaction_nc().await;
792
793 if let Some(outpoint) = dbtx
794 .get_value(&IncomingContractOutpointKey(contract_id))
795 .await
796 {
797 return Some(outpoint);
798 }
799
800 if expiration <= self.consensus_unix_time(&mut dbtx).await {
801 return None;
802 }
803 }
804 }
805
806 async fn await_preimage(
807 &self,
808 db: Database,
809 outpoint: OutPoint,
810 expiration: u64,
811 ) -> Option<[u8; 32]> {
812 loop {
813 timeout(
814 Duration::from_secs(10),
815 db.wait_key_exists(&PreimageKey(outpoint)),
816 )
817 .await
818 .ok();
819
820 let mut dbtx = db.begin_transaction_nc().await;
823
824 if let Some(preimage) = dbtx.get_value(&PreimageKey(outpoint)).await {
825 return Some(preimage);
826 }
827
828 if expiration <= self.consensus_block_count(&mut dbtx).await {
829 return None;
830 }
831 }
832 }
833
834 async fn outgoing_contract_expiration(
835 &self,
836 db: Database,
837 outpoint: OutPoint,
838 ) -> Option<(ContractId, u64)> {
839 let mut dbtx = db.begin_transaction_nc().await;
840
841 let contract = dbtx.get_value(&OutgoingContractKey(outpoint)).await?;
842
843 let consensus_block_count = self.consensus_block_count(&mut dbtx).await;
844
845 let expiration = contract.expiration.saturating_sub(consensus_block_count);
846
847 Some((contract.contract_id(), expiration))
848 }
849
850 async fn await_incoming_contracts(
851 &self,
852 db: Database,
853 start: u64,
854 n: usize,
855 ) -> (Vec<IncomingContract>, u64) {
856 let filter = |next_index: Option<u64>| next_index.filter(|i| *i > start);
857
858 let (mut next_index, mut dbtx) = db
859 .wait_key_check(&IncomingContractStreamIndexKey, filter)
860 .await;
861
862 let mut contracts = Vec::with_capacity(n);
863
864 let range = IncomingContractStreamKey(start)..IncomingContractStreamKey(u64::MAX);
865
866 for (key, contract) in dbtx
867 .find_by_range(range)
868 .await
869 .take(n)
870 .collect::<Vec<(IncomingContractStreamKey, IncomingContract)>>()
871 .await
872 {
873 contracts.push(contract.clone());
874 next_index = key.0 + 1;
875 }
876
877 (contracts, next_index)
878 }
879
880 async fn add_gateway(db: Database, gateway: SafeUrl) -> bool {
881 let mut dbtx = db.begin_transaction().await;
882
883 let is_new_entry = dbtx.insert_entry(&GatewayKey(gateway), &()).await.is_none();
884
885 dbtx.commit_tx().await;
886
887 is_new_entry
888 }
889
890 async fn remove_gateway(db: Database, gateway: SafeUrl) -> bool {
891 let mut dbtx = db.begin_transaction().await;
892
893 let entry_existed = dbtx.remove_entry(&GatewayKey(gateway)).await.is_some();
894
895 dbtx.commit_tx().await;
896
897 entry_existed
898 }
899
900 async fn gateways(db: Database) -> Vec<SafeUrl> {
901 db.begin_transaction_nc()
902 .await
903 .find_by_prefix(&GatewayPrefix)
904 .await
905 .map(|entry| entry.0.0)
906 .collect()
907 .await
908 }
909
910 pub async fn consensus_block_count_ui(&self) -> u64 {
911 self.consensus_block_count(&mut self.db.begin_transaction_nc().await)
912 .await
913 }
914
915 pub async fn consensus_unix_time_ui(&self) -> u64 {
916 self.consensus_unix_time(&mut self.db.begin_transaction_nc().await)
917 .await
918 }
919
920 pub async fn add_gateway_ui(&self, gateway: SafeUrl) -> bool {
921 Self::add_gateway(self.db.clone(), gateway).await
922 }
923
924 pub async fn remove_gateway_ui(&self, gateway: SafeUrl) -> bool {
925 Self::remove_gateway(self.db.clone(), gateway).await
926 }
927
928 pub async fn gateways_ui(&self) -> Vec<SafeUrl> {
929 Self::gateways(self.db.clone()).await
930 }
931}