fedimint_lnv2_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_wrap)]
3#![allow(clippy::module_name_repetitions)]
4
5mod db;
6
7use std::collections::{BTreeMap, BTreeSet};
8use std::time::Duration;
9
10use anyhow::{Context, anyhow, ensure, format_err};
11use bls12_381::{G1Projective, Scalar};
12use fedimint_bitcoind::create_bitcoind;
13use fedimint_bitcoind::shared::ServerModuleSharedBitcoin;
14use fedimint_core::bitcoin::hashes::sha256;
15use fedimint_core::config::{
16    ConfigGenModuleParams, ServerModuleConfig, ServerModuleConsensusConfig,
17    TypedServerModuleConfig, TypedServerModuleConsensusConfig,
18};
19use fedimint_core::core::ModuleInstanceId;
20use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
21use fedimint_core::encoding::Encodable;
22use fedimint_core::module::audit::Audit;
23use fedimint_core::module::{
24    ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion, InputMeta,
25    ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions, TransactionItemAmount,
26    api_endpoint,
27};
28use fedimint_core::task::timeout;
29use fedimint_core::time::duration_since_epoch;
30use fedimint_core::util::SafeUrl;
31use fedimint_core::{
32    BitcoinHash, InPoint, NumPeers, NumPeersExt, OutPoint, PeerId, apply, async_trait_maybe_send,
33    push_db_pair_items,
34};
35use fedimint_lnv2_common::config::{
36    LightningClientConfig, LightningConfig, LightningConfigConsensus, LightningConfigLocal,
37    LightningConfigPrivate, LightningGenParams,
38};
39use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract};
40use fedimint_lnv2_common::endpoint_constants::{
41    ADD_GATEWAY_ENDPOINT, AWAIT_INCOMING_CONTRACT_ENDPOINT, AWAIT_PREIMAGE_ENDPOINT,
42    CONSENSUS_BLOCK_COUNT_ENDPOINT, DECRYPTION_KEY_SHARE_ENDPOINT, GATEWAYS_ENDPOINT,
43    OUTGOING_CONTRACT_EXPIRATION_ENDPOINT, REMOVE_GATEWAY_ENDPOINT,
44};
45use fedimint_lnv2_common::{
46    ContractId, LightningCommonInit, LightningConsensusItem, LightningInput, LightningInputError,
47    LightningInputV0, LightningModuleTypes, LightningOutput, LightningOutputError,
48    LightningOutputOutcome, LightningOutputV0, MODULE_CONSENSUS_VERSION, OutgoingWitness,
49};
50use fedimint_logging::LOG_MODULE_LNV2;
51use fedimint_server_core::config::{PeerHandleOps, eval_poly_g1};
52use fedimint_server_core::net::check_auth;
53use fedimint_server_core::{ServerModule, ServerModuleInit, ServerModuleInitArgs};
54use futures::StreamExt;
55use group::Curve;
56use group::ff::Field;
57use rand::SeedableRng;
58use rand_chacha::ChaChaRng;
59use strum::IntoEnumIterator;
60use tokio::sync::watch;
61use tpe::{
62    AggregatePublicKey, DecryptionKeyShare, PublicKeyShare, SecretKeyShare, derive_pk_share,
63};
64use tracing::trace;
65
66use crate::db::{
67    BlockCountVoteKey, BlockCountVotePrefix, DbKeyPrefix, DecryptionKeyShareKey,
68    DecryptionKeySharePrefix, GatewayKey, GatewayPrefix, IncomingContractKey,
69    IncomingContractOutpointKey, IncomingContractOutpointPrefix, IncomingContractPrefix,
70    OutgoingContractKey, OutgoingContractPrefix, PreimageKey, PreimagePrefix, UnixTimeVoteKey,
71    UnixTimeVotePrefix,
72};
73
/// Initializer for the lnv2 server module.
///
/// Implements [`ModuleInit`] (database dumping) and [`ServerModuleInit`]
/// (config generation, config validation and construction of [`Lightning`]).
#[derive(Debug, Clone)]
pub struct LightningInit;
76
77impl ModuleInit for LightningInit {
78    type Common = LightningCommonInit;
79
80    async fn dump_database(
81        &self,
82        dbtx: &mut DatabaseTransaction<'_>,
83        prefix_names: Vec<String>,
84    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
85        let mut lightning: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
86            BTreeMap::new();
87
88        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
89            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
90        });
91
92        for table in filtered_prefixes {
93            match table {
94                DbKeyPrefix::BlockCountVote => {
95                    push_db_pair_items!(
96                        dbtx,
97                        BlockCountVotePrefix,
98                        BlockCountVoteKey,
99                        u64,
100                        lightning,
101                        "Lightning Block Count Votes"
102                    );
103                }
104                DbKeyPrefix::UnixTimeVote => {
105                    push_db_pair_items!(
106                        dbtx,
107                        UnixTimeVotePrefix,
108                        UnixTimeVoteKey,
109                        u64,
110                        lightning,
111                        "Lightning Unix Time Votes"
112                    );
113                }
114                DbKeyPrefix::OutgoingContract => {
115                    push_db_pair_items!(
116                        dbtx,
117                        OutgoingContractPrefix,
118                        LightningOutgoingContractKey,
119                        OutgoingContract,
120                        lightning,
121                        "Lightning Outgoing Contracts"
122                    );
123                }
124                DbKeyPrefix::IncomingContract => {
125                    push_db_pair_items!(
126                        dbtx,
127                        IncomingContractPrefix,
128                        LightningIncomingContractKey,
129                        IncomingContract,
130                        lightning,
131                        "Lightning Incoming Contracts"
132                    );
133                }
134                DbKeyPrefix::IncomingContractOutpoint => {
135                    push_db_pair_items!(
136                        dbtx,
137                        IncomingContractOutpointPrefix,
138                        LightningIncomingContractOutpointKey,
139                        OutPoint,
140                        lightning,
141                        "Lightning Incoming Contracts Outpoints"
142                    );
143                }
144                DbKeyPrefix::DecryptionKeyShare => {
145                    push_db_pair_items!(
146                        dbtx,
147                        DecryptionKeySharePrefix,
148                        DecryptionKeyShareKey,
149                        DecryptionKeyShare,
150                        lightning,
151                        "Lightning Decryption Key Share"
152                    );
153                }
154                DbKeyPrefix::Preimage => {
155                    push_db_pair_items!(
156                        dbtx,
157                        PreimagePrefix,
158                        LightningPreimageKey,
159                        [u8; 32],
160                        lightning,
161                        "Lightning Preimages"
162                    );
163                }
164                DbKeyPrefix::Gateway => {
165                    push_db_pair_items!(
166                        dbtx,
167                        GatewayPrefix,
168                        GatewayKey,
169                        (),
170                        lightning,
171                        "Lightning Gateways"
172                    );
173                }
174            }
175        }
176
177        Box::new(lightning.into_iter())
178    }
179}
180
181#[apply(async_trait_maybe_send!)]
182impl ServerModuleInit for LightningInit {
183    type Module = Lightning;
184    type Params = LightningGenParams;
185
    /// Returns the module consensus versions this implementation supports.
    ///
    /// The core consensus version is ignored; the only entry is
    /// [`MODULE_CONSENSUS_VERSION`].
    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
        &[MODULE_CONSENSUS_VERSION]
    }
