1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_wrap)]
3#![allow(clippy::module_name_repetitions)]
4
5mod db;
6
7use std::collections::{BTreeMap, BTreeSet};
8use std::time::Duration;
9
10use anyhow::{Context, anyhow, ensure, format_err};
11use bls12_381::{G1Projective, Scalar};
12use fedimint_bitcoind::create_bitcoind;
13use fedimint_bitcoind::shared::ServerModuleSharedBitcoin;
14use fedimint_core::bitcoin::hashes::sha256;
15use fedimint_core::config::{
16 ConfigGenModuleParams, ServerModuleConfig, ServerModuleConsensusConfig,
17 TypedServerModuleConfig, TypedServerModuleConsensusConfig,
18};
19use fedimint_core::core::ModuleInstanceId;
20use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
21use fedimint_core::encoding::Encodable;
22use fedimint_core::module::audit::Audit;
23use fedimint_core::module::{
24 ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion, InputMeta,
25 ModuleConsensusVersion, ModuleInit, PeerHandle, SupportedModuleApiVersions,
26 TransactionItemAmount, api_endpoint,
27};
28use fedimint_core::task::timeout;
29use fedimint_core::time::duration_since_epoch;
30use fedimint_core::util::SafeUrl;
31use fedimint_core::{
32 BitcoinHash, InPoint, NumPeers, NumPeersExt, OutPoint, PeerId, apply, async_trait_maybe_send,
33 push_db_pair_items,
34};
35use fedimint_lnv2_common::config::{
36 LightningClientConfig, LightningConfig, LightningConfigConsensus, LightningConfigLocal,
37 LightningConfigPrivate, LightningGenParams,
38};
39use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract};
40use fedimint_lnv2_common::endpoint_constants::{
41 ADD_GATEWAY_ENDPOINT, AWAIT_INCOMING_CONTRACT_ENDPOINT, AWAIT_PREIMAGE_ENDPOINT,
42 CONSENSUS_BLOCK_COUNT_ENDPOINT, DECRYPTION_KEY_SHARE_ENDPOINT, GATEWAYS_ENDPOINT,
43 OUTGOING_CONTRACT_EXPIRATION_ENDPOINT, REMOVE_GATEWAY_ENDPOINT,
44};
45use fedimint_lnv2_common::{
46 ContractId, LightningCommonInit, LightningConsensusItem, LightningInput, LightningInputError,
47 LightningInputV0, LightningModuleTypes, LightningOutput, LightningOutputError,
48 LightningOutputOutcome, LightningOutputV0, MODULE_CONSENSUS_VERSION, OutgoingWitness,
49};
50use fedimint_logging::LOG_MODULE_LNV2;
51use fedimint_server::config::distributedgen::{PeerHandleOps, eval_poly_g1};
52use fedimint_server::core::{ServerModule, ServerModuleInit, ServerModuleInitArgs};
53use fedimint_server::net::api::check_auth;
54use futures::StreamExt;
55use group::Curve;
56use group::ff::Field;
57use rand::SeedableRng;
58use rand_chacha::ChaChaRng;
59use strum::IntoEnumIterator;
60use tokio::sync::watch;
61use tpe::{
62 AggregatePublicKey, DecryptionKeyShare, PublicKeyShare, SecretKeyShare, derive_pk_share,
63};
64use tracing::trace;
65
66use crate::db::{
67 BlockCountVoteKey, BlockCountVotePrefix, DbKeyPrefix, DecryptionKeyShareKey,
68 DecryptionKeySharePrefix, GatewayKey, GatewayPrefix, IncomingContractKey,
69 IncomingContractPrefix, OutgoingContractKey, OutgoingContractPrefix, PreimageKey,
70 PreimagePrefix, UnixTimeVoteKey, UnixTimeVotePrefix,
71};
72
/// Initializer for the server side of the Lightning (lnv2) module.
///
/// The type carries no state of its own; it exists to implement
/// [`ModuleInit`] and [`ServerModuleInit`] for the [`Lightning`] module.
#[derive(Clone, Debug)]
pub struct LightningInit;
75
76impl ModuleInit for LightningInit {
77 type Common = LightningCommonInit;
78
79 async fn dump_database(
80 &self,
81 dbtx: &mut DatabaseTransaction<'_>,
82 prefix_names: Vec<String>,
83 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
84 let mut lightning: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
85 BTreeMap::new();
86
87 let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
88 prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
89 });
90
91 for table in filtered_prefixes {
92 match table {
93 DbKeyPrefix::BlockCountVote => {
94 push_db_pair_items!(
95 dbtx,
96 BlockCountVotePrefix,
97 BlockCountVoteKey,
98 u64,
99 lightning,
100 "Lightning Block Count Votes"
101 );
102 }
103 DbKeyPrefix::UnixTimeVote => {
104 push_db_pair_items!(
105 dbtx,
106 UnixTimeVotePrefix,
107 UnixTimeVoteKey,
108 u64,
109 lightning,
110 "Lightning Unix Time Votes"
111 );
112 }
113 DbKeyPrefix::OutgoingContract => {
114 push_db_pair_items!(
115 dbtx,
116 OutgoingContractPrefix,
117 LightningOutgoingContractKey,
118 OutgoingContract,
119 lightning,
120 "Lightning Outgoing Contracts"
121 );
122 }
123 DbKeyPrefix::IncomingContract => {
124 push_db_pair_items!(
125 dbtx,
126 IncomingContractPrefix,
127 LightningIncomingContractKey,
128 IncomingContract,
129 lightning,
130 "Lightning Incoming Contracts"
131 );
132 }
133 DbKeyPrefix::DecryptionKeyShare => {
134 push_db_pair_items!(
135 dbtx,
136 DecryptionKeySharePrefix,
137 DecryptionKeyShareKey,
138 DecryptionKeyShare,
139 lightning,
140 "Lightning Decryption Key Share"
141 );
142 }
143 DbKeyPrefix::Preimage => {
144 push_db_pair_items!(
145 dbtx,
146 PreimagePrefix,
147 LightningPreimageKey,
148 [u8; 32],
149 lightning,
150 "Lightning Preimages"
151 );
152 }
153 DbKeyPrefix::Gateway => {
154 push_db_pair_items!(
155 dbtx,
156 GatewayPrefix,
157 GatewayKey,
158 (),
159 lightning,
160 "Lightning Gateways"
161 );
162 }
163 }
164 }
165
166 Box::new(lightning.into_iter())
167 }
168}
169
170#[apply(async_trait_maybe_send!)]
171impl ServerModuleInit for LightningInit {
172 type Module = Lightning;
173 type Params = LightningGenParams;
174
175 fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
176 &[MODULE_CONSENSUS_VERSION]
177 }
178
179 fn supported_api_versions(&self) -> SupportedModuleApiVersions {
180 SupportedModuleApiVersions::from_raw(
181 (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
182 (
183 MODULE_CONSENSUS_VERSION.major,
184 MODULE_CONSENSUS_VERSION.minor,
185 ),
186 &[(0, 0)],
187 )
188 }
189
190 async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
191 Ok(Lightning::new(args.cfg().to_typed()?, &args.shared()).await?)
192 }
193
194 fn trusted_dealer_gen(
195 &self,
196 peers: &[PeerId],
197 params: &ConfigGenModuleParams,
198 ) -> BTreeMap<PeerId, ServerModuleConfig> {
199 let params = self
200 .parse_params(params)
201 .expect("Failed tp parse lnv2 config gen params");
202
203 let tpe_pks = peers
204 .iter()
205 .map(|peer| (*peer, dealer_pk(peers.to_num_peers(), *peer)))
206 .collect::<BTreeMap<PeerId, PublicKeyShare>>();
207
208 peers
209 .iter()
210 .map(|peer| {
211 let cfg = LightningConfig {
212 local: LightningConfigLocal {
213 bitcoin_rpc: params.local.bitcoin_rpc.clone(),
214 },
215 consensus: LightningConfigConsensus {
216 tpe_agg_pk: dealer_agg_pk(),
217 tpe_pks: tpe_pks.clone(),
218 fee_consensus: params.consensus.fee_consensus.clone(),
219 network: params.consensus.network,
220 },
221 private: LightningConfigPrivate {
222 sk: dealer_sk(peers.to_num_peers(), *peer),
223 },
224 };
225
226 (*peer, cfg.to_erased())
227 })
228 .collect()
229 }
230
231 async fn distributed_gen(
232 &self,
233 peers: &PeerHandle,
234 params: &ConfigGenModuleParams,
235 ) -> anyhow::Result<ServerModuleConfig> {
236 let params = self.parse_params(params).unwrap();
237 let (polynomial, sks) = peers.run_dkg_g1().await?;
238
239 let server = LightningConfig {
240 local: LightningConfigLocal {
241 bitcoin_rpc: params.local.bitcoin_rpc.clone(),
242 },
243 consensus: LightningConfigConsensus {
244 tpe_agg_pk: tpe::AggregatePublicKey(polynomial[0].to_affine()),
245 tpe_pks: peers
246 .num_peers()
247 .peer_ids()
248 .map(|peer| (peer, PublicKeyShare(eval_poly_g1(&polynomial, &peer))))
249 .collect(),
250 fee_consensus: params.consensus.fee_consensus.clone(),
251 network: params.consensus.network,
252 },
253 private: LightningConfigPrivate {
254 sk: SecretKeyShare(sks),
255 },
256 };
257
258 Ok(server.to_erased())
259 }
260
261 fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
262 let config = config.to_typed::<LightningConfig>()?;
263
264 ensure!(
265 tpe::derive_pk_share(&config.private.sk)
266 == *config
267 .consensus
268 .tpe_pks
269 .get(identity)
270 .context("Public key set has no key for our identity")?,
271 "Preimge encryption secret key share does not match our public key share"
272 );
273
274 Ok(())
275 }
276
277 fn get_client_config(
278 &self,
279 config: &ServerModuleConsensusConfig,
280 ) -> anyhow::Result<LightningClientConfig> {
281 let config = LightningConfigConsensus::from_erased(config)?;
282 Ok(LightningClientConfig {
283 tpe_agg_pk: config.tpe_agg_pk,
284 tpe_pks: config.tpe_pks,
285 fee_consensus: config.fee_consensus,
286 network: config.network,
287 })
288 }
289
290 fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
291 Some(DbKeyPrefix::iter().map(|p| p as u8).collect())
292 }
293}
294
295fn dealer_agg_pk() -> AggregatePublicKey {
296 AggregatePublicKey((G1Projective::generator() * coefficient(0)).to_affine())
297}
298
299fn dealer_pk(num_peers: NumPeers, peer: PeerId) -> PublicKeyShare {
300 derive_pk_share(&dealer_sk(num_peers, peer))
301}
302
303fn dealer_sk(num_peers: NumPeers, peer: PeerId) -> SecretKeyShare {
304 let x = Scalar::from(peer.to_usize() as u64 + 1);
305
306 let y = (0..num_peers.threshold())
310 .map(|index| coefficient(index as u64))
311 .rev()
312 .reduce(|accumulator, c| accumulator * x + c)
313 .expect("We have at least one coefficient");
314
315 SecretKeyShare(y)
316}
317
318fn coefficient(index: u64) -> Scalar {
319 Scalar::random(&mut ChaChaRng::from_seed(
320 *index.consensus_hash::<sha256::Hash>().as_byte_array(),
321 ))
322}
323
324#[derive(Debug)]
325pub struct Lightning {
326 cfg: LightningConfig,
327 block_count_rx: watch::Receiver<Option<u64>>,
329}
330
331#[apply(async_trait_maybe_send!)]
332impl ServerModule for Lightning {
333 type Common = LightningModuleTypes;
334 type Init = LightningInit;
335
336 async fn consensus_proposal(
337 &self,
338 _dbtx: &mut DatabaseTransaction<'_>,
339 ) -> Vec<LightningConsensusItem> {
340 let mut items = vec![LightningConsensusItem::UnixTimeVote(
343 60 * (duration_since_epoch().as_secs() / 60),
344 )];
345
346 if let Ok(block_count) = self.get_block_count() {
347 trace!(target: LOG_MODULE_LNV2, ?block_count, "Proposing block count");
348 items.push(LightningConsensusItem::BlockCountVote(block_count));
349 }
350
351 items
352 }
353
354 async fn process_consensus_item<'a, 'b>(
355 &'a self,
356 dbtx: &mut DatabaseTransaction<'b>,
357 consensus_item: LightningConsensusItem,
358 peer: PeerId,
359 ) -> anyhow::Result<()> {
360 trace!(target: LOG_MODULE_LNV2, ?consensus_item, "Processing consensus item proposal");
361 match consensus_item {
362 LightningConsensusItem::BlockCountVote(vote) => {
363 let current_vote = dbtx
364 .insert_entry(&BlockCountVoteKey(peer), &vote)
365 .await
366 .unwrap_or(0);
367
368 ensure!(current_vote < vote, "Block count vote is redundant");
369
370 Ok(())
371 }
372 LightningConsensusItem::UnixTimeVote(vote) => {
373 let current_vote = dbtx
374 .insert_entry(&UnixTimeVoteKey(peer), &vote)
375 .await
376 .unwrap_or(0);
377
378 ensure!(current_vote < vote, "Unix time vote is redundant");
379
380 Ok(())
381 }
382 LightningConsensusItem::Default { variant, .. } => Err(anyhow!(
383 "Received lnv2 consensus item with unknown variant {variant}"
384 )),
385 }
386 }
387
388 async fn process_input<'a, 'b, 'c>(
389 &'a self,
390 dbtx: &mut DatabaseTransaction<'c>,
391 input: &'b LightningInput,
392 _in_point: InPoint,
393 ) -> Result<InputMeta, LightningInputError> {
394 let (pub_key, amount) = match input.ensure_v0_ref()? {
395 LightningInputV0::Outgoing(contract_id, outgoing_witness) => {
396 let contract = dbtx
397 .remove_entry(&OutgoingContractKey(*contract_id))
398 .await
399 .ok_or(LightningInputError::UnknownContract)?;
400
401 let pub_key = match outgoing_witness {
402 OutgoingWitness::Claim(preimage) => {
403 if contract.expiration <= self.consensus_block_count(dbtx).await {
404 return Err(LightningInputError::Expired);
405 }
406
407 if !contract.verify_preimage(preimage) {
408 return Err(LightningInputError::InvalidPreimage);
409 }
410
411 dbtx.insert_entry(&PreimageKey(*contract_id), preimage)
412 .await;
413
414 contract.claim_pk
415 }
416 OutgoingWitness::Refund => {
417 if contract.expiration > self.consensus_block_count(dbtx).await {
418 return Err(LightningInputError::NotExpired);
419 }
420
421 contract.refund_pk
422 }
423 OutgoingWitness::Cancel(forfeit_signature) => {
424 if !contract.verify_forfeit_signature(forfeit_signature) {
425 return Err(LightningInputError::InvalidForfeitSignature);
426 }
427
428 contract.refund_pk
429 }
430 };
431
432 (pub_key, contract.amount)
433 }
434 LightningInputV0::Incoming(contract_id, agg_decryption_key) => {
435 let contract = dbtx
436 .remove_entry(&IncomingContractKey(*contract_id))
437 .await
438 .ok_or(LightningInputError::UnknownContract)?;
439
440 if !contract
441 .verify_agg_decryption_key(&self.cfg.consensus.tpe_agg_pk, agg_decryption_key)
442 {
443 return Err(LightningInputError::InvalidDecryptionKey);
444 }
445
446 let pub_key = match contract.decrypt_preimage(agg_decryption_key) {
447 Some(..) => contract.commitment.claim_pk,
448 None => contract.commitment.refund_pk,
449 };
450
451 (pub_key, contract.commitment.amount)
452 }
453 };
454
455 Ok(InputMeta {
456 amount: TransactionItemAmount {
457 amount,
458 fee: self.cfg.consensus.fee_consensus.fee(amount),
459 },
460 pub_key,
461 })
462 }
463
464 async fn process_output<'a, 'b>(
465 &'a self,
466 dbtx: &mut DatabaseTransaction<'b>,
467 output: &'a LightningOutput,
468 _outpoint: OutPoint,
469 ) -> Result<TransactionItemAmount, LightningOutputError> {
470 let amount = match output.ensure_v0_ref()? {
471 LightningOutputV0::Outgoing(contract) => {
472 if dbtx
473 .insert_entry(&OutgoingContractKey(contract.contract_id()), contract)
474 .await
475 .is_some()
476 {
477 return Err(LightningOutputError::ContractAlreadyExists);
478 }
479
480 contract.amount
481 }
482 LightningOutputV0::Incoming(contract) => {
483 if !contract.verify() {
484 return Err(LightningOutputError::InvalidContract);
485 }
486
487 if contract.commitment.expiration <= self.consensus_unix_time(dbtx).await {
488 return Err(LightningOutputError::ContractExpired);
489 }
490
491 if dbtx
492 .insert_entry(&IncomingContractKey(contract.contract_id()), contract)
493 .await
494 .is_some()
495 {
496 return Err(LightningOutputError::ContractAlreadyExists);
497 }
498
499 let dk_share = contract.create_decryption_key_share(&self.cfg.private.sk);
500
501 dbtx.insert_entry(&DecryptionKeyShareKey(contract.contract_id()), &dk_share)
502 .await;
503
504 contract.commitment.amount
505 }
506 };
507
508 Ok(TransactionItemAmount {
509 amount,
510 fee: self.cfg.consensus.fee_consensus.fee(amount),
511 })
512 }
513
514 async fn output_status(
515 &self,
516 _dbtx: &mut DatabaseTransaction<'_>,
517 _out_point: OutPoint,
518 ) -> Option<LightningOutputOutcome> {
519 None
520 }
521
522 async fn audit(
523 &self,
524 dbtx: &mut DatabaseTransaction<'_>,
525 audit: &mut Audit,
526 module_instance_id: ModuleInstanceId,
527 ) {
528 audit
531 .add_items(
532 dbtx,
533 module_instance_id,
534 &OutgoingContractPrefix,
535 |_, contract| -(contract.amount.msats as i64),
536 )
537 .await;
538
539 audit
540 .add_items(
541 dbtx,
542 module_instance_id,
543 &IncomingContractPrefix,
544 |_, contract| -(contract.commitment.amount.msats as i64),
545 )
546 .await;
547 }
548
549 fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
550 vec![
551 api_endpoint! {
552 CONSENSUS_BLOCK_COUNT_ENDPOINT,
553 ApiVersion::new(0, 0),
554 async |module: &Lightning, context, _params : () | -> u64 {
555 let db = context.db();
556 let mut dbtx = db.begin_transaction_nc().await;
557
558 Ok(module.consensus_block_count(&mut dbtx).await)
559 }
560 },
561 api_endpoint! {
562 AWAIT_INCOMING_CONTRACT_ENDPOINT,
563 ApiVersion::new(0, 0),
564 async |module: &Lightning, context, params: (ContractId, u64) | -> Option<ContractId> {
565 let db = context.db();
566
567 Ok(module.await_incoming_contract(db, params.0, params.1).await)
568 }
569 },
570 api_endpoint! {
571 AWAIT_PREIMAGE_ENDPOINT,
572 ApiVersion::new(0, 0),
573 async |module: &Lightning, context, params: (ContractId, u64)| -> Option<[u8; 32]> {
574 let db = context.db();
575
576 Ok(module.await_preimage(db, params.0, params.1).await)
577 }
578 },
579 api_endpoint! {
580 DECRYPTION_KEY_SHARE_ENDPOINT,
581 ApiVersion::new(0, 0),
582 async |_module: &Lightning, context, params: ContractId| -> DecryptionKeyShare {
583 let share = context
584 .db()
585 .begin_transaction_nc()
586 .await
587 .get_value(&DecryptionKeyShareKey(params))
588 .await
589 .ok_or(ApiError::bad_request("No decryption key share found".to_string()))?;
590
591 Ok(share)
592 }
593 },
594 api_endpoint! {
595 OUTGOING_CONTRACT_EXPIRATION_ENDPOINT,
596 ApiVersion::new(0, 0),
597 async |module: &Lightning, context, contract_id: ContractId| -> Option<u64> {
598 let db = context.db();
599
600 Ok(module.outgoing_contract_expiration(db, contract_id).await)
601 }
602 },
603 api_endpoint! {
604 ADD_GATEWAY_ENDPOINT,
605 ApiVersion::new(0, 0),
606 async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
607 check_auth(context)?;
608
609 let db = context.db();
610
611 Ok(Lightning::add_gateway(db, gateway).await)
612 }
613 },
614 api_endpoint! {
615 REMOVE_GATEWAY_ENDPOINT,
616 ApiVersion::new(0, 0),
617 async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
618 check_auth(context)?;
619
620 let db = context.db();
621
622 Ok(Lightning::remove_gateway(db, gateway).await)
623 }
624 },
625 api_endpoint! {
626 GATEWAYS_ENDPOINT,
627 ApiVersion::new(0, 0),
628 async |_module: &Lightning, context, _params : () | -> Vec<SafeUrl> {
629 let db = context.db();
630
631 Ok(Lightning::gateways(db).await)
632 }
633 },
634 ]
635 }
636}
637
638impl Lightning {
639 async fn new(
640 cfg: LightningConfig,
641 shared_bitcoin: &ServerModuleSharedBitcoin,
642 ) -> anyhow::Result<Self> {
643 let btc_rpc = create_bitcoind(&cfg.local.bitcoin_rpc)?;
644 let block_count_rx = shared_bitcoin
645 .block_count_receiver(cfg.consensus.network, btc_rpc.clone())
646 .await;
647
648 Ok(Lightning {
649 cfg,
650 block_count_rx,
651 })
652 }
653
654 fn get_block_count(&self) -> anyhow::Result<u64> {
655 self.block_count_rx
656 .borrow()
657 .ok_or_else(|| format_err!("Block count not available yet"))
658 }
659
660 async fn consensus_block_count(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
661 let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
662
663 let mut counts = dbtx
664 .find_by_prefix(&BlockCountVotePrefix)
665 .await
666 .map(|entry| entry.1)
667 .collect::<Vec<u64>>()
668 .await;
669
670 counts.sort_unstable();
671
672 counts.reverse();
673
674 assert!(counts.last() <= counts.first());
675
676 counts.get(num_peers.threshold() - 1).copied().unwrap_or(0)
681 }
682
683 async fn consensus_unix_time(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
684 let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
685
686 let mut times = dbtx
687 .find_by_prefix(&UnixTimeVotePrefix)
688 .await
689 .map(|entry| entry.1)
690 .collect::<Vec<u64>>()
691 .await;
692
693 times.sort_unstable();
694
695 times.reverse();
696
697 assert!(times.last() <= times.first());
698
699 times.get(num_peers.threshold() - 1).copied().unwrap_or(0)
704 }
705
706 async fn await_incoming_contract(
707 &self,
708 db: Database,
709 contract_id: ContractId,
710 expiration: u64,
711 ) -> Option<ContractId> {
712 loop {
713 timeout(
714 Duration::from_secs(10),
715 db.wait_key_exists(&IncomingContractKey(contract_id)),
716 )
717 .await
718 .ok();
719
720 let mut dbtx = db.begin_transaction_nc().await;
723
724 if let Some(contract) = dbtx.get_value(&IncomingContractKey(contract_id)).await {
725 return Some(contract.contract_id());
726 }
727
728 if expiration <= self.consensus_unix_time(&mut dbtx).await {
729 return None;
730 }
731 }
732 }
733
734 async fn await_preimage(
735 &self,
736 db: Database,
737 contract_id: ContractId,
738 expiration: u64,
739 ) -> Option<[u8; 32]> {
740 loop {
741 timeout(
742 Duration::from_secs(10),
743 db.wait_key_exists(&PreimageKey(contract_id)),
744 )
745 .await
746 .ok();
747
748 let mut dbtx = db.begin_transaction_nc().await;
751
752 if let Some(preimage) = dbtx.get_value(&PreimageKey(contract_id)).await {
753 return Some(preimage);
754 }
755
756 if expiration <= self.consensus_block_count(&mut dbtx).await {
757 return None;
758 }
759 }
760 }
761
762 async fn outgoing_contract_expiration(
763 &self,
764 db: Database,
765 contract_id: ContractId,
766 ) -> Option<u64> {
767 let mut dbtx = db.begin_transaction_nc().await;
768
769 let contract = dbtx.get_value(&OutgoingContractKey(contract_id)).await?;
770
771 let consensus_block_count = self.consensus_block_count(&mut dbtx).await;
772
773 Some(contract.expiration.saturating_sub(consensus_block_count))
774 }
775
776 async fn add_gateway(db: Database, gateway: SafeUrl) -> bool {
777 let mut dbtx = db.begin_transaction().await;
778
779 let is_new_entry = dbtx.insert_entry(&GatewayKey(gateway), &()).await.is_none();
780
781 dbtx.commit_tx().await;
782
783 is_new_entry
784 }
785
786 async fn remove_gateway(db: Database, gateway: SafeUrl) -> bool {
787 let mut dbtx = db.begin_transaction().await;
788
789 let entry_existed = dbtx.remove_entry(&GatewayKey(gateway)).await.is_some();
790
791 dbtx.commit_tx().await;
792
793 entry_existed
794 }
795
796 async fn gateways(db: Database) -> Vec<SafeUrl> {
797 db.begin_transaction_nc()
798 .await
799 .find_by_prefix(&GatewayPrefix)
800 .await
801 .map(|entry| entry.0.0)
802 .collect()
803 .await
804 }
805}