fedimint_lnv2_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_wrap)]
3#![allow(clippy::module_name_repetitions)]
4
5pub use fedimint_lnv2_common as common;
6
7mod db;
8
9use std::collections::{BTreeMap, BTreeSet};
10use std::time::Duration;
11
12use anyhow::{Context, anyhow, ensure};
13use bls12_381::{G1Projective, Scalar};
14use fedimint_core::bitcoin::hashes::sha256;
15use fedimint_core::config::{
16    ServerModuleConfig, ServerModuleConsensusConfig, TypedServerModuleConfig,
17    TypedServerModuleConsensusConfig,
18};
19use fedimint_core::core::ModuleInstanceId;
20use fedimint_core::db::{
21    Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
22};
23use fedimint_core::encoding::Encodable;
24use fedimint_core::module::audit::Audit;
25use fedimint_core::module::{
26    Amounts, ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion,
27    InputMeta, ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions,
28    TransactionItemAmounts, api_endpoint,
29};
30use fedimint_core::net::auth::check_auth;
31use fedimint_core::task::timeout;
32use fedimint_core::time::duration_since_epoch;
33use fedimint_core::util::SafeUrl;
34use fedimint_core::{
35    BitcoinHash, InPoint, NumPeers, NumPeersExt, OutPoint, PeerId, apply, async_trait_maybe_send,
36    push_db_pair_items,
37};
38use fedimint_lnv2_common::config::{
39    FeeConsensus, LightningClientConfig, LightningConfig, LightningConfigConsensus,
40    LightningConfigPrivate,
41};
42use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract};
43use fedimint_lnv2_common::endpoint_constants::{
44    ADD_GATEWAY_ENDPOINT, AWAIT_INCOMING_CONTRACT_ENDPOINT, AWAIT_INCOMING_CONTRACTS_ENDPOINT,
45    AWAIT_PREIMAGE_ENDPOINT, CONSENSUS_BLOCK_COUNT_ENDPOINT, DECRYPTION_KEY_SHARE_ENDPOINT,
46    GATEWAYS_ENDPOINT, OUTGOING_CONTRACT_EXPIRATION_ENDPOINT, REMOVE_GATEWAY_ENDPOINT,
47};
48use fedimint_lnv2_common::{
49    ContractId, LightningCommonInit, LightningConsensusItem, LightningInput, LightningInputError,
50    LightningInputV0, LightningModuleTypes, LightningOutput, LightningOutputError,
51    LightningOutputOutcome, LightningOutputV0, MODULE_CONSENSUS_VERSION, OutgoingWitness,
52};
53use fedimint_logging::LOG_MODULE_LNV2;
54use fedimint_server_core::bitcoin_rpc::ServerBitcoinRpcMonitor;
55use fedimint_server_core::config::{PeerHandleOps, eval_poly_g1};
56use fedimint_server_core::migration::ServerModuleDbMigrationFn;
57use fedimint_server_core::{
58    ConfigGenModuleArgs, ServerModule, ServerModuleInit, ServerModuleInitArgs,
59};
60use futures::StreamExt;
61use group::Curve;
62use group::ff::Field;
63use rand::SeedableRng;
64use rand_chacha::ChaChaRng;
65use strum::IntoEnumIterator;
66use tpe::{
67    AggregatePublicKey, DecryptionKeyShare, PublicKeyShare, SecretKeyShare, derive_pk_share,
68};
69use tracing::trace;
70
71use crate::db::{
72    BlockCountVoteKey, BlockCountVotePrefix, DbKeyPrefix, DecryptionKeyShareKey,
73    DecryptionKeySharePrefix, GatewayKey, GatewayPrefix, IncomingContractIndexKey,
74    IncomingContractIndexPrefix, IncomingContractKey, IncomingContractOutpointKey,
75    IncomingContractOutpointPrefix, IncomingContractPrefix, IncomingContractStreamIndexKey,
76    IncomingContractStreamKey, IncomingContractStreamPrefix, OutgoingContractKey,
77    OutgoingContractPrefix, PreimageKey, PreimagePrefix, UnixTimeVoteKey, UnixTimeVotePrefix,
78};
79
80#[derive(Debug, Clone)]
81pub struct LightningInit;
82
83impl ModuleInit for LightningInit {
84    type Common = LightningCommonInit;
85
86    #[allow(clippy::too_many_lines)]
87    async fn dump_database(
88        &self,
89        dbtx: &mut DatabaseTransaction<'_>,
90        prefix_names: Vec<String>,
91    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
92        let mut lightning: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
93            BTreeMap::new();
94
95        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
96            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
97        });
98
99        for table in filtered_prefixes {
100            match table {
101                DbKeyPrefix::BlockCountVote => {
102                    push_db_pair_items!(
103                        dbtx,
104                        BlockCountVotePrefix,
105                        BlockCountVoteKey,
106                        u64,
107                        lightning,
108                        "Lightning Block Count Votes"
109                    );
110                }
111                DbKeyPrefix::UnixTimeVote => {
112                    push_db_pair_items!(
113                        dbtx,
114                        UnixTimeVotePrefix,
115                        UnixTimeVoteKey,
116                        u64,
117                        lightning,
118                        "Lightning Unix Time Votes"
119                    );
120                }
121                DbKeyPrefix::OutgoingContract => {
122                    push_db_pair_items!(
123                        dbtx,
124                        OutgoingContractPrefix,
125                        LightningOutgoingContractKey,
126                        OutgoingContract,
127                        lightning,
128                        "Lightning Outgoing Contracts"
129                    );
130                }
131                DbKeyPrefix::IncomingContract => {
132                    push_db_pair_items!(
133                        dbtx,
134                        IncomingContractPrefix,
135                        LightningIncomingContractKey,
136                        IncomingContract,
137                        lightning,
138                        "Lightning Incoming Contracts"
139                    );
140                }
141                DbKeyPrefix::IncomingContractOutpoint => {
142                    push_db_pair_items!(
143                        dbtx,
144                        IncomingContractOutpointPrefix,
145                        LightningIncomingContractOutpointKey,
146                        OutPoint,
147                        lightning,
148                        "Lightning Incoming Contracts Outpoints"
149                    );
150                }
151                DbKeyPrefix::DecryptionKeyShare => {
152                    push_db_pair_items!(
153                        dbtx,
154                        DecryptionKeySharePrefix,
155                        DecryptionKeyShareKey,
156                        DecryptionKeyShare,
157                        lightning,
158                        "Lightning Decryption Key Share"
159                    );
160                }
161                DbKeyPrefix::Preimage => {
162                    push_db_pair_items!(
163                        dbtx,
164                        PreimagePrefix,
165                        LightningPreimageKey,
166                        [u8; 32],
167                        lightning,
168                        "Lightning Preimages"
169                    );
170                }
171                DbKeyPrefix::Gateway => {
172                    push_db_pair_items!(
173                        dbtx,
174                        GatewayPrefix,
175                        GatewayKey,
176                        (),
177                        lightning,
178                        "Lightning Gateways"
179                    );
180                }
181                DbKeyPrefix::IncomingContractStreamIndex => {
182                    push_db_pair_items!(
183                        dbtx,
184                        IncomingContractStreamIndexKey,
185                        IncomingContractStreamIndexKey,
186                        u64,
187                        lightning,
188                        "Lightning Incoming Contract Stream Index"
189                    );
190                }
191                DbKeyPrefix::IncomingContractStream => {
192                    push_db_pair_items!(
193                        dbtx,
194                        IncomingContractStreamPrefix(0),
195                        IncomingContractStreamKey,
196                        IncomingContract,
197                        lightning,
198                        "Lightning Incoming Contract Stream"
199                    );
200                }
201                DbKeyPrefix::IncomingContractIndex => {
202                    push_db_pair_items!(
203                        dbtx,
204                        IncomingContractIndexPrefix,
205                        IncomingContractIndexKey,
206                        u64,
207                        lightning,
208                        "Lightning Incoming Contract Index"
209                    );
210                }
211            }
212        }
213
214        Box::new(lightning.into_iter())
215    }
216}
217
218#[apply(async_trait_maybe_send!)]
219impl ServerModuleInit for LightningInit {
220    type Module = Lightning;
221
222    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
223        &[MODULE_CONSENSUS_VERSION]
224    }
225
226    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
227        SupportedModuleApiVersions::from_raw(
228            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
229            (
230                MODULE_CONSENSUS_VERSION.major,
231                MODULE_CONSENSUS_VERSION.minor,
232            ),
233            &[(0, 0)],
234        )
235    }
236
237    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
238        Ok(Lightning {
239            cfg: args.cfg().to_typed()?,
240            db: args.db().clone(),
241            server_bitcoin_rpc_monitor: args.server_bitcoin_rpc_monitor(),
242        })
243    }
244
245    fn trusted_dealer_gen(
246        &self,
247        peers: &[PeerId],
248        args: &ConfigGenModuleArgs,
249    ) -> BTreeMap<PeerId, ServerModuleConfig> {
250        let tpe_pks = peers
251            .iter()
252            .map(|peer| (*peer, dealer_pk(peers.to_num_peers(), *peer)))
253            .collect::<BTreeMap<PeerId, PublicKeyShare>>();
254
255        peers
256            .iter()
257            .map(|peer| {
258                let cfg = LightningConfig {
259                    consensus: LightningConfigConsensus {
260                        tpe_agg_pk: dealer_agg_pk(),
261                        tpe_pks: tpe_pks.clone(),
262                        fee_consensus: if args.disable_base_fees {
263                            FeeConsensus::zero()
264                        } else {
265                            FeeConsensus::new(0).expect("Relative fee is within range")
266                        },
267                        network: args.network,
268                    },
269                    private: LightningConfigPrivate {
270                        sk: dealer_sk(peers.to_num_peers(), *peer),
271                    },
272                };
273
274                (*peer, cfg.to_erased())
275            })
276            .collect()
277    }
278
279    async fn distributed_gen(
280        &self,
281        peers: &(dyn PeerHandleOps + Send + Sync),
282        args: &ConfigGenModuleArgs,
283    ) -> anyhow::Result<ServerModuleConfig> {
284        let (polynomial, sks) = peers.run_dkg_g1().await?;
285
286        let server = LightningConfig {
287            consensus: LightningConfigConsensus {
288                tpe_agg_pk: tpe::AggregatePublicKey(polynomial[0].to_affine()),
289                tpe_pks: peers
290                    .num_peers()
291                    .peer_ids()
292                    .map(|peer| (peer, PublicKeyShare(eval_poly_g1(&polynomial, &peer))))
293                    .collect(),
294                fee_consensus: if args.disable_base_fees {
295                    FeeConsensus::zero()
296                } else {
297                    FeeConsensus::new(0).expect("Relative fee is within range")
298                },
299                network: args.network,
300            },
301            private: LightningConfigPrivate {
302                sk: SecretKeyShare(sks),
303            },
304        };
305
306        Ok(server.to_erased())
307    }
308
309    fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
310        let config = config.to_typed::<LightningConfig>()?;
311
312        ensure!(
313            tpe::derive_pk_share(&config.private.sk)
314                == *config
315                    .consensus
316                    .tpe_pks
317                    .get(identity)
318                    .context("Public key set has no key for our identity")?,
319            "Preimge encryption secret key share does not match our public key share"
320        );
321
322        Ok(())
323    }
324
325    fn get_client_config(
326        &self,
327        config: &ServerModuleConsensusConfig,
328    ) -> anyhow::Result<LightningClientConfig> {
329        let config = LightningConfigConsensus::from_erased(config)?;
330        Ok(LightningClientConfig {
331            tpe_agg_pk: config.tpe_agg_pk,
332            tpe_pks: config.tpe_pks,
333            fee_consensus: config.fee_consensus,
334            network: config.network,
335        })
336    }
337
338    fn get_database_migrations(
339        &self,
340    ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Lightning>> {
341        let mut migrations: BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Lightning>> =
342            BTreeMap::new();
343
344        migrations.insert(
345            DatabaseVersion(0),
346            Box::new(move |ctx| Box::pin(crate::db::migrate_to_v1(ctx))),
347        );
348
349        migrations
350    }
351
352    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
353        Some(DbKeyPrefix::iter().map(|p| p as u8).collect())
354    }
355}
356
357fn dealer_agg_pk() -> AggregatePublicKey {
358    AggregatePublicKey((G1Projective::generator() * coefficient(0)).to_affine())
359}
360
361fn dealer_pk(num_peers: NumPeers, peer: PeerId) -> PublicKeyShare {
362    derive_pk_share(&dealer_sk(num_peers, peer))
363}
364
365fn dealer_sk(num_peers: NumPeers, peer: PeerId) -> SecretKeyShare {
366    let x = Scalar::from(peer.to_usize() as u64 + 1);
367
368    // We evaluate the scalar polynomial of degree threshold - 1 at the point x
369    // using the Horner schema.
370
371    let y = (0..num_peers.threshold())
372        .map(|index| coefficient(index as u64))
373        .rev()
374        .reduce(|accumulator, c| accumulator * x + c)
375        .expect("We have at least one coefficient");
376
377    SecretKeyShare(y)
378}
379
380fn coefficient(index: u64) -> Scalar {
381    Scalar::random(&mut ChaChaRng::from_seed(
382        *index.consensus_hash::<sha256::Hash>().as_byte_array(),
383    ))
384}
385
386#[derive(Debug)]
387pub struct Lightning {
388    cfg: LightningConfig,
389    db: Database,
390    server_bitcoin_rpc_monitor: ServerBitcoinRpcMonitor,
391}
392
393#[apply(async_trait_maybe_send!)]
394impl ServerModule for Lightning {
395    type Common = LightningModuleTypes;
396    type Init = LightningInit;
397
398    async fn consensus_proposal(
399        &self,
400        _dbtx: &mut DatabaseTransaction<'_>,
401    ) -> Vec<LightningConsensusItem> {
402        // We reduce the time granularity to deduplicate votes more often and not save
403        // one consensus item every second.
404        let mut items = vec![LightningConsensusItem::UnixTimeVote(
405            60 * (duration_since_epoch().as_secs() / 60),
406        )];
407
408        if let Ok(block_count) = self.get_block_count() {
409            trace!(target: LOG_MODULE_LNV2, ?block_count, "Proposing block count");
410            items.push(LightningConsensusItem::BlockCountVote(block_count));
411        }
412
413        items
414    }
415
416    async fn process_consensus_item<'a, 'b>(
417        &'a self,
418        dbtx: &mut DatabaseTransaction<'b>,
419        consensus_item: LightningConsensusItem,
420        peer: PeerId,
421    ) -> anyhow::Result<()> {
422        trace!(target: LOG_MODULE_LNV2, ?consensus_item, "Processing consensus item proposal");
423
424        match consensus_item {
425            LightningConsensusItem::BlockCountVote(vote) => {
426                let current_vote = dbtx
427                    .insert_entry(&BlockCountVoteKey(peer), &vote)
428                    .await
429                    .unwrap_or(0);
430
431                ensure!(current_vote < vote, "Block count vote is redundant");
432
433                Ok(())
434            }
435            LightningConsensusItem::UnixTimeVote(vote) => {
436                let current_vote = dbtx
437                    .insert_entry(&UnixTimeVoteKey(peer), &vote)
438                    .await
439                    .unwrap_or(0);
440
441                ensure!(current_vote < vote, "Unix time vote is redundant");
442
443                Ok(())
444            }
445            LightningConsensusItem::Default { variant, .. } => Err(anyhow!(
446                "Received lnv2 consensus item with unknown variant {variant}"
447            )),
448        }
449    }
450
451    async fn process_input<'a, 'b, 'c>(
452        &'a self,
453        dbtx: &mut DatabaseTransaction<'c>,
454        input: &'b LightningInput,
455        _in_point: InPoint,
456    ) -> Result<InputMeta, LightningInputError> {
457        let (pub_key, amount) = match input.ensure_v0_ref()? {
458            LightningInputV0::Outgoing(outpoint, outgoing_witness) => {
459                let contract = dbtx
460                    .remove_entry(&OutgoingContractKey(*outpoint))
461                    .await
462                    .ok_or(LightningInputError::UnknownContract)?;
463
464                let pub_key = match outgoing_witness {
465                    OutgoingWitness::Claim(preimage) => {
466                        if contract.expiration <= self.consensus_block_count(dbtx).await {
467                            return Err(LightningInputError::Expired);
468                        }
469
470                        if !contract.verify_preimage(preimage) {
471                            return Err(LightningInputError::InvalidPreimage);
472                        }
473
474                        dbtx.insert_entry(&PreimageKey(*outpoint), preimage).await;
475
476                        contract.claim_pk
477                    }
478                    OutgoingWitness::Refund => {
479                        if contract.expiration > self.consensus_block_count(dbtx).await {
480                            return Err(LightningInputError::NotExpired);
481                        }
482
483                        contract.refund_pk
484                    }
485                    OutgoingWitness::Cancel(forfeit_signature) => {
486                        if !contract.verify_forfeit_signature(forfeit_signature) {
487                            return Err(LightningInputError::InvalidForfeitSignature);
488                        }
489
490                        contract.refund_pk
491                    }
492                };
493
494                (pub_key, contract.amount)
495            }
496            LightningInputV0::Incoming(outpoint, agg_decryption_key) => {
497                let contract = dbtx
498                    .remove_entry(&IncomingContractKey(*outpoint))
499                    .await
500                    .ok_or(LightningInputError::UnknownContract)?;
501
502                let index = dbtx
503                    .remove_entry(&IncomingContractIndexKey(*outpoint))
504                    .await
505                    .expect("Incoming contract index should exist");
506
507                dbtx.remove_entry(&IncomingContractStreamKey(index)).await;
508
509                if !contract
510                    .verify_agg_decryption_key(&self.cfg.consensus.tpe_agg_pk, agg_decryption_key)
511                {
512                    return Err(LightningInputError::InvalidDecryptionKey);
513                }
514
515                let pub_key = match contract.decrypt_preimage(agg_decryption_key) {
516                    Some(..) => contract.commitment.claim_pk,
517                    None => contract.commitment.refund_pk,
518                };
519
520                (pub_key, contract.commitment.amount)
521            }
522        };
523
524        Ok(InputMeta {
525            amount: TransactionItemAmounts {
526                amounts: Amounts::new_bitcoin(amount),
527                fees: Amounts::new_bitcoin(self.cfg.consensus.fee_consensus.fee(amount)),
528            },
529            pub_key,
530        })
531    }
532
533    async fn process_output<'a, 'b>(
534        &'a self,
535        dbtx: &mut DatabaseTransaction<'b>,
536        output: &'a LightningOutput,
537        outpoint: OutPoint,
538    ) -> Result<TransactionItemAmounts, LightningOutputError> {
539        let amount = match output.ensure_v0_ref()? {
540            LightningOutputV0::Outgoing(contract) => {
541                dbtx.insert_new_entry(&OutgoingContractKey(outpoint), contract)
542                    .await;
543
544                contract.amount
545            }
546            LightningOutputV0::Incoming(contract) => {
547                if !contract.verify() {
548                    return Err(LightningOutputError::InvalidContract);
549                }
550
551                if contract.commitment.expiration <= self.consensus_unix_time(dbtx).await {
552                    return Err(LightningOutputError::ContractExpired);
553                }
554
555                dbtx.insert_new_entry(&IncomingContractKey(outpoint), contract)
556                    .await;
557
558                dbtx.insert_entry(
559                    &IncomingContractOutpointKey(contract.contract_id()),
560                    &outpoint,
561                )
562                .await;
563
564                let stream_index = dbtx
565                    .get_value(&IncomingContractStreamIndexKey)
566                    .await
567                    .unwrap_or(0);
568
569                dbtx.insert_entry(&IncomingContractStreamKey(stream_index), contract)
570                    .await;
571
572                dbtx.insert_entry(&IncomingContractIndexKey(outpoint), &stream_index)
573                    .await;
574
575                dbtx.insert_entry(&IncomingContractStreamIndexKey, &(stream_index + 1))
576                    .await;
577
578                let dk_share = contract.create_decryption_key_share(&self.cfg.private.sk);
579
580                dbtx.insert_entry(&DecryptionKeyShareKey(outpoint), &dk_share)
581                    .await;
582
583                contract.commitment.amount
584            }
585        };
586
587        Ok(TransactionItemAmounts {
588            amounts: Amounts::new_bitcoin(amount),
589            fees: Amounts::new_bitcoin(self.cfg.consensus.fee_consensus.fee(amount)),
590        })
591    }
592
593    async fn output_status(
594        &self,
595        _dbtx: &mut DatabaseTransaction<'_>,
596        _out_point: OutPoint,
597    ) -> Option<LightningOutputOutcome> {
598        None
599    }
600
601    async fn audit(
602        &self,
603        dbtx: &mut DatabaseTransaction<'_>,
604        audit: &mut Audit,
605        module_instance_id: ModuleInstanceId,
606    ) {
607        // Both incoming and outgoing contracts represent liabilities to the federation
608        // since they are obligations to issue notes.
609        audit
610            .add_items(
611                dbtx,
612                module_instance_id,
613                &OutgoingContractPrefix,
614                |_, contract| -(contract.amount.msats as i64),
615            )
616            .await;
617
618        audit
619            .add_items(
620                dbtx,
621                module_instance_id,
622                &IncomingContractPrefix,
623                |_, contract| -(contract.commitment.amount.msats as i64),
624            )
625            .await;
626    }
627
628    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
629        vec![
630            api_endpoint! {
631                CONSENSUS_BLOCK_COUNT_ENDPOINT,
632                ApiVersion::new(0, 0),
633                async |module: &Lightning, context, _params : () | -> u64 {
634                    let db = context.db();
635                    let mut dbtx = db.begin_transaction_nc().await;
636
637                    Ok(module.consensus_block_count(&mut dbtx).await)
638                }
639            },
640            api_endpoint! {
641                AWAIT_INCOMING_CONTRACT_ENDPOINT,
642                ApiVersion::new(0, 0),
643                async |module: &Lightning, context, params: (ContractId, u64) | -> Option<OutPoint> {
644                    let db = context.db();
645
646                    Ok(module.await_incoming_contract(db, params.0, params.1).await)
647                }
648            },
649            api_endpoint! {
650                AWAIT_PREIMAGE_ENDPOINT,
651                ApiVersion::new(0, 0),
652                async |module: &Lightning, context, params: (OutPoint, u64)| -> Option<[u8; 32]> {
653                    let db = context.db();
654
655                    Ok(module.await_preimage(db, params.0, params.1).await)
656                }
657            },
658            api_endpoint! {
659                DECRYPTION_KEY_SHARE_ENDPOINT,
660                ApiVersion::new(0, 0),
661                async |_module: &Lightning, context, params: OutPoint| -> DecryptionKeyShare {
662                    let share = context
663                        .db()
664                        .begin_transaction_nc()
665                        .await
666                        .get_value(&DecryptionKeyShareKey(params))
667                        .await
668                        .ok_or(ApiError::bad_request("No decryption key share found".to_string()))?;
669
670                    Ok(share)
671                }
672            },
673            api_endpoint! {
674                OUTGOING_CONTRACT_EXPIRATION_ENDPOINT,
675                ApiVersion::new(0, 0),
676                async |module: &Lightning, context, outpoint: OutPoint| -> Option<(ContractId, u64)> {
677                    let db = context.db();
678
679                    Ok(module.outgoing_contract_expiration(db, outpoint).await)
680                }
681            },
682            api_endpoint! {
683                AWAIT_INCOMING_CONTRACTS_ENDPOINT,
684                ApiVersion::new(0, 0),
685                async |module: &Lightning, context, params: (u64, usize)| -> (Vec<IncomingContract>, u64) {
686                    let db = context.db();
687
688                    if params.1 == 0 {
689                        return Err(ApiError::bad_request("Batch size must be greater than 0".to_string()));
690                    }
691
692                    Ok(module.await_incoming_contracts(db, params.0, params.1).await)
693                }
694            },
695            api_endpoint! {
696                ADD_GATEWAY_ENDPOINT,
697                ApiVersion::new(0, 0),
698                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
699                    check_auth(context)?;
700
701                    let db = context.db();
702
703                    Ok(Lightning::add_gateway(db, gateway).await)
704                }
705            },
706            api_endpoint! {
707                REMOVE_GATEWAY_ENDPOINT,
708                ApiVersion::new(0, 0),
709                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
710                    check_auth(context)?;
711
712                    let db = context.db();
713
714                    Ok(Lightning::remove_gateway(db, gateway).await)
715                }
716            },
717            api_endpoint! {
718                GATEWAYS_ENDPOINT,
719                ApiVersion::new(0, 0),
720                async |_module: &Lightning, context, _params : () | -> Vec<SafeUrl> {
721                    let db = context.db();
722
723                    Ok(Lightning::gateways(db).await)
724                }
725            },
726        ]
727    }
728}
729
730impl Lightning {
731    fn get_block_count(&self) -> anyhow::Result<u64> {
732        self.server_bitcoin_rpc_monitor
733            .status()
734            .map(|status| status.block_count)
735            .context("Block count not available yet")
736    }
737
738    async fn consensus_block_count(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
739        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
740
741        let mut counts = dbtx
742            .find_by_prefix(&BlockCountVotePrefix)
743            .await
744            .map(|entry| entry.1)
745            .collect::<Vec<u64>>()
746            .await;
747
748        counts.sort_unstable();
749
750        counts.reverse();
751
752        assert!(counts.last() <= counts.first());
753
754        // The block count we select guarantees that any threshold of correct peers can
755        // increase the consensus block count and any consensus block count has been
756        // confirmed by a threshold of peers.
757
758        counts.get(num_peers.threshold() - 1).copied().unwrap_or(0)
759    }
760
761    async fn consensus_unix_time(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
762        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
763
764        let mut times = dbtx
765            .find_by_prefix(&UnixTimeVotePrefix)
766            .await
767            .map(|entry| entry.1)
768            .collect::<Vec<u64>>()
769            .await;
770
771        times.sort_unstable();
772
773        times.reverse();
774
775        assert!(times.last() <= times.first());
776
777        // The unix time we select guarantees that any threshold of correct peers can
778        // advance the consensus unix time and any consensus unix time has been
779        // confirmed by a threshold of peers.
780
781        times.get(num_peers.threshold() - 1).copied().unwrap_or(0)
782    }
783
784    async fn await_incoming_contract(
785        &self,
786        db: Database,
787        contract_id: ContractId,
788        expiration: u64,
789    ) -> Option<OutPoint> {
790        loop {
791            timeout(
792                Duration::from_secs(10),
793                db.wait_key_exists(&IncomingContractOutpointKey(contract_id)),
794            )
795            .await
796            .ok();
797
798            // to avoid race conditions we have to check for the contract and
799            // its expiration in the same database transaction
800            let mut dbtx = db.begin_transaction_nc().await;
801
802            if let Some(outpoint) = dbtx
803                .get_value(&IncomingContractOutpointKey(contract_id))
804                .await
805            {
806                return Some(outpoint);
807            }
808
809            if expiration <= self.consensus_unix_time(&mut dbtx).await {
810                return None;
811            }
812        }
813    }
814
815    async fn await_preimage(
816        &self,
817        db: Database,
818        outpoint: OutPoint,
819        expiration: u64,
820    ) -> Option<[u8; 32]> {
821        loop {
822            timeout(
823                Duration::from_secs(10),
824                db.wait_key_exists(&PreimageKey(outpoint)),
825            )
826            .await
827            .ok();
828
829            // to avoid race conditions we have to check for the preimage and
830            // the contracts expiration in the same database transaction
831            let mut dbtx = db.begin_transaction_nc().await;
832
833            if let Some(preimage) = dbtx.get_value(&PreimageKey(outpoint)).await {
834                return Some(preimage);
835            }
836
837            if expiration <= self.consensus_block_count(&mut dbtx).await {
838                return None;
839            }
840        }
841    }
842
843    async fn outgoing_contract_expiration(
844        &self,
845        db: Database,
846        outpoint: OutPoint,
847    ) -> Option<(ContractId, u64)> {
848        let mut dbtx = db.begin_transaction_nc().await;
849
850        let contract = dbtx.get_value(&OutgoingContractKey(outpoint)).await?;
851
852        let consensus_block_count = self.consensus_block_count(&mut dbtx).await;
853
854        let expiration = contract.expiration.saturating_sub(consensus_block_count);
855
856        Some((contract.contract_id(), expiration))
857    }
858
859    async fn await_incoming_contracts(
860        &self,
861        db: Database,
862        start: u64,
863        n: usize,
864    ) -> (Vec<IncomingContract>, u64) {
865        let filter = |next_index: Option<u64>| next_index.filter(|i| *i > start);
866
867        let (mut next_index, mut dbtx) = db
868            .wait_key_check(&IncomingContractStreamIndexKey, filter)
869            .await;
870
871        let mut contracts = Vec::with_capacity(n);
872
873        let range = IncomingContractStreamKey(start)..IncomingContractStreamKey(u64::MAX);
874
875        for (key, contract) in dbtx
876            .find_by_range(range)
877            .await
878            .take(n)
879            .collect::<Vec<(IncomingContractStreamKey, IncomingContract)>>()
880            .await
881        {
882            contracts.push(contract.clone());
883            next_index = key.0 + 1;
884        }
885
886        (contracts, next_index)
887    }
888
889    async fn add_gateway(db: Database, gateway: SafeUrl) -> bool {
890        let mut dbtx = db.begin_transaction().await;
891
892        let is_new_entry = dbtx.insert_entry(&GatewayKey(gateway), &()).await.is_none();
893
894        dbtx.commit_tx().await;
895
896        is_new_entry
897    }
898
899    async fn remove_gateway(db: Database, gateway: SafeUrl) -> bool {
900        let mut dbtx = db.begin_transaction().await;
901
902        let entry_existed = dbtx.remove_entry(&GatewayKey(gateway)).await.is_some();
903
904        dbtx.commit_tx().await;
905
906        entry_existed
907    }
908
909    async fn gateways(db: Database) -> Vec<SafeUrl> {
910        db.begin_transaction_nc()
911            .await
912            .find_by_prefix(&GatewayPrefix)
913            .await
914            .map(|entry| entry.0.0)
915            .collect()
916            .await
917    }
918
919    pub async fn consensus_block_count_ui(&self) -> u64 {
920        self.consensus_block_count(&mut self.db.begin_transaction_nc().await)
921            .await
922    }
923
924    pub async fn consensus_unix_time_ui(&self) -> u64 {
925        self.consensus_unix_time(&mut self.db.begin_transaction_nc().await)
926            .await
927    }
928
929    pub async fn add_gateway_ui(&self, gateway: SafeUrl) -> bool {
930        Self::add_gateway(self.db.clone(), gateway).await
931    }
932
933    pub async fn remove_gateway_ui(&self, gateway: SafeUrl) -> bool {
934        Self::remove_gateway(self.db.clone(), gateway).await
935    }
936
937    pub async fn gateways_ui(&self) -> Vec<SafeUrl> {
938        Self::gateways(self.db.clone()).await
939    }
940}