189
190    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
191        SupportedModuleApiVersions::from_raw(
192            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
193            (
194                MODULE_CONSENSUS_VERSION.major,
195                MODULE_CONSENSUS_VERSION.minor,
196            ),
197            &[(0, 0)],
198        )
199    }
200
201    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
202        Ok(Lightning::new(args.cfg().to_typed()?, args.db(), &args.shared()).await?)
203    }
204
205    fn trusted_dealer_gen(
206        &self,
207        peers: &[PeerId],
208        params: &ConfigGenModuleParams,
209    ) -> BTreeMap<PeerId, ServerModuleConfig> {
210        let params = self
211            .parse_params(params)
212            .expect("Failed tp parse lnv2 config gen params");
213
214        let tpe_pks = peers
215            .iter()
216            .map(|peer| (*peer, dealer_pk(peers.to_num_peers(), *peer)))
217            .collect::<BTreeMap<PeerId, PublicKeyShare>>();
218
219        peers
220            .iter()
221            .map(|peer| {
222                let cfg = LightningConfig {
223                    local: LightningConfigLocal {
224                        bitcoin_rpc: params.local.bitcoin_rpc.clone(),
225                    },
226                    consensus: LightningConfigConsensus {
227                        tpe_agg_pk: dealer_agg_pk(),
228                        tpe_pks: tpe_pks.clone(),
229                        fee_consensus: params.consensus.fee_consensus.clone(),
230                        network: params.consensus.network,
231                    },
232                    private: LightningConfigPrivate {
233                        sk: dealer_sk(peers.to_num_peers(), *peer),
234                    },
235                };
236
237                (*peer, cfg.to_erased())
238            })
239            .collect()
240    }
241
242    async fn distributed_gen(
243        &self,
244        peers: &(dyn PeerHandleOps + Send + Sync),
245        params: &ConfigGenModuleParams,
246    ) -> anyhow::Result<ServerModuleConfig> {
247        let params = self.parse_params(params).unwrap();
248        let (polynomial, sks) = peers.run_dkg_g1().await?;
249
250        let server = LightningConfig {
251            local: LightningConfigLocal {
252                bitcoin_rpc: params.local.bitcoin_rpc.clone(),
253            },
254            consensus: LightningConfigConsensus {
255                tpe_agg_pk: tpe::AggregatePublicKey(polynomial[0].to_affine()),
256                tpe_pks: peers
257                    .num_peers()
258                    .peer_ids()
259                    .map(|peer| (peer, PublicKeyShare(eval_poly_g1(&polynomial, &peer))))
260                    .collect(),
261                fee_consensus: params.consensus.fee_consensus.clone(),
262                network: params.consensus.network,
263            },
264            private: LightningConfigPrivate {
265                sk: SecretKeyShare(sks),
266            },
267        };
268
269        Ok(server.to_erased())
270    }
271
272    fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
273        let config = config.to_typed::<LightningConfig>()?;
274
275        ensure!(
276            tpe::derive_pk_share(&config.private.sk)
277                == *config
278                    .consensus
279                    .tpe_pks
280                    .get(identity)
281                    .context("Public key set has no key for our identity")?,
282            "Preimge encryption secret key share does not match our public key share"
283        );
284
285        Ok(())
286    }
287
288    fn get_client_config(
289        &self,
290        config: &ServerModuleConsensusConfig,
291    ) -> anyhow::Result<LightningClientConfig> {
292        let config = LightningConfigConsensus::from_erased(config)?;
293        Ok(LightningClientConfig {
294            tpe_agg_pk: config.tpe_agg_pk,
295            tpe_pks: config.tpe_pks,
296            fee_consensus: config.fee_consensus,
297            network: config.network,
298        })
299    }
300
301    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
302        Some(DbKeyPrefix::iter().map(|p| p as u8).collect())
303    }
304}
305
306fn dealer_agg_pk() -> AggregatePublicKey {
307    AggregatePublicKey((G1Projective::generator() * coefficient(0)).to_affine())
308}
309
310fn dealer_pk(num_peers: NumPeers, peer: PeerId) -> PublicKeyShare {
311    derive_pk_share(&dealer_sk(num_peers, peer))
312}
313
314fn dealer_sk(num_peers: NumPeers, peer: PeerId) -> SecretKeyShare {
315    let x = Scalar::from(peer.to_usize() as u64 + 1);
316
317    // We evaluate the scalar polynomial of degree threshold - 1 at the point x
318    // using the Horner schema.
319
320    let y = (0..num_peers.threshold())
321        .map(|index| coefficient(index as u64))
322        .rev()
323        .reduce(|accumulator, c| accumulator * x + c)
324        .expect("We have at least one coefficient");
325
326    SecretKeyShare(y)
327}
328
329fn coefficient(index: u64) -> Scalar {
330    Scalar::random(&mut ChaChaRng::from_seed(
331        *index.consensus_hash::<sha256::Hash>().as_byte_array(),
332    ))
333}
334
/// Server-side state of the lnv2 module.
#[derive(Debug)]
pub struct Lightning {
    /// Typed module configuration (local, consensus and private parts).
    cfg: LightningConfig,
    /// Handle to the module database, used by the `*_ui` helper methods.
    db: Database,
    /// Block count updated periodically by a background task
    block_count_rx: watch::Receiver<Option<u64>>,
}
342
343#[apply(async_trait_maybe_send!)]
344impl ServerModule for Lightning {
345    type Common = LightningModuleTypes;
346    type Init = LightningInit;
347
348    async fn consensus_proposal(
349        &self,
350        _dbtx: &mut DatabaseTransaction<'_>,
351    ) -> Vec<LightningConsensusItem> {
352        // We reduce the time granularity to deduplicate votes more often and not save
353        // one consensus item every second.
354        let mut items = vec![LightningConsensusItem::UnixTimeVote(
355            60 * (duration_since_epoch().as_secs() / 60),
356        )];
357
358        if let Ok(block_count) = self.get_block_count() {
359            trace!(target: LOG_MODULE_LNV2, ?block_count, "Proposing block count");
360            items.push(LightningConsensusItem::BlockCountVote(block_count));
361        }
362
363        items
364    }
365
366    async fn process_consensus_item<'a, 'b>(
367        &'a self,
368        dbtx: &mut DatabaseTransaction<'b>,
369        consensus_item: LightningConsensusItem,
370        peer: PeerId,
371    ) -> anyhow::Result<()> {
372        trace!(target: LOG_MODULE_LNV2, ?consensus_item, "Processing consensus item proposal");
373
374        match consensus_item {
375            LightningConsensusItem::BlockCountVote(vote) => {
376                let current_vote = dbtx
377                    .insert_entry(&BlockCountVoteKey(peer), &vote)
378                    .await
379                    .unwrap_or(0);
380
381                ensure!(current_vote < vote, "Block count vote is redundant");
382
383                Ok(())
384            }
385            LightningConsensusItem::UnixTimeVote(vote) => {
386                let current_vote = dbtx
387                    .insert_entry(&UnixTimeVoteKey(peer), &vote)
388                    .await
389                    .unwrap_or(0);
390
391                ensure!(current_vote < vote, "Unix time vote is redundant");
392
393                Ok(())
394            }
395            LightningConsensusItem::Default { variant, .. } => Err(anyhow!(
396                "Received lnv2 consensus item with unknown variant {variant}"
397            )),
398        }
399    }
400
401    async fn process_input<'a, 'b, 'c>(
402        &'a self,
403        dbtx: &mut DatabaseTransaction<'c>,
404        input: &'b LightningInput,
405        _in_point: InPoint,
406    ) -> Result<InputMeta, LightningInputError> {
407        let (pub_key, amount) = match input.ensure_v0_ref()? {
408            LightningInputV0::Outgoing(outpoint, outgoing_witness) => {
409                let contract = dbtx
410                    .remove_entry(&OutgoingContractKey(*outpoint))
411                    .await
412                    .ok_or(LightningInputError::UnknownContract)?;
413
414                let pub_key = match outgoing_witness {
415                    OutgoingWitness::Claim(preimage) => {
416                        if contract.expiration <= self.consensus_block_count(dbtx).await {
417                            return Err(LightningInputError::Expired);
418                        }
419
420                        if !contract.verify_preimage(preimage) {
421                            return Err(LightningInputError::InvalidPreimage);
422                        }
423
424                        dbtx.insert_entry(&PreimageKey(*outpoint), preimage).await;
425
426                        contract.claim_pk
427                    }
428                    OutgoingWitness::Refund => {
429                        if contract.expiration > self.consensus_block_count(dbtx).await {
430                            return Err(LightningInputError::NotExpired);
431                        }
432
433                        contract.refund_pk
434                    }
435                    OutgoingWitness::Cancel(forfeit_signature) => {
436                        if !contract.verify_forfeit_signature(forfeit_signature) {
437                            return Err(LightningInputError::InvalidForfeitSignature);
438                        }
439
440                        contract.refund_pk
441                    }
442                };
443
444                (pub_key, contract.amount)
445            }
446            LightningInputV0::Incoming(outpoint, agg_decryption_key) => {
447                let contract = dbtx
448                    .remove_entry(&IncomingContractKey(*outpoint))
449                    .await
450                    .ok_or(LightningInputError::UnknownContract)?;
451
452                if !contract
453                    .verify_agg_decryption_key(&self.cfg.consensus.tpe_agg_pk, agg_decryption_key)
454                {
455                    return Err(LightningInputError::InvalidDecryptionKey);
456                }
457
458                let pub_key = match contract.decrypt_preimage(agg_decryption_key) {
459                    Some(..) => contract.commitment.claim_pk,
460                    None => contract.commitment.refund_pk,
461                };
462
463                (pub_key, contract.commitment.amount)
464            }
465        };
466
467        Ok(InputMeta {
468            amount: TransactionItemAmount {
469                amount,
470                fee: self.cfg.consensus.fee_consensus.fee(amount),
471            },
472            pub_key,
473        })
474    }
475
476    async fn process_output<'a, 'b>(
477        &'a self,
478        dbtx: &mut DatabaseTransaction<'b>,
479        output: &'a LightningOutput,
480        outpoint: OutPoint,
481    ) -> Result<TransactionItemAmount, LightningOutputError> {
482        let amount = match output.ensure_v0_ref()? {
483            LightningOutputV0::Outgoing(contract) => {
484                dbtx.insert_new_entry(&OutgoingContractKey(outpoint), contract)
485                    .await;
486
487                contract.amount
488            }
489            LightningOutputV0::Incoming(contract) => {
490                if !contract.verify() {
491                    return Err(LightningOutputError::InvalidContract);
492                }
493
494                if contract.commitment.expiration <= self.consensus_unix_time(dbtx).await {
495                    return Err(LightningOutputError::ContractExpired);
496                }
497
498                dbtx.insert_new_entry(&IncomingContractKey(outpoint), contract)
499                    .await;
500
501                dbtx.insert_entry(
502                    &IncomingContractOutpointKey(contract.contract_id()),
503                    &outpoint,
504                )
505                .await;
506
507                let dk_share = contract.create_decryption_key_share(&self.cfg.private.sk);
508
509                dbtx.insert_entry(&DecryptionKeyShareKey(outpoint), &dk_share)
510                    .await;
511
512                contract.commitment.amount
513            }
514        };
515
516        Ok(TransactionItemAmount {
517            amount,
518            fee: self.cfg.consensus.fee_consensus.fee(amount),
519        })
520    }
521
    /// Always returns `None`; neither the database transaction nor the
    /// outpoint is consulted.
    async fn output_status(
        &self,
        _dbtx: &mut DatabaseTransaction<'_>,
        _out_point: OutPoint,
    ) -> Option<LightningOutputOutcome> {
        None
    }
