fedimint_lnv2_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_wrap)]
3#![allow(clippy::module_name_repetitions)]
4
5mod db;
6
7use std::collections::{BTreeMap, BTreeSet};
8use std::time::Duration;
9
10use anyhow::{Context, anyhow, ensure};
11use bls12_381::{G1Projective, Scalar};
12use fedimint_core::bitcoin::hashes::sha256;
13use fedimint_core::config::{
14    ConfigGenModuleParams, ServerModuleConfig, ServerModuleConsensusConfig,
15    TypedServerModuleConfig, TypedServerModuleConsensusConfig,
16};
17use fedimint_core::core::ModuleInstanceId;
18use fedimint_core::db::{Database, DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
19use fedimint_core::encoding::Encodable;
20use fedimint_core::module::audit::Audit;
21use fedimint_core::module::{
22    ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion, InputMeta,
23    ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions, TransactionItemAmount,
24    api_endpoint,
25};
26use fedimint_core::task::timeout;
27use fedimint_core::time::duration_since_epoch;
28use fedimint_core::util::SafeUrl;
29use fedimint_core::{
30    BitcoinHash, InPoint, NumPeers, NumPeersExt, OutPoint, PeerId, apply, async_trait_maybe_send,
31    push_db_pair_items,
32};
33use fedimint_lnv2_common::config::{
34    FeeConsensus, LightningClientConfig, LightningConfig, LightningConfigConsensus,
35    LightningConfigPrivate, LightningGenParams,
36};
37use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract};
38use fedimint_lnv2_common::endpoint_constants::{
39    ADD_GATEWAY_ENDPOINT, AWAIT_INCOMING_CONTRACT_ENDPOINT, AWAIT_INCOMING_CONTRACTS_ENDPOINT,
40    AWAIT_PREIMAGE_ENDPOINT, CONSENSUS_BLOCK_COUNT_ENDPOINT, DECRYPTION_KEY_SHARE_ENDPOINT,
41    GATEWAYS_ENDPOINT, OUTGOING_CONTRACT_EXPIRATION_ENDPOINT, REMOVE_GATEWAY_ENDPOINT,
42};
43use fedimint_lnv2_common::{
44    ContractId, LightningCommonInit, LightningConsensusItem, LightningInput, LightningInputError,
45    LightningInputV0, LightningModuleTypes, LightningOutput, LightningOutputError,
46    LightningOutputOutcome, LightningOutputV0, MODULE_CONSENSUS_VERSION, OutgoingWitness,
47};
48use fedimint_logging::LOG_MODULE_LNV2;
49use fedimint_server_core::bitcoin_rpc::ServerBitcoinRpcMonitor;
50use fedimint_server_core::config::{PeerHandleOps, eval_poly_g1};
51use fedimint_server_core::net::check_auth;
52use fedimint_server_core::{ServerModule, ServerModuleInit, ServerModuleInitArgs};
53use futures::StreamExt;
54use group::Curve;
55use group::ff::Field;
56use rand::SeedableRng;
57use rand_chacha::ChaChaRng;
58use strum::IntoEnumIterator;
59use tpe::{
60    AggregatePublicKey, DecryptionKeyShare, PublicKeyShare, SecretKeyShare, derive_pk_share,
61};
62use tracing::trace;
63
64use crate::db::{
65    BlockCountVoteKey, BlockCountVotePrefix, DbKeyPrefix, DecryptionKeyShareKey,
66    DecryptionKeySharePrefix, GatewayKey, GatewayPrefix, IncomingContractIndexKey,
67    IncomingContractIndexPrefix, IncomingContractKey, IncomingContractOutpointKey,
68    IncomingContractOutpointPrefix, IncomingContractPrefix, IncomingContractStreamIndexKey,
69    IncomingContractStreamKey, IncomingContractStreamPrefix, OutgoingContractKey,
70    OutgoingContractPrefix, PreimageKey, PreimagePrefix, UnixTimeVoteKey, UnixTimeVotePrefix,
71};
72
73#[derive(Debug, Clone)]
74pub struct LightningInit;
75
76impl ModuleInit for LightningInit {
77    type Common = LightningCommonInit;
78
79    #[allow(clippy::too_many_lines)]
80    async fn dump_database(
81        &self,
82        dbtx: &mut DatabaseTransaction<'_>,
83        prefix_names: Vec<String>,
84    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
85        let mut lightning: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
86            BTreeMap::new();
87
88        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
89            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
90        });
91
92        for table in filtered_prefixes {
93            match table {
94                DbKeyPrefix::BlockCountVote => {
95                    push_db_pair_items!(
96                        dbtx,
97                        BlockCountVotePrefix,
98                        BlockCountVoteKey,
99                        u64,
100                        lightning,
101                        "Lightning Block Count Votes"
102                    );
103                }
104                DbKeyPrefix::UnixTimeVote => {
105                    push_db_pair_items!(
106                        dbtx,
107                        UnixTimeVotePrefix,
108                        UnixTimeVoteKey,
109                        u64,
110                        lightning,
111                        "Lightning Unix Time Votes"
112                    );
113                }
114                DbKeyPrefix::OutgoingContract => {
115                    push_db_pair_items!(
116                        dbtx,
117                        OutgoingContractPrefix,
118                        LightningOutgoingContractKey,
119                        OutgoingContract,
120                        lightning,
121                        "Lightning Outgoing Contracts"
122                    );
123                }
124                DbKeyPrefix::IncomingContract => {
125                    push_db_pair_items!(
126                        dbtx,
127                        IncomingContractPrefix,
128                        LightningIncomingContractKey,
129                        IncomingContract,
130                        lightning,
131                        "Lightning Incoming Contracts"
132                    );
133                }
134                DbKeyPrefix::IncomingContractOutpoint => {
135                    push_db_pair_items!(
136                        dbtx,
137                        IncomingContractOutpointPrefix,
138                        LightningIncomingContractOutpointKey,
139                        OutPoint,
140                        lightning,
141                        "Lightning Incoming Contracts Outpoints"
142                    );
143                }
144                DbKeyPrefix::DecryptionKeyShare => {
145                    push_db_pair_items!(
146                        dbtx,
147                        DecryptionKeySharePrefix,
148                        DecryptionKeyShareKey,
149                        DecryptionKeyShare,
150                        lightning,
151                        "Lightning Decryption Key Share"
152                    );
153                }
154                DbKeyPrefix::Preimage => {
155                    push_db_pair_items!(
156                        dbtx,
157                        PreimagePrefix,
158                        LightningPreimageKey,
159                        [u8; 32],
160                        lightning,
161                        "Lightning Preimages"
162                    );
163                }
164                DbKeyPrefix::Gateway => {
165                    push_db_pair_items!(
166                        dbtx,
167                        GatewayPrefix,
168                        GatewayKey,
169                        (),
170                        lightning,
171                        "Lightning Gateways"
172                    );
173                }
174                DbKeyPrefix::IncomingContractStreamIndex => {
175                    push_db_pair_items!(
176                        dbtx,
177                        IncomingContractStreamIndexKey,
178                        IncomingContractStreamIndexKey,
179                        u64,
180                        lightning,
181                        "Lightning Incoming Contract Stream Index"
182                    );
183                }
184                DbKeyPrefix::IncomingContractStream => {
185                    push_db_pair_items!(
186                        dbtx,
187                        IncomingContractStreamPrefix(0),
188                        IncomingContractStreamKey,
189                        IncomingContract,
190                        lightning,
191                        "Lightning Incoming Contract Stream"
192                    );
193                }
194                DbKeyPrefix::IncomingContractIndex => {
195                    push_db_pair_items!(
196                        dbtx,
197                        IncomingContractIndexPrefix,
198                        IncomingContractIndexKey,
199                        u64,
200                        lightning,
201                        "Lightning Incoming Contract Index"
202                    );
203                }
204            }
205        }
206
207        Box::new(lightning.into_iter())
208    }
209}
210
211#[apply(async_trait_maybe_send!)]
212impl ServerModuleInit for LightningInit {
213    type Module = Lightning;
214    type Params = LightningGenParams;
215
216    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
217        &[MODULE_CONSENSUS_VERSION]
218    }
219
220    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
221        SupportedModuleApiVersions::from_raw(
222            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
223            (
224                MODULE_CONSENSUS_VERSION.major,
225                MODULE_CONSENSUS_VERSION.minor,
226            ),
227            &[(0, 0)],
228        )
229    }
230
231    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
232        Ok(Lightning {
233            cfg: args.cfg().to_typed()?,
234            db: args.db().clone(),
235            server_bitcoin_rpc_monitor: args.server_bitcoin_rpc_monitor(),
236        })
237    }
238
239    fn trusted_dealer_gen(
240        &self,
241        peers: &[PeerId],
242        params: &ConfigGenModuleParams,
243        disable_base_fees: bool,
244    ) -> BTreeMap<PeerId, ServerModuleConfig> {
245        let params = self
246            .parse_params(params)
247            .expect("Failed tp parse lnv2 config gen params");
248
249        let tpe_pks = peers
250            .iter()
251            .map(|peer| (*peer, dealer_pk(peers.to_num_peers(), *peer)))
252            .collect::<BTreeMap<PeerId, PublicKeyShare>>();
253
254        peers
255            .iter()
256            .map(|peer| {
257                let cfg = LightningConfig {
258                    consensus: LightningConfigConsensus {
259                        tpe_agg_pk: dealer_agg_pk(),
260                        tpe_pks: tpe_pks.clone(),
261                        fee_consensus: params.consensus.fee_consensus.clone().unwrap_or(
262                            if disable_base_fees {
263                                FeeConsensus::zero()
264                            } else {
265                                FeeConsensus::new(0).expect("Relative fee is within range")
266                            },
267                        ),
268                        network: params.consensus.network,
269                    },
270                    private: LightningConfigPrivate {
271                        sk: dealer_sk(peers.to_num_peers(), *peer),
272                    },
273                };
274
275                (*peer, cfg.to_erased())
276            })
277            .collect()
278    }
279
280    async fn distributed_gen(
281        &self,
282        peers: &(dyn PeerHandleOps + Send + Sync),
283        params: &ConfigGenModuleParams,
284        disable_base_fees: bool,
285    ) -> anyhow::Result<ServerModuleConfig> {
286        let params = self.parse_params(params).unwrap();
287        let (polynomial, sks) = peers.run_dkg_g1().await?;
288
289        let server = LightningConfig {
290            consensus: LightningConfigConsensus {
291                tpe_agg_pk: tpe::AggregatePublicKey(polynomial[0].to_affine()),
292                tpe_pks: peers
293                    .num_peers()
294                    .peer_ids()
295                    .map(|peer| (peer, PublicKeyShare(eval_poly_g1(&polynomial, &peer))))
296                    .collect(),
297                fee_consensus: params.consensus.fee_consensus.clone().unwrap_or(
298                    if disable_base_fees {
299                        FeeConsensus::zero()
300                    } else {
301                        FeeConsensus::new(0).expect("Relative fee is within range")
302                    },
303                ),
304                network: params.consensus.network,
305            },
306            private: LightningConfigPrivate {
307                sk: SecretKeyShare(sks),
308            },
309        };
310
311        Ok(server.to_erased())
312    }
313
314    fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
315        let config = config.to_typed::<LightningConfig>()?;
316
317        ensure!(
318            tpe::derive_pk_share(&config.private.sk)
319                == *config
320                    .consensus
321                    .tpe_pks
322                    .get(identity)
323                    .context("Public key set has no key for our identity")?,
324            "Preimge encryption secret key share does not match our public key share"
325        );
326
327        Ok(())
328    }
329
330    fn get_client_config(
331        &self,
332        config: &ServerModuleConsensusConfig,
333    ) -> anyhow::Result<LightningClientConfig> {
334        let config = LightningConfigConsensus::from_erased(config)?;
335        Ok(LightningClientConfig {
336            tpe_agg_pk: config.tpe_agg_pk,
337            tpe_pks: config.tpe_pks,
338            fee_consensus: config.fee_consensus,
339            network: config.network,
340        })
341    }
342
343    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
344        Some(DbKeyPrefix::iter().map(|p| p as u8).collect())
345    }
346}
347
348fn dealer_agg_pk() -> AggregatePublicKey {
349    AggregatePublicKey((G1Projective::generator() * coefficient(0)).to_affine())
350}
351
352fn dealer_pk(num_peers: NumPeers, peer: PeerId) -> PublicKeyShare {
353    derive_pk_share(&dealer_sk(num_peers, peer))
354}
355
356fn dealer_sk(num_peers: NumPeers, peer: PeerId) -> SecretKeyShare {
357    let x = Scalar::from(peer.to_usize() as u64 + 1);
358
359    // We evaluate the scalar polynomial of degree threshold - 1 at the point x
360    // using the Horner schema.
361
362    let y = (0..num_peers.threshold())
363        .map(|index| coefficient(index as u64))
364        .rev()
365        .reduce(|accumulator, c| accumulator * x + c)
366        .expect("We have at least one coefficient");
367
368    SecretKeyShare(y)
369}
370
371fn coefficient(index: u64) -> Scalar {
372    Scalar::random(&mut ChaChaRng::from_seed(
373        *index.consensus_hash::<sha256::Hash>().as_byte_array(),
374    ))
375}
376
377#[derive(Debug)]
378pub struct Lightning {
379    cfg: LightningConfig,
380    db: Database,
381    server_bitcoin_rpc_monitor: ServerBitcoinRpcMonitor,
382}
383
384#[apply(async_trait_maybe_send!)]
385impl ServerModule for Lightning {
386    type Common = LightningModuleTypes;
387    type Init = LightningInit;
388
389    async fn consensus_proposal(
390        &self,
391        _dbtx: &mut DatabaseTransaction<'_>,
392    ) -> Vec<LightningConsensusItem> {
393        // We reduce the time granularity to deduplicate votes more often and not save
394        // one consensus item every second.
395        let mut items = vec![LightningConsensusItem::UnixTimeVote(
396            60 * (duration_since_epoch().as_secs() / 60),
397        )];
398
399        if let Ok(block_count) = self.get_block_count() {
400            trace!(target: LOG_MODULE_LNV2, ?block_count, "Proposing block count");
401            items.push(LightningConsensusItem::BlockCountVote(block_count));
402        }
403
404        items
405    }
406
407    async fn process_consensus_item<'a, 'b>(
408        &'a self,
409        dbtx: &mut DatabaseTransaction<'b>,
410        consensus_item: LightningConsensusItem,
411        peer: PeerId,
412    ) -> anyhow::Result<()> {
413        trace!(target: LOG_MODULE_LNV2, ?consensus_item, "Processing consensus item proposal");
414
415        match consensus_item {
416            LightningConsensusItem::BlockCountVote(vote) => {
417                let current_vote = dbtx
418                    .insert_entry(&BlockCountVoteKey(peer), &vote)
419                    .await
420                    .unwrap_or(0);
421
422                ensure!(current_vote < vote, "Block count vote is redundant");
423
424                Ok(())
425            }
426            LightningConsensusItem::UnixTimeVote(vote) => {
427                let current_vote = dbtx
428                    .insert_entry(&UnixTimeVoteKey(peer), &vote)
429                    .await
430                    .unwrap_or(0);
431
432                ensure!(current_vote < vote, "Unix time vote is redundant");
433
434                Ok(())
435            }
436            LightningConsensusItem::Default { variant, .. } => Err(anyhow!(
437                "Received lnv2 consensus item with unknown variant {variant}"
438            )),
439        }
440    }
441
442    async fn process_input<'a, 'b, 'c>(
443        &'a self,
444        dbtx: &mut DatabaseTransaction<'c>,
445        input: &'b LightningInput,
446        _in_point: InPoint,
447    ) -> Result<InputMeta, LightningInputError> {
448        let (pub_key, amount) = match input.ensure_v0_ref()? {
449            LightningInputV0::Outgoing(outpoint, outgoing_witness) => {
450                let contract = dbtx
451                    .remove_entry(&OutgoingContractKey(*outpoint))
452                    .await
453                    .ok_or(LightningInputError::UnknownContract)?;
454
455                let pub_key = match outgoing_witness {
456                    OutgoingWitness::Claim(preimage) => {
457                        if contract.expiration <= self.consensus_block_count(dbtx).await {
458                            return Err(LightningInputError::Expired);
459                        }
460
461                        if !contract.verify_preimage(preimage) {
462                            return Err(LightningInputError::InvalidPreimage);
463                        }
464
465                        dbtx.insert_entry(&PreimageKey(*outpoint), preimage).await;
466
467                        contract.claim_pk
468                    }
469                    OutgoingWitness::Refund => {
470                        if contract.expiration > self.consensus_block_count(dbtx).await {
471                            return Err(LightningInputError::NotExpired);
472                        }
473
474                        contract.refund_pk
475                    }
476                    OutgoingWitness::Cancel(forfeit_signature) => {
477                        if !contract.verify_forfeit_signature(forfeit_signature) {
478                            return Err(LightningInputError::InvalidForfeitSignature);
479                        }
480
481                        contract.refund_pk
482                    }
483                };
484
485                (pub_key, contract.amount)
486            }
487            LightningInputV0::Incoming(outpoint, agg_decryption_key) => {
488                let contract = dbtx
489                    .remove_entry(&IncomingContractKey(*outpoint))
490                    .await
491                    .ok_or(LightningInputError::UnknownContract)?;
492
493                if let Some(index) = dbtx
494                    .remove_entry(&IncomingContractIndexKey(*outpoint))
495                    .await
496                {
497                    dbtx.remove_entry(&IncomingContractStreamKey(index)).await;
498                }
499
500                if !contract
501                    .verify_agg_decryption_key(&self.cfg.consensus.tpe_agg_pk, agg_decryption_key)
502                {
503                    return Err(LightningInputError::InvalidDecryptionKey);
504                }
505
506                let pub_key = match contract.decrypt_preimage(agg_decryption_key) {
507                    Some(..) => contract.commitment.claim_pk,
508                    None => contract.commitment.refund_pk,
509                };
510
511                (pub_key, contract.commitment.amount)
512            }
513        };
514
515        Ok(InputMeta {
516            amount: TransactionItemAmount {
517                amount,
518                fee: self.cfg.consensus.fee_consensus.fee(amount),
519            },
520            pub_key,
521        })
522    }
523
524    async fn process_output<'a, 'b>(
525        &'a self,
526        dbtx: &mut DatabaseTransaction<'b>,
527        output: &'a LightningOutput,
528        outpoint: OutPoint,
529    ) -> Result<TransactionItemAmount, LightningOutputError> {
530        let amount = match output.ensure_v0_ref()? {
531            LightningOutputV0::Outgoing(contract) => {
532                dbtx.insert_new_entry(&OutgoingContractKey(outpoint), contract)
533                    .await;
534
535                contract.amount
536            }
537            LightningOutputV0::Incoming(contract) => {
538                if !contract.verify() {
539                    return Err(LightningOutputError::InvalidContract);
540                }
541
542                if contract.commitment.expiration <= self.consensus_unix_time(dbtx).await {
543                    return Err(LightningOutputError::ContractExpired);
544                }
545
546                dbtx.insert_new_entry(&IncomingContractKey(outpoint), contract)
547                    .await;
548
549                dbtx.insert_entry(
550                    &IncomingContractOutpointKey(contract.contract_id()),
551                    &outpoint,
552                )
553                .await;
554
555                let stream_index = dbtx
556                    .get_value(&IncomingContractStreamIndexKey)
557                    .await
558                    .unwrap_or(0);
559
560                dbtx.insert_entry(&IncomingContractStreamKey(stream_index), contract)
561                    .await;
562
563                dbtx.insert_entry(&IncomingContractIndexKey(outpoint), &stream_index)
564                    .await;
565
566                dbtx.insert_entry(&IncomingContractStreamIndexKey, &(stream_index + 1))
567                    .await;
568
569                let dk_share = contract.create_decryption_key_share(&self.cfg.private.sk);
570
571                dbtx.insert_entry(&DecryptionKeyShareKey(outpoint), &dk_share)
572                    .await;
573
574                contract.commitment.amount
575            }
576        };
577
578        Ok(TransactionItemAmount {
579            amount,
580            fee: self.cfg.consensus.fee_consensus.fee(amount),
581        })
582    }
583
584    async fn output_status(
585        &self,
586        _dbtx: &mut DatabaseTransaction<'_>,
587        _out_point: OutPoint,
588    ) -> Option<LightningOutputOutcome> {
589        None
590    }
591
592    async fn audit(
593        &self,
594        dbtx: &mut DatabaseTransaction<'_>,
595        audit: &mut Audit,
596        module_instance_id: ModuleInstanceId,
597    ) {
598        // Both incoming and outgoing contracts represent liabilities to the federation
599        // since they are obligations to issue notes.
600        audit
601            .add_items(
602                dbtx,
603                module_instance_id,
604                &OutgoingContractPrefix,
605                |_, contract| -(contract.amount.msats as i64),
606            )
607            .await;
608
609        audit
610            .add_items(
611                dbtx,
612                module_instance_id,
613                &IncomingContractPrefix,
614                |_, contract| -(contract.commitment.amount.msats as i64),
615            )
616            .await;
617    }
618
619    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
620        vec![
621            api_endpoint! {
622                CONSENSUS_BLOCK_COUNT_ENDPOINT,
623                ApiVersion::new(0, 0),
624                async |module: &Lightning, context, _params : () | -> u64 {
625                    let db = context.db();
626                    let mut dbtx = db.begin_transaction_nc().await;
627
628                    Ok(module.consensus_block_count(&mut dbtx).await)
629                }
630            },
631            api_endpoint! {
632                AWAIT_INCOMING_CONTRACT_ENDPOINT,
633                ApiVersion::new(0, 0),
634                async |module: &Lightning, context, params: (ContractId, u64) | -> Option<OutPoint> {
635                    let db = context.db();
636
637                    Ok(module.await_incoming_contract(db, params.0, params.1).await)
638                }
639            },
640            api_endpoint! {
641                AWAIT_PREIMAGE_ENDPOINT,
642                ApiVersion::new(0, 0),
643                async |module: &Lightning, context, params: (OutPoint, u64)| -> Option<[u8; 32]> {
644                    let db = context.db();
645
646                    Ok(module.await_preimage(db, params.0, params.1).await)
647                }
648            },
649            api_endpoint! {
650                DECRYPTION_KEY_SHARE_ENDPOINT,
651                ApiVersion::new(0, 0),
652                async |_module: &Lightning, context, params: OutPoint| -> DecryptionKeyShare {
653                    let share = context
654                        .db()
655                        .begin_transaction_nc()
656                        .await
657                        .get_value(&DecryptionKeyShareKey(params))
658                        .await
659                        .ok_or(ApiError::bad_request("No decryption key share found".to_string()))?;
660
661                    Ok(share)
662                }
663            },
664            api_endpoint! {
665                OUTGOING_CONTRACT_EXPIRATION_ENDPOINT,
666                ApiVersion::new(0, 0),
667                async |module: &Lightning, context, outpoint: OutPoint| -> Option<(ContractId, u64)> {
668                    let db = context.db();
669
670                    Ok(module.outgoing_contract_expiration(db, outpoint).await)
671                }
672            },
673            api_endpoint! {
674                AWAIT_INCOMING_CONTRACTS_ENDPOINT,
675                ApiVersion::new(0, 0),
676                async |module: &Lightning, context, params: (u64, usize)| -> (Vec<IncomingContract>, u64) {
677                    let db = context.db();
678
679                    if params.1 == 0 {
680                        return Err(ApiError::bad_request("Batch size must be greater than 0".to_string()));
681                    }
682
683                    Ok(module.await_incoming_contracts(db, params.0, params.1).await)
684                }
685            },
686            api_endpoint! {
687                ADD_GATEWAY_ENDPOINT,
688                ApiVersion::new(0, 0),
689                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
690                    check_auth(context)?;
691
692                    let db = context.db();
693
694                    Ok(Lightning::add_gateway(db, gateway).await)
695                }
696            },
697            api_endpoint! {
698                REMOVE_GATEWAY_ENDPOINT,
699                ApiVersion::new(0, 0),
700                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
701                    check_auth(context)?;
702
703                    let db = context.db();
704
705                    Ok(Lightning::remove_gateway(db, gateway).await)
706                }
707            },
708            api_endpoint! {
709                GATEWAYS_ENDPOINT,
710                ApiVersion::new(0, 0),
711                async |_module: &Lightning, context, _params : () | -> Vec<SafeUrl> {
712                    let db = context.db();
713
714                    Ok(Lightning::gateways(db).await)
715                }
716            },
717        ]
718    }
719}
720
721impl Lightning {
722    fn get_block_count(&self) -> anyhow::Result<u64> {
723        self.server_bitcoin_rpc_monitor
724            .status()
725            .map(|status| status.block_count)
726            .context("Block count not available yet")
727    }
728
729    async fn consensus_block_count(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
730        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
731
732        let mut counts = dbtx
733            .find_by_prefix(&BlockCountVotePrefix)
734            .await
735            .map(|entry| entry.1)
736            .collect::<Vec<u64>>()
737            .await;
738
739        counts.sort_unstable();
740
741        counts.reverse();
742
743        assert!(counts.last() <= counts.first());
744
745        // The block count we select guarantees that any threshold of correct peers can
746        // increase the consensus block count and any consensus block count has been
747        // confirmed by a threshold of peers.
748
749        counts.get(num_peers.threshold() - 1).copied().unwrap_or(0)
750    }
751
752    async fn consensus_unix_time(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
753        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
754
755        let mut times = dbtx
756            .find_by_prefix(&UnixTimeVotePrefix)
757            .await
758            .map(|entry| entry.1)
759            .collect::<Vec<u64>>()
760            .await;
761
762        times.sort_unstable();
763
764        times.reverse();
765
766        assert!(times.last() <= times.first());
767
768        // The unix time we select guarantees that any threshold of correct peers can
769        // advance the consensus unix time and any consensus unix time has been
770        // confirmed by a threshold of peers.
771
772        times.get(num_peers.threshold() - 1).copied().unwrap_or(0)
773    }
774
775    async fn await_incoming_contract(
776        &self,
777        db: Database,
778        contract_id: ContractId,
779        expiration: u64,
780    ) -> Option<OutPoint> {
781        loop {
782            timeout(
783                Duration::from_secs(10),
784                db.wait_key_exists(&IncomingContractOutpointKey(contract_id)),
785            )
786            .await
787            .ok();
788
789            // to avoid race conditions we have to check for the contract and
790            // its expiration in the same database transaction
791            let mut dbtx = db.begin_transaction_nc().await;
792
793            if let Some(outpoint) = dbtx
794                .get_value(&IncomingContractOutpointKey(contract_id))
795                .await
796            {
797                return Some(outpoint);
798            }
799
800            if expiration <= self.consensus_unix_time(&mut dbtx).await {
801                return None;
802            }
803        }
804    }
805
806    async fn await_preimage(
807        &self,
808        db: Database,
809        outpoint: OutPoint,
810        expiration: u64,
811    ) -> Option<[u8; 32]> {
812        loop {
813            timeout(
814                Duration::from_secs(10),
815                db.wait_key_exists(&PreimageKey(outpoint)),
816            )
817            .await
818            .ok();
819
820            // to avoid race conditions we have to check for the preimage and
821            // the contracts expiration in the same database transaction
822            let mut dbtx = db.begin_transaction_nc().await;
823
824            if let Some(preimage) = dbtx.get_value(&PreimageKey(outpoint)).await {
825                return Some(preimage);
826            }
827
828            if expiration <= self.consensus_block_count(&mut dbtx).await {
829                return None;
830            }
831        }
832    }
833
834    async fn outgoing_contract_expiration(
835        &self,
836        db: Database,
837        outpoint: OutPoint,
838    ) -> Option<(ContractId, u64)> {
839        let mut dbtx = db.begin_transaction_nc().await;
840
841        let contract = dbtx.get_value(&OutgoingContractKey(outpoint)).await?;
842
843        let consensus_block_count = self.consensus_block_count(&mut dbtx).await;
844
845        let expiration = contract.expiration.saturating_sub(consensus_block_count);
846
847        Some((contract.contract_id(), expiration))
848    }
849
850    async fn await_incoming_contracts(
851        &self,
852        db: Database,
853        start: u64,
854        n: usize,
855    ) -> (Vec<IncomingContract>, u64) {
856        let filter = |next_index: Option<u64>| next_index.filter(|i| *i > start);
857
858        let (mut next_index, mut dbtx) = db
859            .wait_key_check(&IncomingContractStreamIndexKey, filter)
860            .await;
861
862        let mut contracts = Vec::with_capacity(n);
863
864        let range = IncomingContractStreamKey(start)..IncomingContractStreamKey(u64::MAX);
865
866        for (key, contract) in dbtx
867            .find_by_range(range)
868            .await
869            .take(n)
870            .collect::<Vec<(IncomingContractStreamKey, IncomingContract)>>()
871            .await
872        {
873            contracts.push(contract.clone());
874            next_index = key.0 + 1;
875        }
876
877        (contracts, next_index)
878    }
879
880    async fn add_gateway(db: Database, gateway: SafeUrl) -> bool {
881        let mut dbtx = db.begin_transaction().await;
882
883        let is_new_entry = dbtx.insert_entry(&GatewayKey(gateway), &()).await.is_none();
884
885        dbtx.commit_tx().await;
886
887        is_new_entry
888    }
889
890    async fn remove_gateway(db: Database, gateway: SafeUrl) -> bool {
891        let mut dbtx = db.begin_transaction().await;
892
893        let entry_existed = dbtx.remove_entry(&GatewayKey(gateway)).await.is_some();
894
895        dbtx.commit_tx().await;
896
897        entry_existed
898    }
899
900    async fn gateways(db: Database) -> Vec<SafeUrl> {
901        db.begin_transaction_nc()
902            .await
903            .find_by_prefix(&GatewayPrefix)
904            .await
905            .map(|entry| entry.0.0)
906            .collect()
907            .await
908    }
909
910    pub async fn consensus_block_count_ui(&self) -> u64 {
911        self.consensus_block_count(&mut self.db.begin_transaction_nc().await)
912            .await
913    }
914
915    pub async fn consensus_unix_time_ui(&self) -> u64 {
916        self.consensus_unix_time(&mut self.db.begin_transaction_nc().await)
917            .await
918    }
919
920    pub async fn add_gateway_ui(&self, gateway: SafeUrl) -> bool {
921        Self::add_gateway(self.db.clone(), gateway).await
922    }
923
924    pub async fn remove_gateway_ui(&self, gateway: SafeUrl) -> bool {
925        Self::remove_gateway(self.db.clone(), gateway).await
926    }
927
928    pub async fn gateways_ui(&self) -> Vec<SafeUrl> {
929        Self::gateways(self.db.clone()).await
930    }
931}