1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_wrap)]
3#![allow(clippy::module_name_repetitions)]
4
5mod db;
6
7use std::collections::{BTreeMap, BTreeSet};
8use std::time::Duration;
9
10use anyhow::{Context, anyhow, ensure, format_err};
11use bls12_381::{G1Projective, Scalar};
12use fedimint_bitcoind::create_bitcoind;
13use fedimint_bitcoind::shared::ServerModuleSharedBitcoin;
14use fedimint_core::bitcoin::hashes::sha256;
15use fedimint_core::config::{
16 ConfigGenModuleParams, ServerModuleConfig, ServerModuleConsensusConfig,
17 TypedServerModuleConfig, TypedServerModuleConsensusConfig,
18};
19use fedimint_core::core::ModuleInstanceId;
20use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
21use fedimint_core::encoding::Encodable;
22use fedimint_core::module::audit::Audit;
23use fedimint_core::module::{
24 ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion, InputMeta,
25 ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions, TransactionItemAmount,
26 api_endpoint,
27};
28use fedimint_core::task::timeout;
29use fedimint_core::time::duration_since_epoch;
30use fedimint_core::util::SafeUrl;
31use fedimint_core::{
32 BitcoinHash, InPoint, NumPeers, NumPeersExt, OutPoint, PeerId, apply, async_trait_maybe_send,
33 push_db_pair_items,
34};
35use fedimint_lnv2_common::config::{
36 LightningClientConfig, LightningConfig, LightningConfigConsensus, LightningConfigLocal,
37 LightningConfigPrivate, LightningGenParams,
38};
39use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract};
40use fedimint_lnv2_common::endpoint_constants::{
41 ADD_GATEWAY_ENDPOINT, AWAIT_INCOMING_CONTRACT_ENDPOINT, AWAIT_PREIMAGE_ENDPOINT,
42 CONSENSUS_BLOCK_COUNT_ENDPOINT, DECRYPTION_KEY_SHARE_ENDPOINT, GATEWAYS_ENDPOINT,
43 OUTGOING_CONTRACT_EXPIRATION_ENDPOINT, REMOVE_GATEWAY_ENDPOINT,
44};
45use fedimint_lnv2_common::{
46 ContractId, LightningCommonInit, LightningConsensusItem, LightningInput, LightningInputError,
47 LightningInputV0, LightningModuleTypes, LightningOutput, LightningOutputError,
48 LightningOutputOutcome, LightningOutputV0, MODULE_CONSENSUS_VERSION, OutgoingWitness,
49};
50use fedimint_logging::LOG_MODULE_LNV2;
51use fedimint_server_core::config::{PeerHandleOps, eval_poly_g1};
52use fedimint_server_core::net::check_auth;
53use fedimint_server_core::{ServerModule, ServerModuleInit, ServerModuleInitArgs};
54use futures::StreamExt;
55use group::Curve;
56use group::ff::Field;
57use rand::SeedableRng;
58use rand_chacha::ChaChaRng;
59use strum::IntoEnumIterator;
60use tokio::sync::watch;
61use tpe::{
62 AggregatePublicKey, DecryptionKeyShare, PublicKeyShare, SecretKeyShare, derive_pk_share,
63};
64use tracing::trace;
65
66use crate::db::{
67 BlockCountVoteKey, BlockCountVotePrefix, DbKeyPrefix, DecryptionKeyShareKey,
68 DecryptionKeySharePrefix, GatewayKey, GatewayPrefix, IncomingContractKey,
69 IncomingContractOutpointKey, IncomingContractOutpointPrefix, IncomingContractPrefix,
70 OutgoingContractKey, OutgoingContractPrefix, PreimageKey, PreimagePrefix, UnixTimeVoteKey,
71 UnixTimeVotePrefix,
72};
73
/// Stateless initializer for the lnv2 Lightning server module.
///
/// The type carries no data; it only serves as the anchor for the `ModuleInit` and
/// `ServerModuleInit` implementations.
#[derive(Clone, Debug)]
pub struct LightningInit;
76
77impl ModuleInit for LightningInit {
78 type Common = LightningCommonInit;
79
80 async fn dump_database(
81 &self,
82 dbtx: &mut DatabaseTransaction<'_>,
83 prefix_names: Vec<String>,
84 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
85 let mut lightning: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
86 BTreeMap::new();
87
88 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
89 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
90 });
91
92 for table in filtered_prefixes {
93 match table {
94 DbKeyPrefix::BlockCountVote => {
95 push_db_pair_items!(
96 dbtx,
97 BlockCountVotePrefix,
98 BlockCountVoteKey,
99 u64,
100 lightning,
101 "Lightning Block Count Votes"
102 );
103 }
104 DbKeyPrefix::UnixTimeVote => {
105 push_db_pair_items!(
106 dbtx,
107 UnixTimeVotePrefix,
108 UnixTimeVoteKey,
109 u64,
110 lightning,
111 "Lightning Unix Time Votes"
112 );
113 }
114 DbKeyPrefix::OutgoingContract => {
115 push_db_pair_items!(
116 dbtx,
117 OutgoingContractPrefix,
118 LightningOutgoingContractKey,
119 OutgoingContract,
120 lightning,
121 "Lightning Outgoing Contracts"
122 );
123 }
124 DbKeyPrefix::IncomingContract => {
125 push_db_pair_items!(
126 dbtx,
127 IncomingContractPrefix,
128 LightningIncomingContractKey,
129 IncomingContract,
130 lightning,
131 "Lightning Incoming Contracts"
132 );
133 }
134 DbKeyPrefix::IncomingContractOutpoint => {
135 push_db_pair_items!(
136 dbtx,
137 IncomingContractOutpointPrefix,
138 LightningIncomingContractOutpointKey,
139 OutPoint,
140 lightning,
141 "Lightning Incoming Contracts Outpoints"
142 );
143 }
144 DbKeyPrefix::DecryptionKeyShare => {
145 push_db_pair_items!(
146 dbtx,
147 DecryptionKeySharePrefix,
148 DecryptionKeyShareKey,
149 DecryptionKeyShare,
150 lightning,
151 "Lightning Decryption Key Share"
152 );
153 }
154 DbKeyPrefix::Preimage => {
155 push_db_pair_items!(
156 dbtx,
157 PreimagePrefix,
158 LightningPreimageKey,
159 [u8; 32],
160 lightning,
161 "Lightning Preimages"
162 );
163 }
164 DbKeyPrefix::Gateway => {
165 push_db_pair_items!(
166 dbtx,
167 GatewayPrefix,
168 GatewayKey,
169 (),
170 lightning,
171 "Lightning Gateways"
172 );
173 }
174 }
175 }
176
177 Box::new(lightning.into_iter())
178 }
179}
180
181#[apply(async_trait_maybe_send!)]
182impl ServerModuleInit for LightningInit {
183 type Module = Lightning;
184 type Params = LightningGenParams;
185
186 fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
187 &[MODULE_CONSENSUS_VERSION]
188 }
189
190 fn supported_api_versions(&self) -> SupportedModuleApiVersions {
191 SupportedModuleApiVersions::from_raw(
192 (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
193 (
194 MODULE_CONSENSUS_VERSION.major,
195 MODULE_CONSENSUS_VERSION.minor,
196 ),
197 &[(0, 0)],
198 )
199 }
200
201 async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
202 Ok(Lightning::new(args.cfg().to_typed()?, args.db(), &args.shared()).await?)
203 }
204
205 fn trusted_dealer_gen(
206 &self,
207 peers: &[PeerId],
208 params: &ConfigGenModuleParams,
209 ) -> BTreeMap<PeerId, ServerModuleConfig> {
210 let params = self
211 .parse_params(params)
212 .expect("Failed tp parse lnv2 config gen params");
213
214 let tpe_pks = peers
215 .iter()
216 .map(|peer| (*peer, dealer_pk(peers.to_num_peers(), *peer)))
217 .collect::<BTreeMap<PeerId, PublicKeyShare>>();
218
219 peers
220 .iter()
221 .map(|peer| {
222 let cfg = LightningConfig {
223 local: LightningConfigLocal {
224 bitcoin_rpc: params.local.bitcoin_rpc.clone(),
225 },
226 consensus: LightningConfigConsensus {
227 tpe_agg_pk: dealer_agg_pk(),
228 tpe_pks: tpe_pks.clone(),
229 fee_consensus: params.consensus.fee_consensus.clone(),
230 network: params.consensus.network,
231 },
232 private: LightningConfigPrivate {
233 sk: dealer_sk(peers.to_num_peers(), *peer),
234 },
235 };
236
237 (*peer, cfg.to_erased())
238 })
239 .collect()
240 }
241
242 async fn distributed_gen(
243 &self,
244 peers: &(dyn PeerHandleOps + Send + Sync),
245 params: &ConfigGenModuleParams,
246 ) -> anyhow::Result<ServerModuleConfig> {
247 let params = self.parse_params(params).unwrap();
248 let (polynomial, sks) = peers.run_dkg_g1().await?;
249
250 let server = LightningConfig {
251 local: LightningConfigLocal {
252 bitcoin_rpc: params.local.bitcoin_rpc.clone(),
253 },
254 consensus: LightningConfigConsensus {
255 tpe_agg_pk: tpe::AggregatePublicKey(polynomial[0].to_affine()),
256 tpe_pks: peers
257 .num_peers()
258 .peer_ids()
259 .map(|peer| (peer, PublicKeyShare(eval_poly_g1(&polynomial, &peer))))
260 .collect(),
261 fee_consensus: params.consensus.fee_consensus.clone(),
262 network: params.consensus.network,
263 },
264 private: LightningConfigPrivate {
265 sk: SecretKeyShare(sks),
266 },
267 };
268
269 Ok(server.to_erased())
270 }
271
272 fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
273 let config = config.to_typed::<LightningConfig>()?;
274
275 ensure!(
276 tpe::derive_pk_share(&config.private.sk)
277 == *config
278 .consensus
279 .tpe_pks
280 .get(identity)
281 .context("Public key set has no key for our identity")?,
282 "Preimge encryption secret key share does not match our public key share"
283 );
284
285 Ok(())
286 }
287
288 fn get_client_config(
289 &self,
290 config: &ServerModuleConsensusConfig,
291 ) -> anyhow::Result<LightningClientConfig> {
292 let config = LightningConfigConsensus::from_erased(config)?;
293 Ok(LightningClientConfig {
294 tpe_agg_pk: config.tpe_agg_pk,
295 tpe_pks: config.tpe_pks,
296 fee_consensus: config.fee_consensus,
297 network: config.network,
298 })
299 }
300
301 fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
302 Some(DbKeyPrefix::iter().map(|p| p as u8).collect())
303 }
304}
305
306fn dealer_agg_pk() -> AggregatePublicKey {
307 AggregatePublicKey((G1Projective::generator() * coefficient(0)).to_affine())
308}
309
310fn dealer_pk(num_peers: NumPeers, peer: PeerId) -> PublicKeyShare {
311 derive_pk_share(&dealer_sk(num_peers, peer))
312}
313
314fn dealer_sk(num_peers: NumPeers, peer: PeerId) -> SecretKeyShare {
315 let x = Scalar::from(peer.to_usize() as u64 + 1);
316
317 let y = (0..num_peers.threshold())
321 .map(|index| coefficient(index as u64))
322 .rev()
323 .reduce(|accumulator, c| accumulator * x + c)
324 .expect("We have at least one coefficient");
325
326 SecretKeyShare(y)
327}
328
329fn coefficient(index: u64) -> Scalar {
330 Scalar::random(&mut ChaChaRng::from_seed(
331 *index.consensus_hash::<sha256::Hash>().as_byte_array(),
332 ))
333}
334
/// Server-side state of the lnv2 Lightning module.
#[derive(Debug)]
pub struct Lightning {
    /// Typed module configuration with its local, consensus and private parts.
    cfg: LightningConfig,
    /// Database handle; used by the `*_ui` methods to open their own transactions.
    db: Database,
    /// Watch channel holding the latest known block count, or `None` while no block count
    /// is available yet.
    block_count_rx: watch::Receiver<Option<u64>>,
}
342
343#[apply(async_trait_maybe_send!)]
344impl ServerModule for Lightning {
345 type Common = LightningModuleTypes;
346 type Init = LightningInit;
347
348 async fn consensus_proposal(
349 &self,
350 _dbtx: &mut DatabaseTransaction<'_>,
351 ) -> Vec<LightningConsensusItem> {
352 let mut items = vec![LightningConsensusItem::UnixTimeVote(
355 60 * (duration_since_epoch().as_secs() / 60),
356 )];
357
358 if let Ok(block_count) = self.get_block_count() {
359 trace!(target: LOG_MODULE_LNV2, ?block_count, "Proposing block count");
360 items.push(LightningConsensusItem::BlockCountVote(block_count));
361 }
362
363 items
364 }
365
366 async fn process_consensus_item<'a, 'b>(
367 &'a self,
368 dbtx: &mut DatabaseTransaction<'b>,
369 consensus_item: LightningConsensusItem,
370 peer: PeerId,
371 ) -> anyhow::Result<()> {
372 trace!(target: LOG_MODULE_LNV2, ?consensus_item, "Processing consensus item proposal");
373
374 match consensus_item {
375 LightningConsensusItem::BlockCountVote(vote) => {
376 let current_vote = dbtx
377 .insert_entry(&BlockCountVoteKey(peer), &vote)
378 .await
379 .unwrap_or(0);
380
381 ensure!(current_vote < vote, "Block count vote is redundant");
382
383 Ok(())
384 }
385 LightningConsensusItem::UnixTimeVote(vote) => {
386 let current_vote = dbtx
387 .insert_entry(&UnixTimeVoteKey(peer), &vote)
388 .await
389 .unwrap_or(0);
390
391 ensure!(current_vote < vote, "Unix time vote is redundant");
392
393 Ok(())
394 }
395 LightningConsensusItem::Default { variant, .. } => Err(anyhow!(
396 "Received lnv2 consensus item with unknown variant {variant}"
397 )),
398 }
399 }
400
401 async fn process_input<'a, 'b, 'c>(
402 &'a self,
403 dbtx: &mut DatabaseTransaction<'c>,
404 input: &'b LightningInput,
405 _in_point: InPoint,
406 ) -> Result<InputMeta, LightningInputError> {
407 let (pub_key, amount) = match input.ensure_v0_ref()? {
408 LightningInputV0::Outgoing(outpoint, outgoing_witness) => {
409 let contract = dbtx
410 .remove_entry(&OutgoingContractKey(*outpoint))
411 .await
412 .ok_or(LightningInputError::UnknownContract)?;
413
414 let pub_key = match outgoing_witness {
415 OutgoingWitness::Claim(preimage) => {
416 if contract.expiration <= self.consensus_block_count(dbtx).await {
417 return Err(LightningInputError::Expired);
418 }
419
420 if !contract.verify_preimage(preimage) {
421 return Err(LightningInputError::InvalidPreimage);
422 }
423
424 dbtx.insert_entry(&PreimageKey(*outpoint), preimage).await;
425
426 contract.claim_pk
427 }
428 OutgoingWitness::Refund => {
429 if contract.expiration > self.consensus_block_count(dbtx).await {
430 return Err(LightningInputError::NotExpired);
431 }
432
433 contract.refund_pk
434 }
435 OutgoingWitness::Cancel(forfeit_signature) => {
436 if !contract.verify_forfeit_signature(forfeit_signature) {
437 return Err(LightningInputError::InvalidForfeitSignature);
438 }
439
440 contract.refund_pk
441 }
442 };
443
444 (pub_key, contract.amount)
445 }
446 LightningInputV0::Incoming(outpoint, agg_decryption_key) => {
447 let contract = dbtx
448 .remove_entry(&IncomingContractKey(*outpoint))
449 .await
450 .ok_or(LightningInputError::UnknownContract)?;
451
452 if !contract
453 .verify_agg_decryption_key(&self.cfg.consensus.tpe_agg_pk, agg_decryption_key)
454 {
455 return Err(LightningInputError::InvalidDecryptionKey);
456 }
457
458 let pub_key = match contract.decrypt_preimage(agg_decryption_key) {
459 Some(..) => contract.commitment.claim_pk,
460 None => contract.commitment.refund_pk,
461 };
462
463 (pub_key, contract.commitment.amount)
464 }
465 };
466
467 Ok(InputMeta {
468 amount: TransactionItemAmount {
469 amount,
470 fee: self.cfg.consensus.fee_consensus.fee(amount),
471 },
472 pub_key,
473 })
474 }
475
476 async fn process_output<'a, 'b>(
477 &'a self,
478 dbtx: &mut DatabaseTransaction<'b>,
479 output: &'a LightningOutput,
480 outpoint: OutPoint,
481 ) -> Result<TransactionItemAmount, LightningOutputError> {
482 let amount = match output.ensure_v0_ref()? {
483 LightningOutputV0::Outgoing(contract) => {
484 dbtx.insert_new_entry(&OutgoingContractKey(outpoint), contract)
485 .await;
486
487 contract.amount
488 }
489 LightningOutputV0::Incoming(contract) => {
490 if !contract.verify() {
491 return Err(LightningOutputError::InvalidContract);
492 }
493
494 if contract.commitment.expiration <= self.consensus_unix_time(dbtx).await {
495 return Err(LightningOutputError::ContractExpired);
496 }
497
498 dbtx.insert_new_entry(&IncomingContractKey(outpoint), contract)
499 .await;
500
501 dbtx.insert_entry(
502 &IncomingContractOutpointKey(contract.contract_id()),
503 &outpoint,
504 )
505 .await;
506
507 let dk_share = contract.create_decryption_key_share(&self.cfg.private.sk);
508
509 dbtx.insert_entry(&DecryptionKeyShareKey(outpoint), &dk_share)
510 .await;
511
512 contract.commitment.amount
513 }
514 };
515
516 Ok(TransactionItemAmount {
517 amount,
518 fee: self.cfg.consensus.fee_consensus.fee(amount),
519 })
520 }
521
522 async fn output_status(
523 &self,
524 _dbtx: &mut DatabaseTransaction<'_>,
525 _out_point: OutPoint,
526 ) -> Option<LightningOutputOutcome> {
527 None
528 }
529
530 async fn audit(
531 &self,
532 dbtx: &mut DatabaseTransaction<'_>,
533 audit: &mut Audit,
534 module_instance_id: ModuleInstanceId,
535 ) {
536 audit
539 .add_items(
540 dbtx,
541 module_instance_id,
542 &OutgoingContractPrefix,
543 |_, contract| -(contract.amount.msats as i64),
544 )
545 .await;
546
547 audit
548 .add_items(
549 dbtx,
550 module_instance_id,
551 &IncomingContractPrefix,
552 |_, contract| -(contract.commitment.amount.msats as i64),
553 )
554 .await;
555 }
556
557 fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
558 vec![
559 api_endpoint! {
560 CONSENSUS_BLOCK_COUNT_ENDPOINT,
561 ApiVersion::new(0, 0),
562 async |module: &Lightning, context, _params : () | -> u64 {
563 let db = context.db();
564 let mut dbtx = db.begin_transaction_nc().await;
565
566 Ok(module.consensus_block_count(&mut dbtx).await)
567 }
568 },
569 api_endpoint! {
570 AWAIT_INCOMING_CONTRACT_ENDPOINT,
571 ApiVersion::new(0, 0),
572 async |module: &Lightning, context, params: (ContractId, u64) | -> Option<OutPoint> {
573 let db = context.db();
574
575 Ok(module.await_incoming_contract(db, params.0, params.1).await)
576 }
577 },
578 api_endpoint! {
579 AWAIT_PREIMAGE_ENDPOINT,
580 ApiVersion::new(0, 0),
581 async |module: &Lightning, context, params: (OutPoint, u64)| -> Option<[u8; 32]> {
582 let db = context.db();
583
584 Ok(module.await_preimage(db, params.0, params.1).await)
585 }
586 },
587 api_endpoint! {
588 DECRYPTION_KEY_SHARE_ENDPOINT,
589 ApiVersion::new(0, 0),
590 async |_module: &Lightning, context, params: OutPoint| -> DecryptionKeyShare {
591 let share = context
592 .db()
593 .begin_transaction_nc()
594 .await
595 .get_value(&DecryptionKeyShareKey(params))
596 .await
597 .ok_or(ApiError::bad_request("No decryption key share found".to_string()))?;
598
599 Ok(share)
600 }
601 },
602 api_endpoint! {
603 OUTGOING_CONTRACT_EXPIRATION_ENDPOINT,
604 ApiVersion::new(0, 0),
605 async |module: &Lightning, context, outpoint: OutPoint| -> Option<(ContractId, u64)> {
606 let db = context.db();
607
608 Ok(module.outgoing_contract_expiration(db, outpoint).await)
609 }
610 },
611 api_endpoint! {
612 ADD_GATEWAY_ENDPOINT,
613 ApiVersion::new(0, 0),
614 async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
615 check_auth(context)?;
616
617 let db = context.db();
618
619 Ok(Lightning::add_gateway(db, gateway).await)
620 }
621 },
622 api_endpoint! {
623 REMOVE_GATEWAY_ENDPOINT,
624 ApiVersion::new(0, 0),
625 async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
626 check_auth(context)?;
627
628 let db = context.db();
629
630 Ok(Lightning::remove_gateway(db, gateway).await)
631 }
632 },
633 api_endpoint! {
634 GATEWAYS_ENDPOINT,
635 ApiVersion::new(0, 0),
636 async |_module: &Lightning, context, _params : () | -> Vec<SafeUrl> {
637 let db = context.db();
638
639 Ok(Lightning::gateways(db).await)
640 }
641 },
642 ]
643 }
644}
645
646impl Lightning {
647 async fn new(
648 cfg: LightningConfig,
649 db: &Database,
650 shared_bitcoin: &ServerModuleSharedBitcoin,
651 ) -> anyhow::Result<Self> {
652 let btc_rpc = create_bitcoind(&cfg.local.bitcoin_rpc)?;
653 let block_count_rx = shared_bitcoin
654 .block_count_receiver(cfg.consensus.network, btc_rpc.clone())
655 .await;
656
657 Ok(Lightning {
658 cfg,
659 db: db.clone(),
660 block_count_rx,
661 })
662 }
663
664 fn get_block_count(&self) -> anyhow::Result<u64> {
665 self.block_count_rx
666 .borrow()
667 .ok_or_else(|| format_err!("Block count not available yet"))
668 }
669
670 async fn consensus_block_count(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
671 let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
672
673 let mut counts = dbtx
674 .find_by_prefix(&BlockCountVotePrefix)
675 .await
676 .map(|entry| entry.1)
677 .collect::<Vec<u64>>()
678 .await;
679
680 counts.sort_unstable();
681
682 counts.reverse();
683
684 assert!(counts.last() <= counts.first());
685
686 counts.get(num_peers.threshold() - 1).copied().unwrap_or(0)
691 }
692
693 async fn consensus_unix_time(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
694 let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
695
696 let mut times = dbtx
697 .find_by_prefix(&UnixTimeVotePrefix)
698 .await
699 .map(|entry| entry.1)
700 .collect::<Vec<u64>>()
701 .await;
702
703 times.sort_unstable();
704
705 times.reverse();
706
707 assert!(times.last() <= times.first());
708
709 times.get(num_peers.threshold() - 1).copied().unwrap_or(0)
714 }
715
716 async fn await_incoming_contract(
717 &self,
718 db: Database,
719 contract_id: ContractId,
720 expiration: u64,
721 ) -> Option<OutPoint> {
722 loop {
723 timeout(
724 Duration::from_secs(10),
725 db.wait_key_exists(&IncomingContractOutpointKey(contract_id)),
726 )
727 .await
728 .ok();
729
730 let mut dbtx = db.begin_transaction_nc().await;
733
734 if let Some(outpoint) = dbtx
735 .get_value(&IncomingContractOutpointKey(contract_id))
736 .await
737 {
738 return Some(outpoint);
739 }
740
741 if expiration <= self.consensus_unix_time(&mut dbtx).await {
742 return None;
743 }
744 }
745 }
746
747 async fn await_preimage(
748 &self,
749 db: Database,
750 outpoint: OutPoint,
751 expiration: u64,
752 ) -> Option<[u8; 32]> {
753 loop {
754 timeout(
755 Duration::from_secs(10),
756 db.wait_key_exists(&PreimageKey(outpoint)),
757 )
758 .await
759 .ok();
760
761 let mut dbtx = db.begin_transaction_nc().await;
764
765 if let Some(preimage) = dbtx.get_value(&PreimageKey(outpoint)).await {
766 return Some(preimage);
767 }
768
769 if expiration <= self.consensus_block_count(&mut dbtx).await {
770 return None;
771 }
772 }
773 }
774
775 async fn outgoing_contract_expiration(
776 &self,
777 db: Database,
778 outpoint: OutPoint,
779 ) -> Option<(ContractId, u64)> {
780 let mut dbtx = db.begin_transaction_nc().await;
781
782 let contract = dbtx.get_value(&OutgoingContractKey(outpoint)).await?;
783
784 let consensus_block_count = self.consensus_block_count(&mut dbtx).await;
785
786 let expiration = contract.expiration.saturating_sub(consensus_block_count);
787
788 Some((contract.contract_id(), expiration))
789 }
790
791 async fn add_gateway(db: Database, gateway: SafeUrl) -> bool {
792 let mut dbtx = db.begin_transaction().await;
793
794 let is_new_entry = dbtx.insert_entry(&GatewayKey(gateway), &()).await.is_none();
795
796 dbtx.commit_tx().await;
797
798 is_new_entry
799 }
800
801 async fn remove_gateway(db: Database, gateway: SafeUrl) -> bool {
802 let mut dbtx = db.begin_transaction().await;
803
804 let entry_existed = dbtx.remove_entry(&GatewayKey(gateway)).await.is_some();
805
806 dbtx.commit_tx().await;
807
808 entry_existed
809 }
810
811 async fn gateways(db: Database) -> Vec<SafeUrl> {
812 db.begin_transaction_nc()
813 .await
814 .find_by_prefix(&GatewayPrefix)
815 .await
816 .map(|entry| entry.0.0)
817 .collect()
818 .await
819 }
820
821 pub async fn consensus_block_count_ui(&self) -> u64 {
822 self.consensus_block_count(&mut self.db.begin_transaction_nc().await)
823 .await
824 }
825
826 pub async fn consensus_unix_time_ui(&self) -> u64 {
827 self.consensus_unix_time(&mut self.db.begin_transaction_nc().await)
828 .await
829 }
830
831 pub async fn add_gateway_ui(&self, gateway: SafeUrl) -> bool {
832 Self::add_gateway(self.db.clone(), gateway).await
833 }
834
835 pub async fn remove_gateway_ui(&self, gateway: SafeUrl) -> bool {
836 Self::remove_gateway(self.db.clone(), gateway).await
837 }
838
839 pub async fn gateways_ui(&self) -> Vec<SafeUrl> {
840 Self::gateways(self.db.clone()).await
841 }
842}