529
530    async fn audit(
531        &self,
532        dbtx: &mut DatabaseTransaction<'_>,
533        audit: &mut Audit,
534        module_instance_id: ModuleInstanceId,
535    ) {
536        // Both incoming and outgoing contracts represent liabilities to the federation
537        // since they are obligations to issue notes.
538        audit
539            .add_items(
540                dbtx,
541                module_instance_id,
542                &OutgoingContractPrefix,
543                |_, contract| -(contract.amount.msats as i64),
544            )
545            .await;
546
547        audit
548            .add_items(
549                dbtx,
550                module_instance_id,
551                &IncomingContractPrefix,
552                |_, contract| -(contract.commitment.amount.msats as i64),
553            )
554            .await;
555    }
556
557    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
558        vec![
559            api_endpoint! {
560                CONSENSUS_BLOCK_COUNT_ENDPOINT,
561                ApiVersion::new(0, 0),
562                async |module: &Lightning, context, _params : () | -> u64 {
563                    let db = context.db();
564                    let mut dbtx = db.begin_transaction_nc().await;
565
566                    Ok(module.consensus_block_count(&mut dbtx).await)
567                }
568            },
569            api_endpoint! {
570                AWAIT_INCOMING_CONTRACT_ENDPOINT,
571                ApiVersion::new(0, 0),
572                async |module: &Lightning, context, params: (ContractId, u64) | -> Option<OutPoint> {
573                    let db = context.db();
574
575                    Ok(module.await_incoming_contract(db, params.0, params.1).await)
576                }
577            },
578            api_endpoint! {
579                AWAIT_PREIMAGE_ENDPOINT,
580                ApiVersion::new(0, 0),
581                async |module: &Lightning, context, params: (OutPoint, u64)| -> Option<[u8; 32]> {
582                    let db = context.db();
583
584                    Ok(module.await_preimage(db, params.0, params.1).await)
585                }
586            },
587            api_endpoint! {
588                DECRYPTION_KEY_SHARE_ENDPOINT,
589                ApiVersion::new(0, 0),
590                async |_module: &Lightning, context, params: OutPoint| -> DecryptionKeyShare {
591                    let share = context
592                        .db()
593                        .begin_transaction_nc()
594                        .await
595                        .get_value(&DecryptionKeyShareKey(params))
596                        .await
597                        .ok_or(ApiError::bad_request("No decryption key share found".to_string()))?;
598
599                    Ok(share)
600                }
601            },
602            api_endpoint! {
603                OUTGOING_CONTRACT_EXPIRATION_ENDPOINT,
604                ApiVersion::new(0, 0),
605                async |module: &Lightning, context, outpoint: OutPoint| -> Option<(ContractId, u64)> {
606                    let db = context.db();
607
608                    Ok(module.outgoing_contract_expiration(db, outpoint).await)
609                }
610            },
611            api_endpoint! {
612                ADD_GATEWAY_ENDPOINT,
613                ApiVersion::new(0, 0),
614                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
615                    check_auth(context)?;
616
617                    let db = context.db();
618
619                    Ok(Lightning::add_gateway(db, gateway).await)
620                }
621            },
622            api_endpoint! {
623                REMOVE_GATEWAY_ENDPOINT,
624                ApiVersion::new(0, 0),
625                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
626                    check_auth(context)?;
627
628                    let db = context.db();
629
630                    Ok(Lightning::remove_gateway(db, gateway).await)
631                }
632            },
633            api_endpoint! {
634                GATEWAYS_ENDPOINT,
635                ApiVersion::new(0, 0),
636                async |_module: &Lightning, context, _params : () | -> Vec<SafeUrl> {
637                    let db = context.db();
638
639                    Ok(Lightning::gateways(db).await)
640                }
641            },
642        ]
643    }
644}
645
646impl Lightning {
647    async fn new(
648        cfg: LightningConfig,
649        db: &Database,
650        shared_bitcoin: &ServerModuleSharedBitcoin,
651    ) -> anyhow::Result<Self> {
652        let btc_rpc = create_bitcoind(&cfg.local.bitcoin_rpc)?;
653        let block_count_rx = shared_bitcoin
654            .block_count_receiver(cfg.consensus.network, btc_rpc.clone())
655            .await;
656
657        Ok(Lightning {
658            cfg,
659            db: db.clone(),
660            block_count_rx,
661        })
662    }
663
664    fn get_block_count(&self) -> anyhow::Result<u64> {
665        self.block_count_rx
666            .borrow()
667            .ok_or_else(|| format_err!("Block count not available yet"))
668    }
669
670    async fn consensus_block_count(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
671        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
672
673        let mut counts = dbtx
674            .find_by_prefix(&BlockCountVotePrefix)
675            .await
676            .map(|entry| entry.1)
677            .collect::<Vec<u64>>()
678            .await;
679
680        counts.sort_unstable();
681
682        counts.reverse();
683
684        assert!(counts.last() <= counts.first());
685
686        // The block count we select guarantees that any threshold of correct peers can
687        // increase the consensus block count and any consensus block count has been
688        // confirmed by a threshold of peers.
689
690        counts.get(num_peers.threshold() - 1).copied().unwrap_or(0)
691    }
692
693    async fn consensus_unix_time(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
694        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
695
696        let mut times = dbtx
697            .find_by_prefix(&UnixTimeVotePrefix)
698            .await
699            .map(|entry| entry.1)
700            .collect::<Vec<u64>>()
701            .await;
702
703        times.sort_unstable();
704
705        times.reverse();
706
707        assert!(times.last() <= times.first());
708
709        // The unix time we select guarantees that any threshold of correct peers can
710        // advance the consensus unix time and any consensus unix time has been
711        // confirmed by a threshold of peers.
712
713        times.get(num_peers.threshold() - 1).copied().unwrap_or(0)
714    }
715
716    async fn await_incoming_contract(
717        &self,
718        db: Database,
719        contract_id: ContractId,
720        expiration: u64,
721    ) -> Option<OutPoint> {
722        loop {
723            timeout(
724                Duration::from_secs(10),
725                db.wait_key_exists(&IncomingContractOutpointKey(contract_id)),
726            )
727            .await
728            .ok();
729
730            // to avoid race conditions we have to check for the contract and
731            // its expiration in the same database transaction
732            let mut dbtx = db.begin_transaction_nc().await;
733
734            if let Some(outpoint) = dbtx
735                .get_value(&IncomingContractOutpointKey(contract_id))
736                .await
737            {
738                return Some(outpoint);
739            }
740
741            if expiration <= self.consensus_unix_time(&mut dbtx).await {
742                return None;
743            }
744        }
745    }
746
747    async fn await_preimage(
748        &self,
749        db: Database,
750        outpoint: OutPoint,
751        expiration: u64,
752    ) -> Option<[u8; 32]> {
753        loop {
754            timeout(
755                Duration::from_secs(10),
756                db.wait_key_exists(&PreimageKey(outpoint)),
757            )
758            .await
759            .ok();
760
761            // to avoid race conditions we have to check for the preimage and
762            // the contracts expiration in the same database transaction
763            let mut dbtx = db.begin_transaction_nc().await;
764
765            if let Some(preimage) = dbtx.get_value(&PreimageKey(outpoint)).await {
766                return Some(preimage);
767            }
768
769            if expiration <= self.consensus_block_count(&mut dbtx).await {
770                return None;
771            }
772        }
773    }
774
775    async fn outgoing_contract_expiration(
776        &self,
777        db: Database,
778        outpoint: OutPoint,
779    ) -> Option<(ContractId, u64)> {
780        let mut dbtx = db.begin_transaction_nc().await;
781
782        let contract = dbtx.get_value(&OutgoingContractKey(outpoint)).await?;
783
784        let consensus_block_count = self.consensus_block_count(&mut dbtx).await;
785
786        let expiration = contract.expiration.saturating_sub(consensus_block_count);
787
788        Some((contract.contract_id(), expiration))
789    }
790
791    async fn add_gateway(db: Database, gateway: SafeUrl) -> bool {
792        let mut dbtx = db.begin_transaction().await;
793
794        let is_new_entry = dbtx.insert_entry(&GatewayKey(gateway), &()).await.is_none();
795
796        dbtx.commit_tx().await;
797
798        is_new_entry
799    }
800
801    async fn remove_gateway(db: Database, gateway: SafeUrl) -> bool {
802        let mut dbtx = db.begin_transaction().await;
803
804        let entry_existed = dbtx.remove_entry(&GatewayKey(gateway)).await.is_some();
805
806        dbtx.commit_tx().await;
807
808        entry_existed
809    }
810
811    async fn gateways(db: Database) -> Vec<SafeUrl> {
812        db.begin_transaction_nc()
813            .await
814            .find_by_prefix(&GatewayPrefix)
815            .await
816            .map(|entry| entry.0.0)
817            .collect()
818            .await
819    }
820
821    pub async fn consensus_block_count_ui(&self) -> u64 {
822        self.consensus_block_count(&mut self.db.begin_transaction_nc().await)
823            .await
824    }
825
826    pub async fn consensus_unix_time_ui(&self) -> u64 {
827        self.consensus_unix_time(&mut self.db.begin_transaction_nc().await)
828            .await
829    }
830
831    pub async fn add_gateway_ui(&self, gateway: SafeUrl) -> bool {
832        Self::add_gateway(self.db.clone(), gateway).await
833    }
834
835    pub async fn remove_gateway_ui(&self, gateway: SafeUrl) -> bool {
836        Self::remove_gateway(self.db.clone(), gateway).await
837    }
838
839    pub async fn gateways_ui(&self) -> Vec<SafeUrl> {
840        Self::gateways(self.db.clone()).await
841    }
842}