Skip to main content

fedimint_lnv2_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_wrap)]
3#![allow(clippy::module_name_repetitions)]
4
5pub use fedimint_lnv2_common as common;
6
7mod db;
8
9use std::collections::{BTreeMap, BTreeSet};
10use std::time::Duration;
11
12use anyhow::{Context, anyhow, ensure};
13use bls12_381::{G1Projective, Scalar};
14use fedimint_core::bitcoin::hashes::sha256;
15use fedimint_core::config::{
16    ServerModuleConfig, ServerModuleConsensusConfig, TypedServerModuleConfig,
17    TypedServerModuleConsensusConfig,
18};
19use fedimint_core::core::ModuleInstanceId;
20use fedimint_core::db::{
21    Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
22};
23use fedimint_core::encoding::Encodable;
24use fedimint_core::envs::{FM_ENABLE_MODULE_LNV2_ENV, is_env_var_set_opt};
25use fedimint_core::module::audit::Audit;
26use fedimint_core::module::{
27    Amounts, ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion,
28    InputMeta, ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions,
29    TransactionItemAmounts, api_endpoint,
30};
31use fedimint_core::net::auth::check_auth;
32use fedimint_core::task::timeout;
33use fedimint_core::time::duration_since_epoch;
34use fedimint_core::util::SafeUrl;
35use fedimint_core::{
36    BitcoinHash, InPoint, NumPeers, NumPeersExt, OutPoint, PeerId, apply, async_trait_maybe_send,
37    push_db_pair_items,
38};
39use fedimint_lnv2_common::config::{
40    FeeConsensus, LightningClientConfig, LightningConfig, LightningConfigConsensus,
41    LightningConfigPrivate,
42};
43use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract};
44use fedimint_lnv2_common::endpoint_constants::{
45    ADD_GATEWAY_ENDPOINT, AWAIT_INCOMING_CONTRACT_ENDPOINT, AWAIT_INCOMING_CONTRACTS_ENDPOINT,
46    AWAIT_PREIMAGE_ENDPOINT, CONSENSUS_BLOCK_COUNT_ENDPOINT, DECRYPTION_KEY_SHARE_ENDPOINT,
47    GATEWAYS_ENDPOINT, OUTGOING_CONTRACT_EXPIRATION_ENDPOINT, REMOVE_GATEWAY_ENDPOINT,
48};
49use fedimint_lnv2_common::{
50    ContractId, LightningCommonInit, LightningConsensusItem, LightningInput, LightningInputError,
51    LightningInputV0, LightningModuleTypes, LightningOutput, LightningOutputError,
52    LightningOutputOutcome, LightningOutputV0, MODULE_CONSENSUS_VERSION, OutgoingWitness,
53};
54use fedimint_logging::LOG_MODULE_LNV2;
55use fedimint_server_core::bitcoin_rpc::ServerBitcoinRpcMonitor;
56use fedimint_server_core::config::{PeerHandleOps, eval_poly_g1};
57use fedimint_server_core::migration::ServerModuleDbMigrationFn;
58use fedimint_server_core::{
59    ConfigGenModuleArgs, EnvVarDoc, ServerModule, ServerModuleInit, ServerModuleInitArgs,
60};
61use futures::StreamExt;
62use group::Curve;
63use group::ff::Field;
64use rand::SeedableRng;
65use rand_chacha::ChaChaRng;
66use strum::IntoEnumIterator;
67use tpe::{
68    AggregatePublicKey, DecryptionKeyShare, PublicKeyShare, SecretKeyShare, derive_pk_share,
69};
70use tracing::trace;
71
72use crate::db::{
73    BlockCountVoteKey, BlockCountVotePrefix, DbKeyPrefix, DecryptionKeyShareKey,
74    DecryptionKeySharePrefix, GatewayKey, GatewayPrefix, IncomingContractIndexKey,
75    IncomingContractIndexPrefix, IncomingContractKey, IncomingContractOutpointKey,
76    IncomingContractOutpointPrefix, IncomingContractPrefix, IncomingContractStreamIndexKey,
77    IncomingContractStreamKey, IncomingContractStreamPrefix, OutgoingContractKey,
78    OutgoingContractPrefix, PreimageKey, PreimagePrefix, UnixTimeVoteKey, UnixTimeVotePrefix,
79};
80
81#[derive(Debug, Clone)]
82pub struct LightningInit;
83
84impl ModuleInit for LightningInit {
85    type Common = LightningCommonInit;
86
87    #[allow(clippy::too_many_lines)]
88    async fn dump_database(
89        &self,
90        dbtx: &mut DatabaseTransaction<'_>,
91        prefix_names: Vec<String>,
92    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
93        let mut lightning: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
94            BTreeMap::new();
95
96        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
97            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
98        });
99
100        for table in filtered_prefixes {
101            match table {
102                DbKeyPrefix::BlockCountVote => {
103                    push_db_pair_items!(
104                        dbtx,
105                        BlockCountVotePrefix,
106                        BlockCountVoteKey,
107                        u64,
108                        lightning,
109                        "Lightning Block Count Votes"
110                    );
111                }
112                DbKeyPrefix::UnixTimeVote => {
113                    push_db_pair_items!(
114                        dbtx,
115                        UnixTimeVotePrefix,
116                        UnixTimeVoteKey,
117                        u64,
118                        lightning,
119                        "Lightning Unix Time Votes"
120                    );
121                }
122                DbKeyPrefix::OutgoingContract => {
123                    push_db_pair_items!(
124                        dbtx,
125                        OutgoingContractPrefix,
126                        LightningOutgoingContractKey,
127                        OutgoingContract,
128                        lightning,
129                        "Lightning Outgoing Contracts"
130                    );
131                }
132                DbKeyPrefix::IncomingContract => {
133                    push_db_pair_items!(
134                        dbtx,
135                        IncomingContractPrefix,
136                        LightningIncomingContractKey,
137                        IncomingContract,
138                        lightning,
139                        "Lightning Incoming Contracts"
140                    );
141                }
142                DbKeyPrefix::IncomingContractOutpoint => {
143                    push_db_pair_items!(
144                        dbtx,
145                        IncomingContractOutpointPrefix,
146                        LightningIncomingContractOutpointKey,
147                        OutPoint,
148                        lightning,
149                        "Lightning Incoming Contracts Outpoints"
150                    );
151                }
152                DbKeyPrefix::DecryptionKeyShare => {
153                    push_db_pair_items!(
154                        dbtx,
155                        DecryptionKeySharePrefix,
156                        DecryptionKeyShareKey,
157                        DecryptionKeyShare,
158                        lightning,
159                        "Lightning Decryption Key Share"
160                    );
161                }
162                DbKeyPrefix::Preimage => {
163                    push_db_pair_items!(
164                        dbtx,
165                        PreimagePrefix,
166                        LightningPreimageKey,
167                        [u8; 32],
168                        lightning,
169                        "Lightning Preimages"
170                    );
171                }
172                DbKeyPrefix::Gateway => {
173                    push_db_pair_items!(
174                        dbtx,
175                        GatewayPrefix,
176                        GatewayKey,
177                        (),
178                        lightning,
179                        "Lightning Gateways"
180                    );
181                }
182                DbKeyPrefix::IncomingContractStreamIndex => {
183                    push_db_pair_items!(
184                        dbtx,
185                        IncomingContractStreamIndexKey,
186                        IncomingContractStreamIndexKey,
187                        u64,
188                        lightning,
189                        "Lightning Incoming Contract Stream Index"
190                    );
191                }
192                DbKeyPrefix::IncomingContractStream => {
193                    push_db_pair_items!(
194                        dbtx,
195                        IncomingContractStreamPrefix(0),
196                        IncomingContractStreamKey,
197                        IncomingContract,
198                        lightning,
199                        "Lightning Incoming Contract Stream"
200                    );
201                }
202                DbKeyPrefix::IncomingContractIndex => {
203                    push_db_pair_items!(
204                        dbtx,
205                        IncomingContractIndexPrefix,
206                        IncomingContractIndexKey,
207                        u64,
208                        lightning,
209                        "Lightning Incoming Contract Index"
210                    );
211                }
212            }
213        }
214
215        Box::new(lightning.into_iter())
216    }
217}
218
219#[apply(async_trait_maybe_send!)]
220impl ServerModuleInit for LightningInit {
221    type Module = Lightning;
222
223    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
224        &[MODULE_CONSENSUS_VERSION]
225    }
226
227    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
228        SupportedModuleApiVersions::from_raw(
229            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
230            (
231                MODULE_CONSENSUS_VERSION.major,
232                MODULE_CONSENSUS_VERSION.minor,
233            ),
234            &[(0, 0)],
235        )
236    }
237
238    fn is_enabled_by_default(&self) -> bool {
239        is_env_var_set_opt(FM_ENABLE_MODULE_LNV2_ENV).unwrap_or(true)
240    }
241
242    fn get_documented_env_vars(&self) -> Vec<EnvVarDoc> {
243        vec![EnvVarDoc {
244            name: FM_ENABLE_MODULE_LNV2_ENV,
245            description: "Set to 0/false to disable the LNv2 Lightning module. Enabled by default.",
246        }]
247    }
248
249    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
250        Ok(Lightning {
251            cfg: args.cfg().to_typed()?,
252            db: args.db().clone(),
253            server_bitcoin_rpc_monitor: args.server_bitcoin_rpc_monitor(),
254        })
255    }
256
257    fn trusted_dealer_gen(
258        &self,
259        peers: &[PeerId],
260        args: &ConfigGenModuleArgs,
261    ) -> BTreeMap<PeerId, ServerModuleConfig> {
262        let tpe_pks = peers
263            .iter()
264            .map(|peer| (*peer, dealer_pk(peers.to_num_peers(), *peer)))
265            .collect::<BTreeMap<PeerId, PublicKeyShare>>();
266
267        peers
268            .iter()
269            .map(|peer| {
270                let cfg = LightningConfig {
271                    consensus: LightningConfigConsensus {
272                        tpe_agg_pk: dealer_agg_pk(),
273                        tpe_pks: tpe_pks.clone(),
274                        fee_consensus: if args.disable_base_fees {
275                            FeeConsensus::zero()
276                        } else {
277                            FeeConsensus::new(0).expect("Relative fee is within range")
278                        },
279                        network: args.network,
280                    },
281                    private: LightningConfigPrivate {
282                        sk: dealer_sk(peers.to_num_peers(), *peer),
283                    },
284                };
285
286                (*peer, cfg.to_erased())
287            })
288            .collect()
289    }
290
291    async fn distributed_gen(
292        &self,
293        peers: &(dyn PeerHandleOps + Send + Sync),
294        args: &ConfigGenModuleArgs,
295    ) -> anyhow::Result<ServerModuleConfig> {
296        let (polynomial, sks) = peers.run_dkg_g1().await?;
297
298        let server = LightningConfig {
299            consensus: LightningConfigConsensus {
300                tpe_agg_pk: tpe::AggregatePublicKey(polynomial[0].to_affine()),
301                tpe_pks: peers
302                    .num_peers()
303                    .peer_ids()
304                    .map(|peer| (peer, PublicKeyShare(eval_poly_g1(&polynomial, &peer))))
305                    .collect(),
306                fee_consensus: if args.disable_base_fees {
307                    FeeConsensus::zero()
308                } else {
309                    FeeConsensus::new(0).expect("Relative fee is within range")
310                },
311                network: args.network,
312            },
313            private: LightningConfigPrivate {
314                sk: SecretKeyShare(sks),
315            },
316        };
317
318        Ok(server.to_erased())
319    }
320
321    fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
322        let config = config.to_typed::<LightningConfig>()?;
323
324        ensure!(
325            tpe::derive_pk_share(&config.private.sk)
326                == *config
327                    .consensus
328                    .tpe_pks
329                    .get(identity)
330                    .context("Public key set has no key for our identity")?,
331            "Preimge encryption secret key share does not match our public key share"
332        );
333
334        Ok(())
335    }
336
337    fn get_client_config(
338        &self,
339        config: &ServerModuleConsensusConfig,
340    ) -> anyhow::Result<LightningClientConfig> {
341        let config = LightningConfigConsensus::from_erased(config)?;
342        Ok(LightningClientConfig {
343            tpe_agg_pk: config.tpe_agg_pk,
344            tpe_pks: config.tpe_pks,
345            fee_consensus: config.fee_consensus,
346            network: config.network,
347        })
348    }
349
350    fn get_database_migrations(
351        &self,
352    ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Lightning>> {
353        let mut migrations: BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Lightning>> =
354            BTreeMap::new();
355
356        migrations.insert(
357            DatabaseVersion(0),
358            Box::new(move |ctx| Box::pin(crate::db::migrate_to_v1(ctx))),
359        );
360
361        migrations
362    }
363
364    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
365        Some(DbKeyPrefix::iter().map(|p| p as u8).collect())
366    }
367}
368
369fn dealer_agg_pk() -> AggregatePublicKey {
370    AggregatePublicKey((G1Projective::generator() * coefficient(0)).to_affine())
371}
372
373fn dealer_pk(num_peers: NumPeers, peer: PeerId) -> PublicKeyShare {
374    derive_pk_share(&dealer_sk(num_peers, peer))
375}
376
377fn dealer_sk(num_peers: NumPeers, peer: PeerId) -> SecretKeyShare {
378    let x = Scalar::from(peer.to_usize() as u64 + 1);
379
380    // We evaluate the scalar polynomial of degree threshold - 1 at the point x
381    // using the Horner schema.
382
383    let y = (0..num_peers.threshold())
384        .map(|index| coefficient(index as u64))
385        .rev()
386        .reduce(|accumulator, c| accumulator * x + c)
387        .expect("We have at least one coefficient");
388
389    SecretKeyShare(y)
390}
391
392fn coefficient(index: u64) -> Scalar {
393    Scalar::random(&mut ChaChaRng::from_seed(
394        *index.consensus_hash::<sha256::Hash>().as_byte_array(),
395    ))
396}
397
398#[derive(Debug)]
399pub struct Lightning {
400    cfg: LightningConfig,
401    db: Database,
402    server_bitcoin_rpc_monitor: ServerBitcoinRpcMonitor,
403}
404
405#[apply(async_trait_maybe_send!)]
406impl ServerModule for Lightning {
407    type Common = LightningModuleTypes;
408    type Init = LightningInit;
409
410    async fn consensus_proposal(
411        &self,
412        _dbtx: &mut DatabaseTransaction<'_>,
413    ) -> Vec<LightningConsensusItem> {
414        // We reduce the time granularity to deduplicate votes more often and not save
415        // one consensus item every second.
416        let mut items = vec![LightningConsensusItem::UnixTimeVote(
417            60 * (duration_since_epoch().as_secs() / 60),
418        )];
419
420        if let Ok(block_count) = self.get_block_count() {
421            trace!(target: LOG_MODULE_LNV2, ?block_count, "Proposing block count");
422            items.push(LightningConsensusItem::BlockCountVote(block_count));
423        }
424
425        items
426    }
427
428    async fn process_consensus_item<'a, 'b>(
429        &'a self,
430        dbtx: &mut DatabaseTransaction<'b>,
431        consensus_item: LightningConsensusItem,
432        peer: PeerId,
433    ) -> anyhow::Result<()> {
434        trace!(target: LOG_MODULE_LNV2, ?consensus_item, "Processing consensus item proposal");
435
436        match consensus_item {
437            LightningConsensusItem::BlockCountVote(vote) => {
438                let current_vote = dbtx
439                    .insert_entry(&BlockCountVoteKey(peer), &vote)
440                    .await
441                    .unwrap_or(0);
442
443                ensure!(current_vote < vote, "Block count vote is redundant");
444
445                Ok(())
446            }
447            LightningConsensusItem::UnixTimeVote(vote) => {
448                let current_vote = dbtx
449                    .insert_entry(&UnixTimeVoteKey(peer), &vote)
450                    .await
451                    .unwrap_or(0);
452
453                ensure!(current_vote < vote, "Unix time vote is redundant");
454
455                Ok(())
456            }
457            LightningConsensusItem::Default { variant, .. } => Err(anyhow!(
458                "Received lnv2 consensus item with unknown variant {variant}"
459            )),
460        }
461    }
462
463    async fn process_input<'a, 'b, 'c>(
464        &'a self,
465        dbtx: &mut DatabaseTransaction<'c>,
466        input: &'b LightningInput,
467        _in_point: InPoint,
468    ) -> Result<InputMeta, LightningInputError> {
469        let (pub_key, amount) = match input.ensure_v0_ref()? {
470            LightningInputV0::Outgoing(outpoint, outgoing_witness) => {
471                let contract = dbtx
472                    .remove_entry(&OutgoingContractKey(*outpoint))
473                    .await
474                    .ok_or(LightningInputError::UnknownContract)?;
475
476                let pub_key = match outgoing_witness {
477                    OutgoingWitness::Claim(preimage) => {
478                        if contract.expiration <= self.consensus_block_count(dbtx).await {
479                            return Err(LightningInputError::Expired);
480                        }
481
482                        if !contract.verify_preimage(preimage) {
483                            return Err(LightningInputError::InvalidPreimage);
484                        }
485
486                        dbtx.insert_entry(&PreimageKey(*outpoint), preimage).await;
487
488                        contract.claim_pk
489                    }
490                    OutgoingWitness::Refund => {
491                        if contract.expiration > self.consensus_block_count(dbtx).await {
492                            return Err(LightningInputError::NotExpired);
493                        }
494
495                        contract.refund_pk
496                    }
497                    OutgoingWitness::Cancel(forfeit_signature) => {
498                        if !contract.verify_forfeit_signature(forfeit_signature) {
499                            return Err(LightningInputError::InvalidForfeitSignature);
500                        }
501
502                        contract.refund_pk
503                    }
504                };
505
506                (pub_key, contract.amount)
507            }
508            LightningInputV0::Incoming(outpoint, agg_decryption_key) => {
509                let contract = dbtx
510                    .remove_entry(&IncomingContractKey(*outpoint))
511                    .await
512                    .ok_or(LightningInputError::UnknownContract)?;
513
514                let index = dbtx
515                    .remove_entry(&IncomingContractIndexKey(*outpoint))
516                    .await
517                    .expect("Incoming contract index should exist");
518
519                dbtx.remove_entry(&IncomingContractStreamKey(index)).await;
520
521                if !contract
522                    .verify_agg_decryption_key(&self.cfg.consensus.tpe_agg_pk, agg_decryption_key)
523                {
524                    return Err(LightningInputError::InvalidDecryptionKey);
525                }
526
527                let pub_key = match contract.decrypt_preimage(agg_decryption_key) {
528                    Some(..) => contract.commitment.claim_pk,
529                    None => contract.commitment.refund_pk,
530                };
531
532                (pub_key, contract.commitment.amount)
533            }
534        };
535
536        Ok(InputMeta {
537            amount: TransactionItemAmounts {
538                amounts: Amounts::new_bitcoin(amount),
539                fees: Amounts::new_bitcoin(self.cfg.consensus.fee_consensus.fee(amount)),
540            },
541            pub_key,
542        })
543    }
544
545    async fn process_output<'a, 'b>(
546        &'a self,
547        dbtx: &mut DatabaseTransaction<'b>,
548        output: &'a LightningOutput,
549        outpoint: OutPoint,
550    ) -> Result<TransactionItemAmounts, LightningOutputError> {
551        let amount = match output.ensure_v0_ref()? {
552            LightningOutputV0::Outgoing(contract) => {
553                dbtx.insert_new_entry(&OutgoingContractKey(outpoint), contract)
554                    .await;
555
556                contract.amount
557            }
558            LightningOutputV0::Incoming(contract) => {
559                if !contract.verify() {
560                    return Err(LightningOutputError::InvalidContract);
561                }
562
563                if contract.commitment.expiration <= self.consensus_unix_time(dbtx).await {
564                    return Err(LightningOutputError::ContractExpired);
565                }
566
567                dbtx.insert_new_entry(&IncomingContractKey(outpoint), contract)
568                    .await;
569
570                dbtx.insert_entry(
571                    &IncomingContractOutpointKey(contract.contract_id()),
572                    &outpoint,
573                )
574                .await;
575
576                let stream_index = dbtx
577                    .get_value(&IncomingContractStreamIndexKey)
578                    .await
579                    .unwrap_or(0);
580
581                dbtx.insert_entry(&IncomingContractStreamKey(stream_index), contract)
582                    .await;
583
584                dbtx.insert_entry(&IncomingContractIndexKey(outpoint), &stream_index)
585                    .await;
586
587                dbtx.insert_entry(&IncomingContractStreamIndexKey, &(stream_index + 1))
588                    .await;
589
590                let dk_share = contract.create_decryption_key_share(&self.cfg.private.sk);
591
592                dbtx.insert_entry(&DecryptionKeyShareKey(outpoint), &dk_share)
593                    .await;
594
595                contract.commitment.amount
596            }
597        };
598
599        Ok(TransactionItemAmounts {
600            amounts: Amounts::new_bitcoin(amount),
601            fees: Amounts::new_bitcoin(self.cfg.consensus.fee_consensus.fee(amount)),
602        })
603    }
604
605    async fn output_status(
606        &self,
607        _dbtx: &mut DatabaseTransaction<'_>,
608        _out_point: OutPoint,
609    ) -> Option<LightningOutputOutcome> {
610        None
611    }
612
613    async fn audit(
614        &self,
615        dbtx: &mut DatabaseTransaction<'_>,
616        audit: &mut Audit,
617        module_instance_id: ModuleInstanceId,
618    ) {
619        // Both incoming and outgoing contracts represent liabilities to the federation
620        // since they are obligations to issue notes.
621        audit
622            .add_items(
623                dbtx,
624                module_instance_id,
625                &OutgoingContractPrefix,
626                |_, contract| -(contract.amount.msats as i64),
627            )
628            .await;
629
630        audit
631            .add_items(
632                dbtx,
633                module_instance_id,
634                &IncomingContractPrefix,
635                |_, contract| -(contract.commitment.amount.msats as i64),
636            )
637            .await;
638    }
639
640    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
641        vec![
642            api_endpoint! {
643                CONSENSUS_BLOCK_COUNT_ENDPOINT,
644                ApiVersion::new(0, 0),
645                async |module: &Lightning, context, _params : () | -> u64 {
646                    let db = context.db();
647                    let mut dbtx = db.begin_transaction_nc().await;
648
649                    Ok(module.consensus_block_count(&mut dbtx).await)
650                }
651            },
652            api_endpoint! {
653                AWAIT_INCOMING_CONTRACT_ENDPOINT,
654                ApiVersion::new(0, 0),
655                async |module: &Lightning, context, params: (ContractId, u64) | -> Option<OutPoint> {
656                    let db = context.db();
657
658                    Ok(module.await_incoming_contract(db, params.0, params.1).await)
659                }
660            },
661            api_endpoint! {
662                AWAIT_PREIMAGE_ENDPOINT,
663                ApiVersion::new(0, 0),
664                async |module: &Lightning, context, params: (OutPoint, u64)| -> Option<[u8; 32]> {
665                    let db = context.db();
666
667                    Ok(module.await_preimage(db, params.0, params.1).await)
668                }
669            },
670            api_endpoint! {
671                DECRYPTION_KEY_SHARE_ENDPOINT,
672                ApiVersion::new(0, 0),
673                async |_module: &Lightning, context, params: OutPoint| -> DecryptionKeyShare {
674                    let share = context
675                        .db()
676                        .begin_transaction_nc()
677                        .await
678                        .get_value(&DecryptionKeyShareKey(params))
679                        .await
680                        .ok_or(ApiError::bad_request("No decryption key share found".to_string()))?;
681
682                    Ok(share)
683                }
684            },
685            api_endpoint! {
686                OUTGOING_CONTRACT_EXPIRATION_ENDPOINT,
687                ApiVersion::new(0, 0),
688                async |module: &Lightning, context, outpoint: OutPoint| -> Option<(ContractId, u64)> {
689                    let db = context.db();
690
691                    Ok(module.outgoing_contract_expiration(db, outpoint).await)
692                }
693            },
694            api_endpoint! {
695                AWAIT_INCOMING_CONTRACTS_ENDPOINT,
696                ApiVersion::new(0, 0),
697                async |module: &Lightning, context, params: (u64, usize)| -> (Vec<IncomingContract>, u64) {
698                    let db = context.db();
699
700                    if params.1 == 0 {
701                        return Err(ApiError::bad_request("Batch size must be greater than 0".to_string()));
702                    }
703
704                    Ok(module.await_incoming_contracts(db, params.0, params.1).await)
705                }
706            },
707            api_endpoint! {
708                ADD_GATEWAY_ENDPOINT,
709                ApiVersion::new(0, 0),
710                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
711                    check_auth(context)?;
712
713                    let db = context.db();
714
715                    Ok(Lightning::add_gateway(db, gateway).await)
716                }
717            },
718            api_endpoint! {
719                REMOVE_GATEWAY_ENDPOINT,
720                ApiVersion::new(0, 0),
721                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
722                    check_auth(context)?;
723
724                    let db = context.db();
725
726                    Ok(Lightning::remove_gateway(db, gateway).await)
727                }
728            },
729            api_endpoint! {
730                GATEWAYS_ENDPOINT,
731                ApiVersion::new(0, 0),
732                async |_module: &Lightning, context, _params : () | -> Vec<SafeUrl> {
733                    let db = context.db();
734
735                    Ok(Lightning::gateways(db).await)
736                }
737            },
738        ]
739    }
740}
741
742impl Lightning {
743    fn get_block_count(&self) -> anyhow::Result<u64> {
744        self.server_bitcoin_rpc_monitor
745            .status()
746            .map(|status| status.block_count)
747            .context("Block count not available yet")
748    }
749
750    async fn consensus_block_count(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
751        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
752
753        let mut counts = dbtx
754            .find_by_prefix(&BlockCountVotePrefix)
755            .await
756            .map(|entry| entry.1)
757            .collect::<Vec<u64>>()
758            .await;
759
760        counts.sort_unstable();
761
762        counts.reverse();
763
764        assert!(counts.last() <= counts.first());
765
766        // The block count we select guarantees that any threshold of correct peers can
767        // increase the consensus block count and any consensus block count has been
768        // confirmed by a threshold of peers.
769
770        counts.get(num_peers.threshold() - 1).copied().unwrap_or(0)
771    }
772
773    async fn consensus_unix_time(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
774        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
775
776        let mut times = dbtx
777            .find_by_prefix(&UnixTimeVotePrefix)
778            .await
779            .map(|entry| entry.1)
780            .collect::<Vec<u64>>()
781            .await;
782
783        times.sort_unstable();
784
785        times.reverse();
786
787        assert!(times.last() <= times.first());
788
789        // The unix time we select guarantees that any threshold of correct peers can
790        // advance the consensus unix time and any consensus unix time has been
791        // confirmed by a threshold of peers.
792
793        times.get(num_peers.threshold() - 1).copied().unwrap_or(0)
794    }
795
796    async fn await_incoming_contract(
797        &self,
798        db: Database,
799        contract_id: ContractId,
800        expiration: u64,
801    ) -> Option<OutPoint> {
802        loop {
803            timeout(
804                Duration::from_secs(10),
805                db.wait_key_exists(&IncomingContractOutpointKey(contract_id)),
806            )
807            .await
808            .ok();
809
810            // to avoid race conditions we have to check for the contract and
811            // its expiration in the same database transaction
812            let mut dbtx = db.begin_transaction_nc().await;
813
814            if let Some(outpoint) = dbtx
815                .get_value(&IncomingContractOutpointKey(contract_id))
816                .await
817            {
818                return Some(outpoint);
819            }
820
821            if expiration <= self.consensus_unix_time(&mut dbtx).await {
822                return None;
823            }
824        }
825    }
826
827    async fn await_preimage(
828        &self,
829        db: Database,
830        outpoint: OutPoint,
831        expiration: u64,
832    ) -> Option<[u8; 32]> {
833        loop {
834            timeout(
835                Duration::from_secs(10),
836                db.wait_key_exists(&PreimageKey(outpoint)),
837            )
838            .await
839            .ok();
840
841            // to avoid race conditions we have to check for the preimage and
842            // the contracts expiration in the same database transaction
843            let mut dbtx = db.begin_transaction_nc().await;
844
845            if let Some(preimage) = dbtx.get_value(&PreimageKey(outpoint)).await {
846                return Some(preimage);
847            }
848
849            if expiration <= self.consensus_block_count(&mut dbtx).await {
850                return None;
851            }
852        }
853    }
854
855    async fn outgoing_contract_expiration(
856        &self,
857        db: Database,
858        outpoint: OutPoint,
859    ) -> Option<(ContractId, u64)> {
860        let mut dbtx = db.begin_transaction_nc().await;
861
862        let contract = dbtx.get_value(&OutgoingContractKey(outpoint)).await?;
863
864        let consensus_block_count = self.consensus_block_count(&mut dbtx).await;
865
866        let expiration = contract.expiration.saturating_sub(consensus_block_count);
867
868        Some((contract.contract_id(), expiration))
869    }
870
871    async fn await_incoming_contracts(
872        &self,
873        db: Database,
874        start: u64,
875        n: usize,
876    ) -> (Vec<IncomingContract>, u64) {
877        let filter = |next_index: Option<u64>| next_index.filter(|i| *i > start);
878
879        let (mut next_index, mut dbtx) = db
880            .wait_key_check(&IncomingContractStreamIndexKey, filter)
881            .await;
882
883        let mut contracts = Vec::with_capacity(n);
884
885        let range = IncomingContractStreamKey(start)..IncomingContractStreamKey(u64::MAX);
886
887        for (key, contract) in dbtx
888            .find_by_range(range)
889            .await
890            .take(n)
891            .collect::<Vec<(IncomingContractStreamKey, IncomingContract)>>()
892            .await
893        {
894            contracts.push(contract.clone());
895            next_index = key.0 + 1;
896        }
897
898        (contracts, next_index)
899    }
900
901    async fn add_gateway(db: Database, gateway: SafeUrl) -> bool {
902        let mut dbtx = db.begin_transaction().await;
903
904        let is_new_entry = dbtx.insert_entry(&GatewayKey(gateway), &()).await.is_none();
905
906        dbtx.commit_tx().await;
907
908        is_new_entry
909    }
910
911    async fn remove_gateway(db: Database, gateway: SafeUrl) -> bool {
912        let mut dbtx = db.begin_transaction().await;
913
914        let entry_existed = dbtx.remove_entry(&GatewayKey(gateway)).await.is_some();
915
916        dbtx.commit_tx().await;
917
918        entry_existed
919    }
920
921    async fn gateways(db: Database) -> Vec<SafeUrl> {
922        db.begin_transaction_nc()
923            .await
924            .find_by_prefix(&GatewayPrefix)
925            .await
926            .map(|entry| entry.0.0)
927            .collect()
928            .await
929    }
930
931    pub async fn consensus_block_count_ui(&self) -> u64 {
932        self.consensus_block_count(&mut self.db.begin_transaction_nc().await)
933            .await
934    }
935
936    pub async fn consensus_unix_time_ui(&self) -> u64 {
937        self.consensus_unix_time(&mut self.db.begin_transaction_nc().await)
938            .await
939    }
940
941    pub async fn add_gateway_ui(&self, gateway: SafeUrl) -> bool {
942        Self::add_gateway(self.db.clone(), gateway).await
943    }
944
945    pub async fn remove_gateway_ui(&self, gateway: SafeUrl) -> bool {
946        Self::remove_gateway(self.db.clone(), gateway).await
947    }
948
949    pub async fn gateways_ui(&self) -> Vec<SafeUrl> {
950        Self::gateways(self.db.clone()).await
951    }
952}