fedimint_lnv2_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_wrap)]
3#![allow(clippy::module_name_repetitions)]
4
5mod db;
6
7use std::collections::{BTreeMap, BTreeSet};
8use std::time::Duration;
9
10use anyhow::{Context, anyhow, ensure};
11use bls12_381::{G1Projective, Scalar};
12use fedimint_core::bitcoin::hashes::sha256;
13use fedimint_core::config::{
14    ConfigGenModuleParams, ServerModuleConfig, ServerModuleConsensusConfig,
15    TypedServerModuleConfig, TypedServerModuleConsensusConfig,
16};
17use fedimint_core::core::ModuleInstanceId;
18use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
19use fedimint_core::encoding::Encodable;
20use fedimint_core::module::audit::Audit;
21use fedimint_core::module::{
22    ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion, InputMeta,
23    ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions, TransactionItemAmount,
24    api_endpoint,
25};
26use fedimint_core::task::timeout;
27use fedimint_core::time::duration_since_epoch;
28use fedimint_core::util::SafeUrl;
29use fedimint_core::{
30    BitcoinHash, InPoint, NumPeers, NumPeersExt, OutPoint, PeerId, apply, async_trait_maybe_send,
31    push_db_pair_items,
32};
33use fedimint_lnv2_common::config::{
34    LightningClientConfig, LightningConfig, LightningConfigConsensus, LightningConfigPrivate,
35    LightningGenParams,
36};
37use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract};
38use fedimint_lnv2_common::endpoint_constants::{
39    ADD_GATEWAY_ENDPOINT, AWAIT_INCOMING_CONTRACT_ENDPOINT, AWAIT_PREIMAGE_ENDPOINT,
40    CONSENSUS_BLOCK_COUNT_ENDPOINT, DECRYPTION_KEY_SHARE_ENDPOINT, GATEWAYS_ENDPOINT,
41    OUTGOING_CONTRACT_EXPIRATION_ENDPOINT, REMOVE_GATEWAY_ENDPOINT,
42};
43use fedimint_lnv2_common::{
44    ContractId, LightningCommonInit, LightningConsensusItem, LightningInput, LightningInputError,
45    LightningInputV0, LightningModuleTypes, LightningOutput, LightningOutputError,
46    LightningOutputOutcome, LightningOutputV0, MODULE_CONSENSUS_VERSION, OutgoingWitness,
47};
48use fedimint_logging::LOG_MODULE_LNV2;
49use fedimint_server_core::bitcoin_rpc::ServerBitcoinRpcMonitor;
50use fedimint_server_core::config::{PeerHandleOps, eval_poly_g1};
51use fedimint_server_core::net::check_auth;
52use fedimint_server_core::{ServerModule, ServerModuleInit, ServerModuleInitArgs};
53use futures::StreamExt;
54use group::Curve;
55use group::ff::Field;
56use rand::SeedableRng;
57use rand_chacha::ChaChaRng;
58use strum::IntoEnumIterator;
59use tpe::{
60    AggregatePublicKey, DecryptionKeyShare, PublicKeyShare, SecretKeyShare, derive_pk_share,
61};
62use tracing::trace;
63
64use crate::db::{
65    BlockCountVoteKey, BlockCountVotePrefix, DbKeyPrefix, DecryptionKeyShareKey,
66    DecryptionKeySharePrefix, GatewayKey, GatewayPrefix, IncomingContractKey,
67    IncomingContractOutpointKey, IncomingContractOutpointPrefix, IncomingContractPrefix,
68    OutgoingContractKey, OutgoingContractPrefix, PreimageKey, PreimagePrefix, UnixTimeVoteKey,
69    UnixTimeVotePrefix,
70};
71
/// Stateless initializer of the lnv2 server module.
///
/// The init traits implemented on this type cover database dumping, config
/// generation and validation as well as construction of the module itself.
#[derive(Clone, Debug)]
pub struct LightningInit;
74
75impl ModuleInit for LightningInit {
76    type Common = LightningCommonInit;
77
78    async fn dump_database(
79        &self,
80        dbtx: &mut DatabaseTransaction<'_>,
81        prefix_names: Vec<String>,
82    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
83        let mut lightning: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
84            BTreeMap::new();
85
86        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
87            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
88        });
89
90        for table in filtered_prefixes {
91            match table {
92                DbKeyPrefix::BlockCountVote => {
93                    push_db_pair_items!(
94                        dbtx,
95                        BlockCountVotePrefix,
96                        BlockCountVoteKey,
97                        u64,
98                        lightning,
99                        "Lightning Block Count Votes"
100                    );
101                }
102                DbKeyPrefix::UnixTimeVote => {
103                    push_db_pair_items!(
104                        dbtx,
105                        UnixTimeVotePrefix,
106                        UnixTimeVoteKey,
107                        u64,
108                        lightning,
109                        "Lightning Unix Time Votes"
110                    );
111                }
112                DbKeyPrefix::OutgoingContract => {
113                    push_db_pair_items!(
114                        dbtx,
115                        OutgoingContractPrefix,
116                        LightningOutgoingContractKey,
117                        OutgoingContract,
118                        lightning,
119                        "Lightning Outgoing Contracts"
120                    );
121                }
122                DbKeyPrefix::IncomingContract => {
123                    push_db_pair_items!(
124                        dbtx,
125                        IncomingContractPrefix,
126                        LightningIncomingContractKey,
127                        IncomingContract,
128                        lightning,
129                        "Lightning Incoming Contracts"
130                    );
131                }
132                DbKeyPrefix::IncomingContractOutpoint => {
133                    push_db_pair_items!(
134                        dbtx,
135                        IncomingContractOutpointPrefix,
136                        LightningIncomingContractOutpointKey,
137                        OutPoint,
138                        lightning,
139                        "Lightning Incoming Contracts Outpoints"
140                    );
141                }
142                DbKeyPrefix::DecryptionKeyShare => {
143                    push_db_pair_items!(
144                        dbtx,
145                        DecryptionKeySharePrefix,
146                        DecryptionKeyShareKey,
147                        DecryptionKeyShare,
148                        lightning,
149                        "Lightning Decryption Key Share"
150                    );
151                }
152                DbKeyPrefix::Preimage => {
153                    push_db_pair_items!(
154                        dbtx,
155                        PreimagePrefix,
156                        LightningPreimageKey,
157                        [u8; 32],
158                        lightning,
159                        "Lightning Preimages"
160                    );
161                }
162                DbKeyPrefix::Gateway => {
163                    push_db_pair_items!(
164                        dbtx,
165                        GatewayPrefix,
166                        GatewayKey,
167                        (),
168                        lightning,
169                        "Lightning Gateways"
170                    );
171                }
172            }
173        }
174
175        Box::new(lightning.into_iter())
176    }
177}
178
179#[apply(async_trait_maybe_send!)]
180impl ServerModuleInit for LightningInit {
181    type Module = Lightning;
182    type Params = LightningGenParams;
183
184    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
185        &[MODULE_CONSENSUS_VERSION]
186    }
187
188    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
189        SupportedModuleApiVersions::from_raw(
190            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
191            (
192                MODULE_CONSENSUS_VERSION.major,
193                MODULE_CONSENSUS_VERSION.minor,
194            ),
195            &[(0, 0)],
196        )
197    }
198
199    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
200        Ok(Lightning {
201            cfg: args.cfg().to_typed()?,
202            db: args.db().clone(),
203            server_bitcoin_rpc_monitor: args.server_bitcoin_rpc_monitor(),
204        })
205    }
206
207    fn trusted_dealer_gen(
208        &self,
209        peers: &[PeerId],
210        params: &ConfigGenModuleParams,
211    ) -> BTreeMap<PeerId, ServerModuleConfig> {
212        let params = self
213            .parse_params(params)
214            .expect("Failed tp parse lnv2 config gen params");
215
216        let tpe_pks = peers
217            .iter()
218            .map(|peer| (*peer, dealer_pk(peers.to_num_peers(), *peer)))
219            .collect::<BTreeMap<PeerId, PublicKeyShare>>();
220
221        peers
222            .iter()
223            .map(|peer| {
224                let cfg = LightningConfig {
225                    consensus: LightningConfigConsensus {
226                        tpe_agg_pk: dealer_agg_pk(),
227                        tpe_pks: tpe_pks.clone(),
228                        fee_consensus: params.consensus.fee_consensus.clone(),
229                        network: params.consensus.network,
230                    },
231                    private: LightningConfigPrivate {
232                        sk: dealer_sk(peers.to_num_peers(), *peer),
233                    },
234                };
235
236                (*peer, cfg.to_erased())
237            })
238            .collect()
239    }
240
241    async fn distributed_gen(
242        &self,
243        peers: &(dyn PeerHandleOps + Send + Sync),
244        params: &ConfigGenModuleParams,
245    ) -> anyhow::Result<ServerModuleConfig> {
246        let params = self.parse_params(params).unwrap();
247        let (polynomial, sks) = peers.run_dkg_g1().await?;
248
249        let server = LightningConfig {
250            consensus: LightningConfigConsensus {
251                tpe_agg_pk: tpe::AggregatePublicKey(polynomial[0].to_affine()),
252                tpe_pks: peers
253                    .num_peers()
254                    .peer_ids()
255                    .map(|peer| (peer, PublicKeyShare(eval_poly_g1(&polynomial, &peer))))
256                    .collect(),
257                fee_consensus: params.consensus.fee_consensus.clone(),
258                network: params.consensus.network,
259            },
260            private: LightningConfigPrivate {
261                sk: SecretKeyShare(sks),
262            },
263        };
264
265        Ok(server.to_erased())
266    }
267
268    fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
269        let config = config.to_typed::<LightningConfig>()?;
270
271        ensure!(
272            tpe::derive_pk_share(&config.private.sk)
273                == *config
274                    .consensus
275                    .tpe_pks
276                    .get(identity)
277                    .context("Public key set has no key for our identity")?,
278            "Preimge encryption secret key share does not match our public key share"
279        );
280
281        Ok(())
282    }
283
284    fn get_client_config(
285        &self,
286        config: &ServerModuleConsensusConfig,
287    ) -> anyhow::Result<LightningClientConfig> {
288        let config = LightningConfigConsensus::from_erased(config)?;
289        Ok(LightningClientConfig {
290            tpe_agg_pk: config.tpe_agg_pk,
291            tpe_pks: config.tpe_pks,
292            fee_consensus: config.fee_consensus,
293            network: config.network,
294        })
295    }
296
297    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
298        Some(DbKeyPrefix::iter().map(|p| p as u8).collect())
299    }
300}
301
302fn dealer_agg_pk() -> AggregatePublicKey {
303    AggregatePublicKey((G1Projective::generator() * coefficient(0)).to_affine())
304}
305
306fn dealer_pk(num_peers: NumPeers, peer: PeerId) -> PublicKeyShare {
307    derive_pk_share(&dealer_sk(num_peers, peer))
308}
309
310fn dealer_sk(num_peers: NumPeers, peer: PeerId) -> SecretKeyShare {
311    let x = Scalar::from(peer.to_usize() as u64 + 1);
312
313    // We evaluate the scalar polynomial of degree threshold - 1 at the point x
314    // using the Horner schema.
315
316    let y = (0..num_peers.threshold())
317        .map(|index| coefficient(index as u64))
318        .rev()
319        .reduce(|accumulator, c| accumulator * x + c)
320        .expect("We have at least one coefficient");
321
322    SecretKeyShare(y)
323}
324
325fn coefficient(index: u64) -> Scalar {
326    Scalar::random(&mut ChaChaRng::from_seed(
327        *index.consensus_hash::<sha256::Hash>().as_byte_array(),
328    ))
329}
330
/// Server side of the lnv2 module.
#[derive(Debug)]
pub struct Lightning {
    /// Consensus and private module config; provides the threshold
    /// encryption keys and the fee consensus.
    cfg: LightningConfig,
    /// Database handle used by the `*_ui` methods.
    db: Database,
    /// Source of the block count this peer proposes to consensus.
    server_bitcoin_rpc_monitor: ServerBitcoinRpcMonitor,
}
337
338#[apply(async_trait_maybe_send!)]
339impl ServerModule for Lightning {
340    type Common = LightningModuleTypes;
341    type Init = LightningInit;
342
343    async fn consensus_proposal(
344        &self,
345        _dbtx: &mut DatabaseTransaction<'_>,
346    ) -> Vec<LightningConsensusItem> {
347        // We reduce the time granularity to deduplicate votes more often and not save
348        // one consensus item every second.
349        let mut items = vec![LightningConsensusItem::UnixTimeVote(
350            60 * (duration_since_epoch().as_secs() / 60),
351        )];
352
353        if let Ok(block_count) = self.get_block_count() {
354            trace!(target: LOG_MODULE_LNV2, ?block_count, "Proposing block count");
355            items.push(LightningConsensusItem::BlockCountVote(block_count));
356        }
357
358        items
359    }
360
361    async fn process_consensus_item<'a, 'b>(
362        &'a self,
363        dbtx: &mut DatabaseTransaction<'b>,
364        consensus_item: LightningConsensusItem,
365        peer: PeerId,
366    ) -> anyhow::Result<()> {
367        trace!(target: LOG_MODULE_LNV2, ?consensus_item, "Processing consensus item proposal");
368
369        match consensus_item {
370            LightningConsensusItem::BlockCountVote(vote) => {
371                let current_vote = dbtx
372                    .insert_entry(&BlockCountVoteKey(peer), &vote)
373                    .await
374                    .unwrap_or(0);
375
376                ensure!(current_vote < vote, "Block count vote is redundant");
377
378                Ok(())
379            }
380            LightningConsensusItem::UnixTimeVote(vote) => {
381                let current_vote = dbtx
382                    .insert_entry(&UnixTimeVoteKey(peer), &vote)
383                    .await
384                    .unwrap_or(0);
385
386                ensure!(current_vote < vote, "Unix time vote is redundant");
387
388                Ok(())
389            }
390            LightningConsensusItem::Default { variant, .. } => Err(anyhow!(
391                "Received lnv2 consensus item with unknown variant {variant}"
392            )),
393        }
394    }
395
396    async fn process_input<'a, 'b, 'c>(
397        &'a self,
398        dbtx: &mut DatabaseTransaction<'c>,
399        input: &'b LightningInput,
400        _in_point: InPoint,
401    ) -> Result<InputMeta, LightningInputError> {
402        let (pub_key, amount) = match input.ensure_v0_ref()? {
403            LightningInputV0::Outgoing(outpoint, outgoing_witness) => {
404                let contract = dbtx
405                    .remove_entry(&OutgoingContractKey(*outpoint))
406                    .await
407                    .ok_or(LightningInputError::UnknownContract)?;
408
409                let pub_key = match outgoing_witness {
410                    OutgoingWitness::Claim(preimage) => {
411                        if contract.expiration <= self.consensus_block_count(dbtx).await {
412                            return Err(LightningInputError::Expired);
413                        }
414
415                        if !contract.verify_preimage(preimage) {
416                            return Err(LightningInputError::InvalidPreimage);
417                        }
418
419                        dbtx.insert_entry(&PreimageKey(*outpoint), preimage).await;
420
421                        contract.claim_pk
422                    }
423                    OutgoingWitness::Refund => {
424                        if contract.expiration > self.consensus_block_count(dbtx).await {
425                            return Err(LightningInputError::NotExpired);
426                        }
427
428                        contract.refund_pk
429                    }
430                    OutgoingWitness::Cancel(forfeit_signature) => {
431                        if !contract.verify_forfeit_signature(forfeit_signature) {
432                            return Err(LightningInputError::InvalidForfeitSignature);
433                        }
434
435                        contract.refund_pk
436                    }
437                };
438
439                (pub_key, contract.amount)
440            }
441            LightningInputV0::Incoming(outpoint, agg_decryption_key) => {
442                let contract = dbtx
443                    .remove_entry(&IncomingContractKey(*outpoint))
444                    .await
445                    .ok_or(LightningInputError::UnknownContract)?;
446
447                if !contract
448                    .verify_agg_decryption_key(&self.cfg.consensus.tpe_agg_pk, agg_decryption_key)
449                {
450                    return Err(LightningInputError::InvalidDecryptionKey);
451                }
452
453                let pub_key = match contract.decrypt_preimage(agg_decryption_key) {
454                    Some(..) => contract.commitment.claim_pk,
455                    None => contract.commitment.refund_pk,
456                };
457
458                (pub_key, contract.commitment.amount)
459            }
460        };
461
462        Ok(InputMeta {
463            amount: TransactionItemAmount {
464                amount,
465                fee: self.cfg.consensus.fee_consensus.fee(amount),
466            },
467            pub_key,
468        })
469    }
470
471    async fn process_output<'a, 'b>(
472        &'a self,
473        dbtx: &mut DatabaseTransaction<'b>,
474        output: &'a LightningOutput,
475        outpoint: OutPoint,
476    ) -> Result<TransactionItemAmount, LightningOutputError> {
477        let amount = match output.ensure_v0_ref()? {
478            LightningOutputV0::Outgoing(contract) => {
479                dbtx.insert_new_entry(&OutgoingContractKey(outpoint), contract)
480                    .await;
481
482                contract.amount
483            }
484            LightningOutputV0::Incoming(contract) => {
485                if !contract.verify() {
486                    return Err(LightningOutputError::InvalidContract);
487                }
488
489                if contract.commitment.expiration <= self.consensus_unix_time(dbtx).await {
490                    return Err(LightningOutputError::ContractExpired);
491                }
492
493                dbtx.insert_new_entry(&IncomingContractKey(outpoint), contract)
494                    .await;
495
496                dbtx.insert_entry(
497                    &IncomingContractOutpointKey(contract.contract_id()),
498                    &outpoint,
499                )
500                .await;
501
502                let dk_share = contract.create_decryption_key_share(&self.cfg.private.sk);
503
504                dbtx.insert_entry(&DecryptionKeyShareKey(outpoint), &dk_share)
505                    .await;
506
507                contract.commitment.amount
508            }
509        };
510
511        Ok(TransactionItemAmount {
512            amount,
513            fee: self.cfg.consensus.fee_consensus.fee(amount),
514        })
515    }
516
    /// Always returns `None`: this module does not report an outcome for any
    /// of its outputs.
    async fn output_status(
        &self,
        _dbtx: &mut DatabaseTransaction<'_>,
        _out_point: OutPoint,
    ) -> Option<LightningOutputOutcome> {
        None
    }
