fedimint_lnv2_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_wrap)]
3#![allow(clippy::module_name_repetitions)]
4
5mod db;
6
7use std::collections::{BTreeMap, BTreeSet};
8use std::time::Duration;
9
10use anyhow::{Context, anyhow, ensure, format_err};
11use bls12_381::{G1Projective, Scalar};
12use fedimint_bitcoind::create_bitcoind;
13use fedimint_bitcoind::shared::ServerModuleSharedBitcoin;
14use fedimint_core::bitcoin::hashes::sha256;
15use fedimint_core::config::{
16    ConfigGenModuleParams, ServerModuleConfig, ServerModuleConsensusConfig,
17    TypedServerModuleConfig, TypedServerModuleConsensusConfig,
18};
19use fedimint_core::core::ModuleInstanceId;
20use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
21use fedimint_core::encoding::Encodable;
22use fedimint_core::module::audit::Audit;
23use fedimint_core::module::{
24    ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion, InputMeta,
25    ModuleConsensusVersion, ModuleInit, PeerHandle, SupportedModuleApiVersions,
26    TransactionItemAmount, api_endpoint,
27};
28use fedimint_core::task::timeout;
29use fedimint_core::time::duration_since_epoch;
30use fedimint_core::util::SafeUrl;
31use fedimint_core::{
32    BitcoinHash, InPoint, NumPeers, NumPeersExt, OutPoint, PeerId, apply, async_trait_maybe_send,
33    push_db_pair_items,
34};
35use fedimint_lnv2_common::config::{
36    LightningClientConfig, LightningConfig, LightningConfigConsensus, LightningConfigLocal,
37    LightningConfigPrivate, LightningGenParams,
38};
39use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract};
40use fedimint_lnv2_common::endpoint_constants::{
41    ADD_GATEWAY_ENDPOINT, AWAIT_INCOMING_CONTRACT_ENDPOINT, AWAIT_PREIMAGE_ENDPOINT,
42    CONSENSUS_BLOCK_COUNT_ENDPOINT, DECRYPTION_KEY_SHARE_ENDPOINT, GATEWAYS_ENDPOINT,
43    OUTGOING_CONTRACT_EXPIRATION_ENDPOINT, REMOVE_GATEWAY_ENDPOINT,
44};
45use fedimint_lnv2_common::{
46    ContractId, LightningCommonInit, LightningConsensusItem, LightningInput, LightningInputError,
47    LightningInputV0, LightningModuleTypes, LightningOutput, LightningOutputError,
48    LightningOutputOutcome, LightningOutputV0, MODULE_CONSENSUS_VERSION, OutgoingWitness,
49};
50use fedimint_logging::LOG_MODULE_LNV2;
51use fedimint_server::config::distributedgen::{PeerHandleOps, eval_poly_g1};
52use fedimint_server::core::{ServerModule, ServerModuleInit, ServerModuleInitArgs};
53use fedimint_server::net::api::check_auth;
54use futures::StreamExt;
55use group::Curve;
56use group::ff::Field;
57use rand::SeedableRng;
58use rand_chacha::ChaChaRng;
59use strum::IntoEnumIterator;
60use tokio::sync::watch;
61use tpe::{
62    AggregatePublicKey, DecryptionKeyShare, PublicKeyShare, SecretKeyShare, derive_pk_share,
63};
64use tracing::trace;
65
66use crate::db::{
67    BlockCountVoteKey, BlockCountVotePrefix, DbKeyPrefix, DecryptionKeyShareKey,
68    DecryptionKeySharePrefix, GatewayKey, GatewayPrefix, IncomingContractKey,
69    IncomingContractPrefix, OutgoingContractKey, OutgoingContractPrefix, PreimageKey,
70    PreimagePrefix, UnixTimeVoteKey, UnixTimeVotePrefix,
71};
72
/// Initializer of the lnv2 server module.
///
/// A stateless unit type: the trait implementations on it in this file dump
/// the module database, generate and validate the module config and construct
/// the [`Lightning`] module instance.
#[derive(Debug, Clone)]
pub struct LightningInit;
75
76impl ModuleInit for LightningInit {
77    type Common = LightningCommonInit;
78
79    async fn dump_database(
80        &self,
81        dbtx: &mut DatabaseTransaction<'_>,
82        prefix_names: Vec<String>,
83    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
84        let mut lightning: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
85            BTreeMap::new();
86
87        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
88            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
89        });
90
91        for table in filtered_prefixes {
92            match table {
93                DbKeyPrefix::BlockCountVote => {
94                    push_db_pair_items!(
95                        dbtx,
96                        BlockCountVotePrefix,
97                        BlockCountVoteKey,
98                        u64,
99                        lightning,
100                        "Lightning Block Count Votes"
101                    );
102                }
103                DbKeyPrefix::UnixTimeVote => {
104                    push_db_pair_items!(
105                        dbtx,
106                        UnixTimeVotePrefix,
107                        UnixTimeVoteKey,
108                        u64,
109                        lightning,
110                        "Lightning Unix Time Votes"
111                    );
112                }
113                DbKeyPrefix::OutgoingContract => {
114                    push_db_pair_items!(
115                        dbtx,
116                        OutgoingContractPrefix,
117                        LightningOutgoingContractKey,
118                        OutgoingContract,
119                        lightning,
120                        "Lightning Outgoing Contracts"
121                    );
122                }
123                DbKeyPrefix::IncomingContract => {
124                    push_db_pair_items!(
125                        dbtx,
126                        IncomingContractPrefix,
127                        LightningIncomingContractKey,
128                        IncomingContract,
129                        lightning,
130                        "Lightning Incoming Contracts"
131                    );
132                }
133                DbKeyPrefix::DecryptionKeyShare => {
134                    push_db_pair_items!(
135                        dbtx,
136                        DecryptionKeySharePrefix,
137                        DecryptionKeyShareKey,
138                        DecryptionKeyShare,
139                        lightning,
140                        "Lightning Decryption Key Share"
141                    );
142                }
143                DbKeyPrefix::Preimage => {
144                    push_db_pair_items!(
145                        dbtx,
146                        PreimagePrefix,
147                        LightningPreimageKey,
148                        [u8; 32],
149                        lightning,
150                        "Lightning Preimages"
151                    );
152                }
153                DbKeyPrefix::Gateway => {
154                    push_db_pair_items!(
155                        dbtx,
156                        GatewayPrefix,
157                        GatewayKey,
158                        (),
159                        lightning,
160                        "Lightning Gateways"
161                    );
162                }
163            }
164        }
165
166        Box::new(lightning.into_iter())
167    }
168}
169
170#[apply(async_trait_maybe_send!)]
171impl ServerModuleInit for LightningInit {
172    type Module = Lightning;
173    type Params = LightningGenParams;
174
175    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
176        &[MODULE_CONSENSUS_VERSION]
177    }
178
179    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
180        SupportedModuleApiVersions::from_raw(
181            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
182            (
183                MODULE_CONSENSUS_VERSION.major,
184                MODULE_CONSENSUS_VERSION.minor,
185            ),
186            &[(0, 0)],
187        )
188    }
189
190    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
191        Ok(Lightning::new(args.cfg().to_typed()?, &args.shared()).await?)
192    }
193
194    fn trusted_dealer_gen(
195        &self,
196        peers: &[PeerId],
197        params: &ConfigGenModuleParams,
198    ) -> BTreeMap<PeerId, ServerModuleConfig> {
199        let params = self
200            .parse_params(params)
201            .expect("Failed tp parse lnv2 config gen params");
202
203        let tpe_pks = peers
204            .iter()
205            .map(|peer| (*peer, dealer_pk(peers.to_num_peers(), *peer)))
206            .collect::<BTreeMap<PeerId, PublicKeyShare>>();
207
208        peers
209            .iter()
210            .map(|peer| {
211                let cfg = LightningConfig {
212                    local: LightningConfigLocal {
213                        bitcoin_rpc: params.local.bitcoin_rpc.clone(),
214                    },
215                    consensus: LightningConfigConsensus {
216                        tpe_agg_pk: dealer_agg_pk(),
217                        tpe_pks: tpe_pks.clone(),
218                        fee_consensus: params.consensus.fee_consensus.clone(),
219                        network: params.consensus.network,
220                    },
221                    private: LightningConfigPrivate {
222                        sk: dealer_sk(peers.to_num_peers(), *peer),
223                    },
224                };
225
226                (*peer, cfg.to_erased())
227            })
228            .collect()
229    }
230
231    async fn distributed_gen(
232        &self,
233        peers: &PeerHandle,
234        params: &ConfigGenModuleParams,
235    ) -> anyhow::Result<ServerModuleConfig> {
236        let params = self.parse_params(params).unwrap();
237        let (polynomial, sks) = peers.run_dkg_g1().await?;
238
239        let server = LightningConfig {
240            local: LightningConfigLocal {
241                bitcoin_rpc: params.local.bitcoin_rpc.clone(),
242            },
243            consensus: LightningConfigConsensus {
244                tpe_agg_pk: tpe::AggregatePublicKey(polynomial[0].to_affine()),
245                tpe_pks: peers
246                    .num_peers()
247                    .peer_ids()
248                    .map(|peer| (peer, PublicKeyShare(eval_poly_g1(&polynomial, &peer))))
249                    .collect(),
250                fee_consensus: params.consensus.fee_consensus.clone(),
251                network: params.consensus.network,
252            },
253            private: LightningConfigPrivate {
254                sk: SecretKeyShare(sks),
255            },
256        };
257
258        Ok(server.to_erased())
259    }
260
261    fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
262        let config = config.to_typed::<LightningConfig>()?;
263
264        ensure!(
265            tpe::derive_pk_share(&config.private.sk)
266                == *config
267                    .consensus
268                    .tpe_pks
269                    .get(identity)
270                    .context("Public key set has no key for our identity")?,
271            "Preimge encryption secret key share does not match our public key share"
272        );
273
274        Ok(())
275    }
276
277    fn get_client_config(
278        &self,
279        config: &ServerModuleConsensusConfig,
280    ) -> anyhow::Result<LightningClientConfig> {
281        let config = LightningConfigConsensus::from_erased(config)?;
282        Ok(LightningClientConfig {
283            tpe_agg_pk: config.tpe_agg_pk,
284            tpe_pks: config.tpe_pks,
285            fee_consensus: config.fee_consensus,
286            network: config.network,
287        })
288    }
289
290    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
291        Some(DbKeyPrefix::iter().map(|p| p as u8).collect())
292    }
293}
294
295fn dealer_agg_pk() -> AggregatePublicKey {
296    AggregatePublicKey((G1Projective::generator() * coefficient(0)).to_affine())
297}
298
299fn dealer_pk(num_peers: NumPeers, peer: PeerId) -> PublicKeyShare {
300    derive_pk_share(&dealer_sk(num_peers, peer))
301}
302
303fn dealer_sk(num_peers: NumPeers, peer: PeerId) -> SecretKeyShare {
304    let x = Scalar::from(peer.to_usize() as u64 + 1);
305
306    // We evaluate the scalar polynomial of degree threshold - 1 at the point x
307    // using the Horner schema.
308
309    let y = (0..num_peers.threshold())
310        .map(|index| coefficient(index as u64))
311        .rev()
312        .reduce(|accumulator, c| accumulator * x + c)
313        .expect("We have at least one coefficient");
314
315    SecretKeyShare(y)
316}
317
318fn coefficient(index: u64) -> Scalar {
319    Scalar::random(&mut ChaChaRng::from_seed(
320        *index.consensus_hash::<sha256::Hash>().as_byte_array(),
321    ))
322}
323
/// Server side state of the lnv2 module.
#[derive(Debug)]
pub struct Lightning {
    /// Typed module config with its local, consensus and private sections
    cfg: LightningConfig,
    /// Block count updated periodically by a background task; holds `None`
    /// as long as no block count is available
    block_count_rx: watch::Receiver<Option<u64>>,
}
330
331#[apply(async_trait_maybe_send!)]
332impl ServerModule for Lightning {
333    type Common = LightningModuleTypes;
334    type Init = LightningInit;
335
336    async fn consensus_proposal(
337        &self,
338        _dbtx: &mut DatabaseTransaction<'_>,
339    ) -> Vec<LightningConsensusItem> {
340        // We reduce the time granularity to deduplicate votes more often and not save
341        // one consensus item every second.
342        let mut items = vec![LightningConsensusItem::UnixTimeVote(
343            60 * (duration_since_epoch().as_secs() / 60),
344        )];
345
346        if let Ok(block_count) = self.get_block_count() {
347            trace!(target: LOG_MODULE_LNV2, ?block_count, "Proposing block count");
348            items.push(LightningConsensusItem::BlockCountVote(block_count));
349        }
350
351        items
352    }
353
354    async fn process_consensus_item<'a, 'b>(
355        &'a self,
356        dbtx: &mut DatabaseTransaction<'b>,
357        consensus_item: LightningConsensusItem,
358        peer: PeerId,
359    ) -> anyhow::Result<()> {
360        trace!(target: LOG_MODULE_LNV2, ?consensus_item, "Processing consensus item proposal");
361        match consensus_item {
362            LightningConsensusItem::BlockCountVote(vote) => {
363                let current_vote = dbtx
364                    .insert_entry(&BlockCountVoteKey(peer), &vote)
365                    .await
366                    .unwrap_or(0);
367
368                ensure!(current_vote < vote, "Block count vote is redundant");
369
370                Ok(())
371            }
372            LightningConsensusItem::UnixTimeVote(vote) => {
373                let current_vote = dbtx
374                    .insert_entry(&UnixTimeVoteKey(peer), &vote)
375                    .await
376                    .unwrap_or(0);
377
378                ensure!(current_vote < vote, "Unix time vote is redundant");
379
380                Ok(())
381            }
382            LightningConsensusItem::Default { variant, .. } => Err(anyhow!(
383                "Received lnv2 consensus item with unknown variant {variant}"
384            )),
385        }
386    }
387
388    async fn process_input<'a, 'b, 'c>(
389        &'a self,
390        dbtx: &mut DatabaseTransaction<'c>,
391        input: &'b LightningInput,
392        _in_point: InPoint,
393    ) -> Result<InputMeta, LightningInputError> {
394        let (pub_key, amount) = match input.ensure_v0_ref()? {
395            LightningInputV0::Outgoing(contract_id, outgoing_witness) => {
396                let contract = dbtx
397                    .remove_entry(&OutgoingContractKey(*contract_id))
398                    .await
399                    .ok_or(LightningInputError::UnknownContract)?;
400
401                let pub_key = match outgoing_witness {
402                    OutgoingWitness::Claim(preimage) => {
403                        if contract.expiration <= self.consensus_block_count(dbtx).await {
404                            return Err(LightningInputError::Expired);
405                        }
406
407                        if !contract.verify_preimage(preimage) {
408                            return Err(LightningInputError::InvalidPreimage);
409                        }
410
411                        dbtx.insert_entry(&PreimageKey(*contract_id), preimage)
412                            .await;
413
414                        contract.claim_pk
415                    }
416                    OutgoingWitness::Refund => {
417                        if contract.expiration > self.consensus_block_count(dbtx).await {
418                            return Err(LightningInputError::NotExpired);
419                        }
420
421                        contract.refund_pk
422                    }
423                    OutgoingWitness::Cancel(forfeit_signature) => {
424                        if !contract.verify_forfeit_signature(forfeit_signature) {
425                            return Err(LightningInputError::InvalidForfeitSignature);
426                        }
427
428                        contract.refund_pk
429                    }
430                };
431
432                (pub_key, contract.amount)
433            }
434            LightningInputV0::Incoming(contract_id, agg_decryption_key) => {
435                let contract = dbtx
436                    .remove_entry(&IncomingContractKey(*contract_id))
437                    .await
438                    .ok_or(LightningInputError::UnknownContract)?;
439
440                if !contract
441                    .verify_agg_decryption_key(&self.cfg.consensus.tpe_agg_pk, agg_decryption_key)
442                {
443                    return Err(LightningInputError::InvalidDecryptionKey);
444                }
445
446                let pub_key = match contract.decrypt_preimage(agg_decryption_key) {
447                    Some(..) => contract.commitment.claim_pk,
448                    None => contract.commitment.refund_pk,
449                };
450
451                (pub_key, contract.commitment.amount)
452            }
453        };
454
455        Ok(InputMeta {
456            amount: TransactionItemAmount {
457                amount,
458                fee: self.cfg.consensus.fee_consensus.fee(amount),
459            },
460            pub_key,
461        })
462    }
463
464    async fn process_output<'a, 'b>(
465        &'a self,
466        dbtx: &mut DatabaseTransaction<'b>,
467        output: &'a LightningOutput,
468        _outpoint: OutPoint,
469    ) -> Result<TransactionItemAmount, LightningOutputError> {
470        let amount = match output.ensure_v0_ref()? {
471            LightningOutputV0::Outgoing(contract) => {
472                if dbtx
473                    .insert_entry(&OutgoingContractKey(contract.contract_id()), contract)
474                    .await
475                    .is_some()
476                {
477                    return Err(LightningOutputError::ContractAlreadyExists);
478                }
479
480                contract.amount
481            }
482            LightningOutputV0::Incoming(contract) => {
483                if !contract.verify() {
484                    return Err(LightningOutputError::InvalidContract);
485                }
486
487                if contract.commitment.expiration <= self.consensus_unix_time(dbtx).await {
488                    return Err(LightningOutputError::ContractExpired);
489                }
490
491                if dbtx
492                    .insert_entry(&IncomingContractKey(contract.contract_id()), contract)
493                    .await
494                    .is_some()
495                {
496                    return Err(LightningOutputError::ContractAlreadyExists);
497                }
498
499                let dk_share = contract.create_decryption_key_share(&self.cfg.private.sk);
500
501                dbtx.insert_entry(&DecryptionKeyShareKey(contract.contract_id()), &dk_share)
502                    .await;
503
504                contract.commitment.amount
505            }
506        };
507
508        Ok(TransactionItemAmount {
509            amount,
510            fee: self.cfg.consensus.fee_consensus.fee(amount),
511        })
512    }
513
514    async fn output_status(
515        &self,
516        _dbtx: &mut DatabaseTransaction<'_>,
517        _out_point: OutPoint,
518    ) -> Option<LightningOutputOutcome> {
519        None
520    }
521
522    async fn audit(
523        &self,
524        dbtx: &mut DatabaseTransaction<'_>,
525        audit: &mut Audit,
526        module_instance_id: ModuleInstanceId,
527    ) {
528        // Both incoming and outgoing contracts represent liabilities to the federation
529        // since they are obligations to issue notes.
530        audit
531            .add_items(
532                dbtx,
533                module_instance_id,
534                &OutgoingContractPrefix,
535                |_, contract| -(contract.amount.msats as i64),
536            )
537            .await;
538
539        audit
540            .add_items(
541                dbtx,
542                module_instance_id,
543                &IncomingContractPrefix,
544                |_, contract| -(contract.commitment.amount.msats as i64),
545            )
546            .await;
547    }
548
549    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
550        vec![
551            api_endpoint! {
552                CONSENSUS_BLOCK_COUNT_ENDPOINT,
553                ApiVersion::new(0, 0),
554                async |module: &Lightning, context, _params : () | -> u64 {
555                    let db = context.db();
556                    let mut dbtx = db.begin_transaction_nc().await;
557
558                    Ok(module.consensus_block_count(&mut dbtx).await)
559                }
560            },
561            api_endpoint! {
562                AWAIT_INCOMING_CONTRACT_ENDPOINT,
563                ApiVersion::new(0, 0),
564                async |module: &Lightning, context, params: (ContractId, u64) | -> Option<ContractId> {
565                    let db = context.db();
566
567                    Ok(module.await_incoming_contract(db, params.0, params.1).await)
568                }
569            },
570            api_endpoint! {
571                AWAIT_PREIMAGE_ENDPOINT,
572                ApiVersion::new(0, 0),
573                async |module: &Lightning, context, params: (ContractId, u64)| -> Option<[u8; 32]> {
574                    let db = context.db();
575
576                    Ok(module.await_preimage(db, params.0, params.1).await)
577                }
578            },
579            api_endpoint! {
580                DECRYPTION_KEY_SHARE_ENDPOINT,
581                ApiVersion::new(0, 0),
582                async |_module: &Lightning, context, params: ContractId| -> DecryptionKeyShare {
583                    let share = context
584                        .db()
585                        .begin_transaction_nc()
586                        .await
587                        .get_value(&DecryptionKeyShareKey(params))
588                        .await
589                        .ok_or(ApiError::bad_request("No decryption key share found".to_string()))?;
590
591                    Ok(share)
592                }
593            },
594            api_endpoint! {
595                OUTGOING_CONTRACT_EXPIRATION_ENDPOINT,
596                ApiVersion::new(0, 0),
597                async |module: &Lightning, context, contract_id: ContractId| -> Option<u64> {
598                    let db = context.db();
599
600                    Ok(module.outgoing_contract_expiration(db, contract_id).await)
601                }
602            },
603            api_endpoint! {
604                ADD_GATEWAY_ENDPOINT,
605                ApiVersion::new(0, 0),
606                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
607                    check_auth(context)?;
608
609                    let db = context.db();
610
611                    Ok(Lightning::add_gateway(db, gateway).await)
612                }
613            },
614            api_endpoint! {
615                REMOVE_GATEWAY_ENDPOINT,
616                ApiVersion::new(0, 0),
617                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
618                    check_auth(context)?;
619
620                    let db = context.db();
621
622                    Ok(Lightning::remove_gateway(db, gateway).await)
623                }
624            },
625            api_endpoint! {
626                GATEWAYS_ENDPOINT,
627                ApiVersion::new(0, 0),
628                async |_module: &Lightning, context, _params : () | -> Vec<SafeUrl> {
629                    let db = context.db();
630
631                    Ok(Lightning::gateways(db).await)
632                }
633            },
634        ]
635    }
636}
637
638impl Lightning {
639    async fn new(
640        cfg: LightningConfig,
641        shared_bitcoin: &ServerModuleSharedBitcoin,
642    ) -> anyhow::Result<Self> {
643        let btc_rpc = create_bitcoind(&cfg.local.bitcoin_rpc)?;
644        let block_count_rx = shared_bitcoin
645            .block_count_receiver(cfg.consensus.network, btc_rpc.clone())
646            .await;
647
648        Ok(Lightning {
649            cfg,
650            block_count_rx,
651        })
652    }
653
654    fn get_block_count(&self) -> anyhow::Result<u64> {
655        self.block_count_rx
656            .borrow()
657            .ok_or_else(|| format_err!("Block count not available yet"))
658    }
659
660    async fn consensus_block_count(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
661        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
662
663        let mut counts = dbtx
664            .find_by_prefix(&BlockCountVotePrefix)
665            .await
666            .map(|entry| entry.1)
667            .collect::<Vec<u64>>()
668            .await;
669
670        counts.sort_unstable();
671
672        counts.reverse();
673
674        assert!(counts.last() <= counts.first());
675
676        // The block count we select guarantees that any threshold of correct peers can
677        // increase the consensus block count and any consensus block count has been
678        // confirmed by a threshold of peers.
679
680        counts.get(num_peers.threshold() - 1).copied().unwrap_or(0)
681    }
682
683    async fn consensus_unix_time(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
684        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
685
686        let mut times = dbtx
687            .find_by_prefix(&UnixTimeVotePrefix)
688            .await
689            .map(|entry| entry.1)
690            .collect::<Vec<u64>>()
691            .await;
692
693        times.sort_unstable();
694
695        times.reverse();
696
697        assert!(times.last() <= times.first());
698
699        // The unix time we select guarantees that any threshold of correct peers can
700        // advance the consensus unix time and any consensus unix time has been
701        // confirmed by a threshold of peers.
702
703        times.get(num_peers.threshold() - 1).copied().unwrap_or(0)
704    }
705
706    async fn await_incoming_contract(
707        &self,
708        db: Database,
709        contract_id: ContractId,
710        expiration: u64,
711    ) -> Option<ContractId> {
712        loop {
713            timeout(
714                Duration::from_secs(10),
715                db.wait_key_exists(&IncomingContractKey(contract_id)),
716            )
717            .await
718            .ok();
719
720            // to avoid race conditions we have to check for the contract and
721            // its expiration in the same database transaction
722            let mut dbtx = db.begin_transaction_nc().await;
723
724            if let Some(contract) = dbtx.get_value(&IncomingContractKey(contract_id)).await {
725                return Some(contract.contract_id());
726            }
727
728            if expiration <= self.consensus_unix_time(&mut dbtx).await {
729                return None;
730            }
731        }
732    }
733
734    async fn await_preimage(
735        &self,
736        db: Database,
737        contract_id: ContractId,
738        expiration: u64,
739    ) -> Option<[u8; 32]> {
740        loop {
741            timeout(
742                Duration::from_secs(10),
743                db.wait_key_exists(&PreimageKey(contract_id)),
744            )
745            .await
746            .ok();
747
748            // to avoid race conditions we have to check for the preimage and
749            // the contracts expiration in the same database transaction
750            let mut dbtx = db.begin_transaction_nc().await;
751
752            if let Some(preimage) = dbtx.get_value(&PreimageKey(contract_id)).await {
753                return Some(preimage);
754            }
755
756            if expiration <= self.consensus_block_count(&mut dbtx).await {
757                return None;
758            }
759        }
760    }
761
762    async fn outgoing_contract_expiration(
763        &self,
764        db: Database,
765        contract_id: ContractId,
766    ) -> Option<u64> {
767        let mut dbtx = db.begin_transaction_nc().await;
768
769        let contract = dbtx.get_value(&OutgoingContractKey(contract_id)).await?;
770
771        let consensus_block_count = self.consensus_block_count(&mut dbtx).await;
772
773        Some(contract.expiration.saturating_sub(consensus_block_count))
774    }
775
776    async fn add_gateway(db: Database, gateway: SafeUrl) -> bool {
777        let mut dbtx = db.begin_transaction().await;
778
779        let is_new_entry = dbtx.insert_entry(&GatewayKey(gateway), &()).await.is_none();
780
781        dbtx.commit_tx().await;
782
783        is_new_entry
784    }
785
786    async fn remove_gateway(db: Database, gateway: SafeUrl) -> bool {
787        let mut dbtx = db.begin_transaction().await;
788
789        let entry_existed = dbtx.remove_entry(&GatewayKey(gateway)).await.is_some();
790
791        dbtx.commit_tx().await;
792
793        entry_existed
794    }
795
796    async fn gateways(db: Database) -> Vec<SafeUrl> {
797        db.begin_transaction_nc()
798            .await
799            .find_by_prefix(&GatewayPrefix)
800            .await
801            .map(|entry| entry.0.0)
802            .collect()
803            .await
804    }
805}