524
525    async fn audit(
526        &self,
527        dbtx: &mut DatabaseTransaction<'_>,
528        audit: &mut Audit,
529        module_instance_id: ModuleInstanceId,
530    ) {
531        // Both incoming and outgoing contracts represent liabilities to the federation
532        // since they are obligations to issue notes.
533        audit
534            .add_items(
535                dbtx,
536                module_instance_id,
537                &OutgoingContractPrefix,
538                |_, contract| -(contract.amount.msats as i64),
539            )
540            .await;
541
542        audit
543            .add_items(
544                dbtx,
545                module_instance_id,
546                &IncomingContractPrefix,
547                |_, contract| -(contract.commitment.amount.msats as i64),
548            )
549            .await;
550    }
551
552    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
553        vec![
554            api_endpoint! {
555                CONSENSUS_BLOCK_COUNT_ENDPOINT,
556                ApiVersion::new(0, 0),
557                async |module: &Lightning, context, _params : () | -> u64 {
558                    let db = context.db();
559                    let mut dbtx = db.begin_transaction_nc().await;
560
561                    Ok(module.consensus_block_count(&mut dbtx).await)
562                }
563            },
564            api_endpoint! {
565                AWAIT_INCOMING_CONTRACT_ENDPOINT,
566                ApiVersion::new(0, 0),
567                async |module: &Lightning, context, params: (ContractId, u64) | -> Option<OutPoint> {
568                    let db = context.db();
569
570                    Ok(module.await_incoming_contract(db, params.0, params.1).await)
571                }
572            },
573            api_endpoint! {
574                AWAIT_PREIMAGE_ENDPOINT,
575                ApiVersion::new(0, 0),
576                async |module: &Lightning, context, params: (OutPoint, u64)| -> Option<[u8; 32]> {
577                    let db = context.db();
578
579                    Ok(module.await_preimage(db, params.0, params.1).await)
580                }
581            },
582            api_endpoint! {
583                DECRYPTION_KEY_SHARE_ENDPOINT,
584                ApiVersion::new(0, 0),
585                async |_module: &Lightning, context, params: OutPoint| -> DecryptionKeyShare {
586                    let share = context
587                        .db()
588                        .begin_transaction_nc()
589                        .await
590                        .get_value(&DecryptionKeyShareKey(params))
591                        .await
592                        .ok_or(ApiError::bad_request("No decryption key share found".to_string()))?;
593
594                    Ok(share)
595                }
596            },
597            api_endpoint! {
598                OUTGOING_CONTRACT_EXPIRATION_ENDPOINT,
599                ApiVersion::new(0, 0),
600                async |module: &Lightning, context, outpoint: OutPoint| -> Option<(ContractId, u64)> {
601                    let db = context.db();
602
603                    Ok(module.outgoing_contract_expiration(db, outpoint).await)
604                }
605            },
606            api_endpoint! {
607                ADD_GATEWAY_ENDPOINT,
608                ApiVersion::new(0, 0),
609                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
610                    check_auth(context)?;
611
612                    let db = context.db();
613
614                    Ok(Lightning::add_gateway(db, gateway).await)
615                }
616            },
617            api_endpoint! {
618                REMOVE_GATEWAY_ENDPOINT,
619                ApiVersion::new(0, 0),
620                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
621                    check_auth(context)?;
622
623                    let db = context.db();
624
625                    Ok(Lightning::remove_gateway(db, gateway).await)
626                }
627            },
628            api_endpoint! {
629                GATEWAYS_ENDPOINT,
630                ApiVersion::new(0, 0),
631                async |_module: &Lightning, context, _params : () | -> Vec<SafeUrl> {
632                    let db = context.db();
633
634                    Ok(Lightning::gateways(db).await)
635                }
636            },
637        ]
638    }
639}
640
641impl Lightning {
642    fn get_block_count(&self) -> anyhow::Result<u64> {
643        self.server_bitcoin_rpc_monitor
644            .status()
645            .map(|status| status.block_count)
646            .context("Block count not available yet")
647    }
648
649    async fn consensus_block_count(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
650        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
651
652        let mut counts = dbtx
653            .find_by_prefix(&BlockCountVotePrefix)
654            .await
655            .map(|entry| entry.1)
656            .collect::<Vec<u64>>()
657            .await;
658
659        counts.sort_unstable();
660
661        counts.reverse();
662
663        assert!(counts.last() <= counts.first());
664
665        // The block count we select guarantees that any threshold of correct peers can
666        // increase the consensus block count and any consensus block count has been
667        // confirmed by a threshold of peers.
668
669        counts.get(num_peers.threshold() - 1).copied().unwrap_or(0)
670    }
671
672    async fn consensus_unix_time(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
673        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
674
675        let mut times = dbtx
676            .find_by_prefix(&UnixTimeVotePrefix)
677            .await
678            .map(|entry| entry.1)
679            .collect::<Vec<u64>>()
680            .await;
681
682        times.sort_unstable();
683
684        times.reverse();
685
686        assert!(times.last() <= times.first());
687
688        // The unix time we select guarantees that any threshold of correct peers can
689        // advance the consensus unix time and any consensus unix time has been
690        // confirmed by a threshold of peers.
691
692        times.get(num_peers.threshold() - 1).copied().unwrap_or(0)
693    }
694
695    async fn await_incoming_contract(
696        &self,
697        db: Database,
698        contract_id: ContractId,
699        expiration: u64,
700    ) -> Option<OutPoint> {
701        loop {
702            timeout(
703                Duration::from_secs(10),
704                db.wait_key_exists(&IncomingContractOutpointKey(contract_id)),
705            )
706            .await
707            .ok();
708
709            // to avoid race conditions we have to check for the contract and
710            // its expiration in the same database transaction
711            let mut dbtx = db.begin_transaction_nc().await;
712
713            if let Some(outpoint) = dbtx
714                .get_value(&IncomingContractOutpointKey(contract_id))
715                .await
716            {
717                return Some(outpoint);
718            }
719
720            if expiration <= self.consensus_unix_time(&mut dbtx).await {
721                return None;
722            }
723        }
724    }
725
726    async fn await_preimage(
727        &self,
728        db: Database,
729        outpoint: OutPoint,
730        expiration: u64,
731    ) -> Option<[u8; 32]> {
732        loop {
733            timeout(
734                Duration::from_secs(10),
735                db.wait_key_exists(&PreimageKey(outpoint)),
736            )
737            .await
738            .ok();
739
740            // to avoid race conditions we have to check for the preimage and
741            // the contracts expiration in the same database transaction
742            let mut dbtx = db.begin_transaction_nc().await;
743
744            if let Some(preimage) = dbtx.get_value(&PreimageKey(outpoint)).await {
745                return Some(preimage);
746            }
747
748            if expiration <= self.consensus_block_count(&mut dbtx).await {
749                return None;
750            }
751        }
752    }
753
754    async fn outgoing_contract_expiration(
755        &self,
756        db: Database,
757        outpoint: OutPoint,
758    ) -> Option<(ContractId, u64)> {
759        let mut dbtx = db.begin_transaction_nc().await;
760
761        let contract = dbtx.get_value(&OutgoingContractKey(outpoint)).await?;
762
763        let consensus_block_count = self.consensus_block_count(&mut dbtx).await;
764
765        let expiration = contract.expiration.saturating_sub(consensus_block_count);
766
767        Some((contract.contract_id(), expiration))
768    }
769
770    async fn add_gateway(db: Database, gateway: SafeUrl) -> bool {
771        let mut dbtx = db.begin_transaction().await;
772
773        let is_new_entry = dbtx.insert_entry(&GatewayKey(gateway), &()).await.is_none();
774
775        dbtx.commit_tx().await;
776
777        is_new_entry
778    }
779
780    async fn remove_gateway(db: Database, gateway: SafeUrl) -> bool {
781        let mut dbtx = db.begin_transaction().await;
782
783        let entry_existed = dbtx.remove_entry(&GatewayKey(gateway)).await.is_some();
784
785        dbtx.commit_tx().await;
786
787        entry_existed
788    }
789
790    async fn gateways(db: Database) -> Vec<SafeUrl> {
791        db.begin_transaction_nc()
792            .await
793            .find_by_prefix(&GatewayPrefix)
794            .await
795            .map(|entry| entry.0.0)
796            .collect()
797            .await
798    }
799
800    pub async fn consensus_block_count_ui(&self) -> u64 {
801        self.consensus_block_count(&mut self.db.begin_transaction_nc().await)
802            .await
803    }
804
805    pub async fn consensus_unix_time_ui(&self) -> u64 {
806        self.consensus_unix_time(&mut self.db.begin_transaction_nc().await)
807            .await
808    }
809
810    pub async fn add_gateway_ui(&self, gateway: SafeUrl) -> bool {
811        Self::add_gateway(self.db.clone(), gateway).await
812    }
813
814    pub async fn remove_gateway_ui(&self, gateway: SafeUrl) -> bool {
815        Self::remove_gateway(self.db.clone(), gateway).await
816    }
817
818    pub async fn gateways_ui(&self) -> Vec<SafeUrl> {
819        Self::gateways(self.db.clone()).await
820    }
821}