Skip to main content

fedimint_lnv2_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_wrap)]
3#![allow(clippy::module_name_repetitions)]
4
5pub use fedimint_lnv2_common as common;
6
7mod db;
8
9use std::collections::{BTreeMap, BTreeSet};
10use std::time::Duration;
11
12use anyhow::{Context, anyhow, ensure};
13use bls12_381::{G1Projective, Scalar};
14use fedimint_core::bitcoin::hashes::sha256;
15use fedimint_core::config::{
16    ServerModuleConfig, ServerModuleConsensusConfig, TypedServerModuleConfig,
17    TypedServerModuleConsensusConfig,
18};
19use fedimint_core::core::ModuleInstanceId;
20use fedimint_core::db::{
21    Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
22};
23use fedimint_core::encoding::Encodable;
24use fedimint_core::envs::{FM_ENABLE_MODULE_LNV2_ENV, is_env_var_set_opt};
25use fedimint_core::module::audit::Audit;
26use fedimint_core::module::{
27    Amounts, ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion,
28    InputMeta, ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions,
29    TransactionItemAmounts, api_endpoint,
30};
31use fedimint_core::net::auth::check_auth;
32use fedimint_core::task::timeout;
33use fedimint_core::time::duration_since_epoch;
34use fedimint_core::util::SafeUrl;
35use fedimint_core::{
36    BitcoinHash, InPoint, NumPeers, NumPeersExt, OutPoint, PeerId, apply, async_trait_maybe_send,
37    push_db_pair_items,
38};
39use fedimint_lnv2_common::config::{
40    FeeConsensus, LightningClientConfig, LightningConfig, LightningConfigConsensus,
41    LightningConfigPrivate,
42};
43use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract};
44use fedimint_lnv2_common::endpoint_constants::{
45    ADD_GATEWAY_ENDPOINT, AWAIT_INCOMING_CONTRACT_ENDPOINT, AWAIT_INCOMING_CONTRACTS_ENDPOINT,
46    AWAIT_PREIMAGE_ENDPOINT, CONSENSUS_BLOCK_COUNT_ENDPOINT, DECRYPTION_KEY_SHARE_ENDPOINT,
47    GATEWAYS_ENDPOINT, OUTGOING_CONTRACT_EXPIRATION_ENDPOINT, REMOVE_GATEWAY_ENDPOINT,
48};
49use fedimint_lnv2_common::{
50    ContractId, LightningCommonInit, LightningConsensusItem, LightningInput, LightningInputError,
51    LightningInputV0, LightningModuleTypes, LightningOutput, LightningOutputError,
52    LightningOutputOutcome, LightningOutputV0, MODULE_CONSENSUS_VERSION, OutgoingWitness,
53};
54use fedimint_logging::LOG_MODULE_LNV2;
55use fedimint_server_core::bitcoin_rpc::ServerBitcoinRpcMonitor;
56use fedimint_server_core::config::{PeerHandleOps, eval_poly_g1};
57use fedimint_server_core::migration::ServerModuleDbMigrationFn;
58use fedimint_server_core::{
59    ConfigGenModuleArgs, ServerModule, ServerModuleInit, ServerModuleInitArgs,
60};
61use futures::StreamExt;
62use group::Curve;
63use group::ff::Field;
64use rand::SeedableRng;
65use rand_chacha::ChaChaRng;
66use strum::IntoEnumIterator;
67use tpe::{
68    AggregatePublicKey, DecryptionKeyShare, PublicKeyShare, SecretKeyShare, derive_pk_share,
69};
70use tracing::trace;
71
72use crate::db::{
73    BlockCountVoteKey, BlockCountVotePrefix, DbKeyPrefix, DecryptionKeyShareKey,
74    DecryptionKeySharePrefix, GatewayKey, GatewayPrefix, IncomingContractIndexKey,
75    IncomingContractIndexPrefix, IncomingContractKey, IncomingContractOutpointKey,
76    IncomingContractOutpointPrefix, IncomingContractPrefix, IncomingContractStreamIndexKey,
77    IncomingContractStreamKey, IncomingContractStreamPrefix, OutgoingContractKey,
78    OutgoingContractPrefix, PreimageKey, PreimagePrefix, UnixTimeVoteKey, UnixTimeVotePrefix,
79};
80
81#[derive(Debug, Clone)]
82pub struct LightningInit;
83
84impl ModuleInit for LightningInit {
85    type Common = LightningCommonInit;
86
87    #[allow(clippy::too_many_lines)]
88    async fn dump_database(
89        &self,
90        dbtx: &mut DatabaseTransaction<'_>,
91        prefix_names: Vec<String>,
92    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
93        let mut lightning: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
94            BTreeMap::new();
95
96        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
97            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
98        });
99
100        for table in filtered_prefixes {
101            match table {
102                DbKeyPrefix::BlockCountVote => {
103                    push_db_pair_items!(
104                        dbtx,
105                        BlockCountVotePrefix,
106                        BlockCountVoteKey,
107                        u64,
108                        lightning,
109                        "Lightning Block Count Votes"
110                    );
111                }
112                DbKeyPrefix::UnixTimeVote => {
113                    push_db_pair_items!(
114                        dbtx,
115                        UnixTimeVotePrefix,
116                        UnixTimeVoteKey,
117                        u64,
118                        lightning,
119                        "Lightning Unix Time Votes"
120                    );
121                }
122                DbKeyPrefix::OutgoingContract => {
123                    push_db_pair_items!(
124                        dbtx,
125                        OutgoingContractPrefix,
126                        LightningOutgoingContractKey,
127                        OutgoingContract,
128                        lightning,
129                        "Lightning Outgoing Contracts"
130                    );
131                }
132                DbKeyPrefix::IncomingContract => {
133                    push_db_pair_items!(
134                        dbtx,
135                        IncomingContractPrefix,
136                        LightningIncomingContractKey,
137                        IncomingContract,
138                        lightning,
139                        "Lightning Incoming Contracts"
140                    );
141                }
142                DbKeyPrefix::IncomingContractOutpoint => {
143                    push_db_pair_items!(
144                        dbtx,
145                        IncomingContractOutpointPrefix,
146                        LightningIncomingContractOutpointKey,
147                        OutPoint,
148                        lightning,
149                        "Lightning Incoming Contracts Outpoints"
150                    );
151                }
152                DbKeyPrefix::DecryptionKeyShare => {
153                    push_db_pair_items!(
154                        dbtx,
155                        DecryptionKeySharePrefix,
156                        DecryptionKeyShareKey,
157                        DecryptionKeyShare,
158                        lightning,
159                        "Lightning Decryption Key Share"
160                    );
161                }
162                DbKeyPrefix::Preimage => {
163                    push_db_pair_items!(
164                        dbtx,
165                        PreimagePrefix,
166                        LightningPreimageKey,
167                        [u8; 32],
168                        lightning,
169                        "Lightning Preimages"
170                    );
171                }
172                DbKeyPrefix::Gateway => {
173                    push_db_pair_items!(
174                        dbtx,
175                        GatewayPrefix,
176                        GatewayKey,
177                        (),
178                        lightning,
179                        "Lightning Gateways"
180                    );
181                }
182                DbKeyPrefix::IncomingContractStreamIndex => {
183                    push_db_pair_items!(
184                        dbtx,
185                        IncomingContractStreamIndexKey,
186                        IncomingContractStreamIndexKey,
187                        u64,
188                        lightning,
189                        "Lightning Incoming Contract Stream Index"
190                    );
191                }
192                DbKeyPrefix::IncomingContractStream => {
193                    push_db_pair_items!(
194                        dbtx,
195                        IncomingContractStreamPrefix(0),
196                        IncomingContractStreamKey,
197                        IncomingContract,
198                        lightning,
199                        "Lightning Incoming Contract Stream"
200                    );
201                }
202                DbKeyPrefix::IncomingContractIndex => {
203                    push_db_pair_items!(
204                        dbtx,
205                        IncomingContractIndexPrefix,
206                        IncomingContractIndexKey,
207                        u64,
208                        lightning,
209                        "Lightning Incoming Contract Index"
210                    );
211                }
212            }
213        }
214
215        Box::new(lightning.into_iter())
216    }
217}
218
219#[apply(async_trait_maybe_send!)]
220impl ServerModuleInit for LightningInit {
221    type Module = Lightning;
222
223    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
224        &[MODULE_CONSENSUS_VERSION]
225    }
226
227    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
228        SupportedModuleApiVersions::from_raw(
229            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
230            (
231                MODULE_CONSENSUS_VERSION.major,
232                MODULE_CONSENSUS_VERSION.minor,
233            ),
234            &[(0, 0)],
235        )
236    }
237
238    fn is_enabled_by_default(&self) -> bool {
239        is_env_var_set_opt(FM_ENABLE_MODULE_LNV2_ENV).unwrap_or(true)
240    }
241
242    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
243        Ok(Lightning {
244            cfg: args.cfg().to_typed()?,
245            db: args.db().clone(),
246            server_bitcoin_rpc_monitor: args.server_bitcoin_rpc_monitor(),
247        })
248    }
249
250    fn trusted_dealer_gen(
251        &self,
252        peers: &[PeerId],
253        args: &ConfigGenModuleArgs,
254    ) -> BTreeMap<PeerId, ServerModuleConfig> {
255        let tpe_pks = peers
256            .iter()
257            .map(|peer| (*peer, dealer_pk(peers.to_num_peers(), *peer)))
258            .collect::<BTreeMap<PeerId, PublicKeyShare>>();
259
260        peers
261            .iter()
262            .map(|peer| {
263                let cfg = LightningConfig {
264                    consensus: LightningConfigConsensus {
265                        tpe_agg_pk: dealer_agg_pk(),
266                        tpe_pks: tpe_pks.clone(),
267                        fee_consensus: if args.disable_base_fees {
268                            FeeConsensus::zero()
269                        } else {
270                            FeeConsensus::new(0).expect("Relative fee is within range")
271                        },
272                        network: args.network,
273                    },
274                    private: LightningConfigPrivate {
275                        sk: dealer_sk(peers.to_num_peers(), *peer),
276                    },
277                };
278
279                (*peer, cfg.to_erased())
280            })
281            .collect()
282    }
283
284    async fn distributed_gen(
285        &self,
286        peers: &(dyn PeerHandleOps + Send + Sync),
287        args: &ConfigGenModuleArgs,
288    ) -> anyhow::Result<ServerModuleConfig> {
289        let (polynomial, sks) = peers.run_dkg_g1().await?;
290
291        let server = LightningConfig {
292            consensus: LightningConfigConsensus {
293                tpe_agg_pk: tpe::AggregatePublicKey(polynomial[0].to_affine()),
294                tpe_pks: peers
295                    .num_peers()
296                    .peer_ids()
297                    .map(|peer| (peer, PublicKeyShare(eval_poly_g1(&polynomial, &peer))))
298                    .collect(),
299                fee_consensus: if args.disable_base_fees {
300                    FeeConsensus::zero()
301                } else {
302                    FeeConsensus::new(0).expect("Relative fee is within range")
303                },
304                network: args.network,
305            },
306            private: LightningConfigPrivate {
307                sk: SecretKeyShare(sks),
308            },
309        };
310
311        Ok(server.to_erased())
312    }
313
314    fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
315        let config = config.to_typed::<LightningConfig>()?;
316
317        ensure!(
318            tpe::derive_pk_share(&config.private.sk)
319                == *config
320                    .consensus
321                    .tpe_pks
322                    .get(identity)
323                    .context("Public key set has no key for our identity")?,
324            "Preimge encryption secret key share does not match our public key share"
325        );
326
327        Ok(())
328    }
329
330    fn get_client_config(
331        &self,
332        config: &ServerModuleConsensusConfig,
333    ) -> anyhow::Result<LightningClientConfig> {
334        let config = LightningConfigConsensus::from_erased(config)?;
335        Ok(LightningClientConfig {
336            tpe_agg_pk: config.tpe_agg_pk,
337            tpe_pks: config.tpe_pks,
338            fee_consensus: config.fee_consensus,
339            network: config.network,
340        })
341    }
342
343    fn get_database_migrations(
344        &self,
345    ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Lightning>> {
346        let mut migrations: BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Lightning>> =
347            BTreeMap::new();
348
349        migrations.insert(
350            DatabaseVersion(0),
351            Box::new(move |ctx| Box::pin(crate::db::migrate_to_v1(ctx))),
352        );
353
354        migrations
355    }
356
357    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
358        Some(DbKeyPrefix::iter().map(|p| p as u8).collect())
359    }
360}
361
362fn dealer_agg_pk() -> AggregatePublicKey {
363    AggregatePublicKey((G1Projective::generator() * coefficient(0)).to_affine())
364}
365
366fn dealer_pk(num_peers: NumPeers, peer: PeerId) -> PublicKeyShare {
367    derive_pk_share(&dealer_sk(num_peers, peer))
368}
369
370fn dealer_sk(num_peers: NumPeers, peer: PeerId) -> SecretKeyShare {
371    let x = Scalar::from(peer.to_usize() as u64 + 1);
372
373    // We evaluate the scalar polynomial of degree threshold - 1 at the point x
374    // using the Horner schema.
375
376    let y = (0..num_peers.threshold())
377        .map(|index| coefficient(index as u64))
378        .rev()
379        .reduce(|accumulator, c| accumulator * x + c)
380        .expect("We have at least one coefficient");
381
382    SecretKeyShare(y)
383}
384
385fn coefficient(index: u64) -> Scalar {
386    Scalar::random(&mut ChaChaRng::from_seed(
387        *index.consensus_hash::<sha256::Hash>().as_byte_array(),
388    ))
389}
390
391#[derive(Debug)]
392pub struct Lightning {
393    cfg: LightningConfig,
394    db: Database,
395    server_bitcoin_rpc_monitor: ServerBitcoinRpcMonitor,
396}
397
398#[apply(async_trait_maybe_send!)]
399impl ServerModule for Lightning {
400    type Common = LightningModuleTypes;
401    type Init = LightningInit;
402
403    async fn consensus_proposal(
404        &self,
405        _dbtx: &mut DatabaseTransaction<'_>,
406    ) -> Vec<LightningConsensusItem> {
407        // We reduce the time granularity to deduplicate votes more often and not save
408        // one consensus item every second.
409        let mut items = vec![LightningConsensusItem::UnixTimeVote(
410            60 * (duration_since_epoch().as_secs() / 60),
411        )];
412
413        if let Ok(block_count) = self.get_block_count() {
414            trace!(target: LOG_MODULE_LNV2, ?block_count, "Proposing block count");
415            items.push(LightningConsensusItem::BlockCountVote(block_count));
416        }
417
418        items
419    }
420
421    async fn process_consensus_item<'a, 'b>(
422        &'a self,
423        dbtx: &mut DatabaseTransaction<'b>,
424        consensus_item: LightningConsensusItem,
425        peer: PeerId,
426    ) -> anyhow::Result<()> {
427        trace!(target: LOG_MODULE_LNV2, ?consensus_item, "Processing consensus item proposal");
428
429        match consensus_item {
430            LightningConsensusItem::BlockCountVote(vote) => {
431                let current_vote = dbtx
432                    .insert_entry(&BlockCountVoteKey(peer), &vote)
433                    .await
434                    .unwrap_or(0);
435
436                ensure!(current_vote < vote, "Block count vote is redundant");
437
438                Ok(())
439            }
440            LightningConsensusItem::UnixTimeVote(vote) => {
441                let current_vote = dbtx
442                    .insert_entry(&UnixTimeVoteKey(peer), &vote)
443                    .await
444                    .unwrap_or(0);
445
446                ensure!(current_vote < vote, "Unix time vote is redundant");
447
448                Ok(())
449            }
450            LightningConsensusItem::Default { variant, .. } => Err(anyhow!(
451                "Received lnv2 consensus item with unknown variant {variant}"
452            )),
453        }
454    }
455
456    async fn process_input<'a, 'b, 'c>(
457        &'a self,
458        dbtx: &mut DatabaseTransaction<'c>,
459        input: &'b LightningInput,
460        _in_point: InPoint,
461    ) -> Result<InputMeta, LightningInputError> {
462        let (pub_key, amount) = match input.ensure_v0_ref()? {
463            LightningInputV0::Outgoing(outpoint, outgoing_witness) => {
464                let contract = dbtx
465                    .remove_entry(&OutgoingContractKey(*outpoint))
466                    .await
467                    .ok_or(LightningInputError::UnknownContract)?;
468
469                let pub_key = match outgoing_witness {
470                    OutgoingWitness::Claim(preimage) => {
471                        if contract.expiration <= self.consensus_block_count(dbtx).await {
472                            return Err(LightningInputError::Expired);
473                        }
474
475                        if !contract.verify_preimage(preimage) {
476                            return Err(LightningInputError::InvalidPreimage);
477                        }
478
479                        dbtx.insert_entry(&PreimageKey(*outpoint), preimage).await;
480
481                        contract.claim_pk
482                    }
483                    OutgoingWitness::Refund => {
484                        if contract.expiration > self.consensus_block_count(dbtx).await {
485                            return Err(LightningInputError::NotExpired);
486                        }
487
488                        contract.refund_pk
489                    }
490                    OutgoingWitness::Cancel(forfeit_signature) => {
491                        if !contract.verify_forfeit_signature(forfeit_signature) {
492                            return Err(LightningInputError::InvalidForfeitSignature);
493                        }
494
495                        contract.refund_pk
496                    }
497                };
498
499                (pub_key, contract.amount)
500            }
501            LightningInputV0::Incoming(outpoint, agg_decryption_key) => {
502                let contract = dbtx
503                    .remove_entry(&IncomingContractKey(*outpoint))
504                    .await
505                    .ok_or(LightningInputError::UnknownContract)?;
506
507                let index = dbtx
508                    .remove_entry(&IncomingContractIndexKey(*outpoint))
509                    .await
510                    .expect("Incoming contract index should exist");
511
512                dbtx.remove_entry(&IncomingContractStreamKey(index)).await;
513
514                if !contract
515                    .verify_agg_decryption_key(&self.cfg.consensus.tpe_agg_pk, agg_decryption_key)
516                {
517                    return Err(LightningInputError::InvalidDecryptionKey);
518                }
519
520                let pub_key = match contract.decrypt_preimage(agg_decryption_key) {
521                    Some(..) => contract.commitment.claim_pk,
522                    None => contract.commitment.refund_pk,
523                };
524
525                (pub_key, contract.commitment.amount)
526            }
527        };
528
529        Ok(InputMeta {
530            amount: TransactionItemAmounts {
531                amounts: Amounts::new_bitcoin(amount),
532                fees: Amounts::new_bitcoin(self.cfg.consensus.fee_consensus.fee(amount)),
533            },
534            pub_key,
535        })
536    }
537
538    async fn process_output<'a, 'b>(
539        &'a self,
540        dbtx: &mut DatabaseTransaction<'b>,
541        output: &'a LightningOutput,
542        outpoint: OutPoint,
543    ) -> Result<TransactionItemAmounts, LightningOutputError> {
544        let amount = match output.ensure_v0_ref()? {
545            LightningOutputV0::Outgoing(contract) => {
546                dbtx.insert_new_entry(&OutgoingContractKey(outpoint), contract)
547                    .await;
548
549                contract.amount
550            }
551            LightningOutputV0::Incoming(contract) => {
552                if !contract.verify() {
553                    return Err(LightningOutputError::InvalidContract);
554                }
555
556                if contract.commitment.expiration <= self.consensus_unix_time(dbtx).await {
557                    return Err(LightningOutputError::ContractExpired);
558                }
559
560                dbtx.insert_new_entry(&IncomingContractKey(outpoint), contract)
561                    .await;
562
563                dbtx.insert_entry(
564                    &IncomingContractOutpointKey(contract.contract_id()),
565                    &outpoint,
566                )
567                .await;
568
569                let stream_index = dbtx
570                    .get_value(&IncomingContractStreamIndexKey)
571                    .await
572                    .unwrap_or(0);
573
574                dbtx.insert_entry(&IncomingContractStreamKey(stream_index), contract)
575                    .await;
576
577                dbtx.insert_entry(&IncomingContractIndexKey(outpoint), &stream_index)
578                    .await;
579
580                dbtx.insert_entry(&IncomingContractStreamIndexKey, &(stream_index + 1))
581                    .await;
582
583                let dk_share = contract.create_decryption_key_share(&self.cfg.private.sk);
584
585                dbtx.insert_entry(&DecryptionKeyShareKey(outpoint), &dk_share)
586                    .await;
587
588                contract.commitment.amount
589            }
590        };
591
592        Ok(TransactionItemAmounts {
593            amounts: Amounts::new_bitcoin(amount),
594            fees: Amounts::new_bitcoin(self.cfg.consensus.fee_consensus.fee(amount)),
595        })
596    }
597
598    async fn output_status(
599        &self,
600        _dbtx: &mut DatabaseTransaction<'_>,
601        _out_point: OutPoint,
602    ) -> Option<LightningOutputOutcome> {
603        None
604    }
605
606    async fn audit(
607        &self,
608        dbtx: &mut DatabaseTransaction<'_>,
609        audit: &mut Audit,
610        module_instance_id: ModuleInstanceId,
611    ) {
612        // Both incoming and outgoing contracts represent liabilities to the federation
613        // since they are obligations to issue notes.
614        audit
615            .add_items(
616                dbtx,
617                module_instance_id,
618                &OutgoingContractPrefix,
619                |_, contract| -(contract.amount.msats as i64),
620            )
621            .await;
622
623        audit
624            .add_items(
625                dbtx,
626                module_instance_id,
627                &IncomingContractPrefix,
628                |_, contract| -(contract.commitment.amount.msats as i64),
629            )
630            .await;
631    }
632
633    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
634        vec![
635            api_endpoint! {
636                CONSENSUS_BLOCK_COUNT_ENDPOINT,
637                ApiVersion::new(0, 0),
638                async |module: &Lightning, context, _params : () | -> u64 {
639                    let db = context.db();
640                    let mut dbtx = db.begin_transaction_nc().await;
641
642                    Ok(module.consensus_block_count(&mut dbtx).await)
643                }
644            },
645            api_endpoint! {
646                AWAIT_INCOMING_CONTRACT_ENDPOINT,
647                ApiVersion::new(0, 0),
648                async |module: &Lightning, context, params: (ContractId, u64) | -> Option<OutPoint> {
649                    let db = context.db();
650
651                    Ok(module.await_incoming_contract(db, params.0, params.1).await)
652                }
653            },
654            api_endpoint! {
655                AWAIT_PREIMAGE_ENDPOINT,
656                ApiVersion::new(0, 0),
657                async |module: &Lightning, context, params: (OutPoint, u64)| -> Option<[u8; 32]> {
658                    let db = context.db();
659
660                    Ok(module.await_preimage(db, params.0, params.1).await)
661                }
662            },
663            api_endpoint! {
664                DECRYPTION_KEY_SHARE_ENDPOINT,
665                ApiVersion::new(0, 0),
666                async |_module: &Lightning, context, params: OutPoint| -> DecryptionKeyShare {
667                    let share = context
668                        .db()
669                        .begin_transaction_nc()
670                        .await
671                        .get_value(&DecryptionKeyShareKey(params))
672                        .await
673                        .ok_or(ApiError::bad_request("No decryption key share found".to_string()))?;
674
675                    Ok(share)
676                }
677            },
678            api_endpoint! {
679                OUTGOING_CONTRACT_EXPIRATION_ENDPOINT,
680                ApiVersion::new(0, 0),
681                async |module: &Lightning, context, outpoint: OutPoint| -> Option<(ContractId, u64)> {
682                    let db = context.db();
683
684                    Ok(module.outgoing_contract_expiration(db, outpoint).await)
685                }
686            },
687            api_endpoint! {
688                AWAIT_INCOMING_CONTRACTS_ENDPOINT,
689                ApiVersion::new(0, 0),
690                async |module: &Lightning, context, params: (u64, usize)| -> (Vec<IncomingContract>, u64) {
691                    let db = context.db();
692
693                    if params.1 == 0 {
694                        return Err(ApiError::bad_request("Batch size must be greater than 0".to_string()));
695                    }
696
697                    Ok(module.await_incoming_contracts(db, params.0, params.1).await)
698                }
699            },
700            api_endpoint! {
701                ADD_GATEWAY_ENDPOINT,
702                ApiVersion::new(0, 0),
703                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
704                    check_auth(context)?;
705
706                    let db = context.db();
707
708                    Ok(Lightning::add_gateway(db, gateway).await)
709                }
710            },
711            api_endpoint! {
712                REMOVE_GATEWAY_ENDPOINT,
713                ApiVersion::new(0, 0),
714                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
715                    check_auth(context)?;
716
717                    let db = context.db();
718
719                    Ok(Lightning::remove_gateway(db, gateway).await)
720                }
721            },
722            api_endpoint! {
723                GATEWAYS_ENDPOINT,
724                ApiVersion::new(0, 0),
725                async |_module: &Lightning, context, _params : () | -> Vec<SafeUrl> {
726                    let db = context.db();
727
728                    Ok(Lightning::gateways(db).await)
729                }
730            },
731        ]
732    }
733}
734
735impl Lightning {
736    fn get_block_count(&self) -> anyhow::Result<u64> {
737        self.server_bitcoin_rpc_monitor
738            .status()
739            .map(|status| status.block_count)
740            .context("Block count not available yet")
741    }
742
743    async fn consensus_block_count(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
744        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
745
746        let mut counts = dbtx
747            .find_by_prefix(&BlockCountVotePrefix)
748            .await
749            .map(|entry| entry.1)
750            .collect::<Vec<u64>>()
751            .await;
752
753        counts.sort_unstable();
754
755        counts.reverse();
756
757        assert!(counts.last() <= counts.first());
758
759        // The block count we select guarantees that any threshold of correct peers can
760        // increase the consensus block count and any consensus block count has been
761        // confirmed by a threshold of peers.
762
763        counts.get(num_peers.threshold() - 1).copied().unwrap_or(0)
764    }
765
766    async fn consensus_unix_time(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
767        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
768
769        let mut times = dbtx
770            .find_by_prefix(&UnixTimeVotePrefix)
771            .await
772            .map(|entry| entry.1)
773            .collect::<Vec<u64>>()
774            .await;
775
776        times.sort_unstable();
777
778        times.reverse();
779
780        assert!(times.last() <= times.first());
781
782        // The unix time we select guarantees that any threshold of correct peers can
783        // advance the consensus unix time and any consensus unix time has been
784        // confirmed by a threshold of peers.
785
786        times.get(num_peers.threshold() - 1).copied().unwrap_or(0)
787    }
788
789    async fn await_incoming_contract(
790        &self,
791        db: Database,
792        contract_id: ContractId,
793        expiration: u64,
794    ) -> Option<OutPoint> {
795        loop {
796            timeout(
797                Duration::from_secs(10),
798                db.wait_key_exists(&IncomingContractOutpointKey(contract_id)),
799            )
800            .await
801            .ok();
802
803            // to avoid race conditions we have to check for the contract and
804            // its expiration in the same database transaction
805            let mut dbtx = db.begin_transaction_nc().await;
806
807            if let Some(outpoint) = dbtx
808                .get_value(&IncomingContractOutpointKey(contract_id))
809                .await
810            {
811                return Some(outpoint);
812            }
813
814            if expiration <= self.consensus_unix_time(&mut dbtx).await {
815                return None;
816            }
817        }
818    }
819
820    async fn await_preimage(
821        &self,
822        db: Database,
823        outpoint: OutPoint,
824        expiration: u64,
825    ) -> Option<[u8; 32]> {
826        loop {
827            timeout(
828                Duration::from_secs(10),
829                db.wait_key_exists(&PreimageKey(outpoint)),
830            )
831            .await
832            .ok();
833
834            // to avoid race conditions we have to check for the preimage and
835            // the contracts expiration in the same database transaction
836            let mut dbtx = db.begin_transaction_nc().await;
837
838            if let Some(preimage) = dbtx.get_value(&PreimageKey(outpoint)).await {
839                return Some(preimage);
840            }
841
842            if expiration <= self.consensus_block_count(&mut dbtx).await {
843                return None;
844            }
845        }
846    }
847
848    async fn outgoing_contract_expiration(
849        &self,
850        db: Database,
851        outpoint: OutPoint,
852    ) -> Option<(ContractId, u64)> {
853        let mut dbtx = db.begin_transaction_nc().await;
854
855        let contract = dbtx.get_value(&OutgoingContractKey(outpoint)).await?;
856
857        let consensus_block_count = self.consensus_block_count(&mut dbtx).await;
858
859        let expiration = contract.expiration.saturating_sub(consensus_block_count);
860
861        Some((contract.contract_id(), expiration))
862    }
863
864    async fn await_incoming_contracts(
865        &self,
866        db: Database,
867        start: u64,
868        n: usize,
869    ) -> (Vec<IncomingContract>, u64) {
870        let filter = |next_index: Option<u64>| next_index.filter(|i| *i > start);
871
872        let (mut next_index, mut dbtx) = db
873            .wait_key_check(&IncomingContractStreamIndexKey, filter)
874            .await;
875
876        let mut contracts = Vec::with_capacity(n);
877
878        let range = IncomingContractStreamKey(start)..IncomingContractStreamKey(u64::MAX);
879
880        for (key, contract) in dbtx
881            .find_by_range(range)
882            .await
883            .take(n)
884            .collect::<Vec<(IncomingContractStreamKey, IncomingContract)>>()
885            .await
886        {
887            contracts.push(contract.clone());
888            next_index = key.0 + 1;
889        }
890
891        (contracts, next_index)
892    }
893
894    async fn add_gateway(db: Database, gateway: SafeUrl) -> bool {
895        let mut dbtx = db.begin_transaction().await;
896
897        let is_new_entry = dbtx.insert_entry(&GatewayKey(gateway), &()).await.is_none();
898
899        dbtx.commit_tx().await;
900
901        is_new_entry
902    }
903
904    async fn remove_gateway(db: Database, gateway: SafeUrl) -> bool {
905        let mut dbtx = db.begin_transaction().await;
906
907        let entry_existed = dbtx.remove_entry(&GatewayKey(gateway)).await.is_some();
908
909        dbtx.commit_tx().await;
910
911        entry_existed
912    }
913
914    async fn gateways(db: Database) -> Vec<SafeUrl> {
915        db.begin_transaction_nc()
916            .await
917            .find_by_prefix(&GatewayPrefix)
918            .await
919            .map(|entry| entry.0.0)
920            .collect()
921            .await
922    }
923
924    pub async fn consensus_block_count_ui(&self) -> u64 {
925        self.consensus_block_count(&mut self.db.begin_transaction_nc().await)
926            .await
927    }
928
929    pub async fn consensus_unix_time_ui(&self) -> u64 {
930        self.consensus_unix_time(&mut self.db.begin_transaction_nc().await)
931            .await
932    }
933
934    pub async fn add_gateway_ui(&self, gateway: SafeUrl) -> bool {
935        Self::add_gateway(self.db.clone(), gateway).await
936    }
937
938    pub async fn remove_gateway_ui(&self, gateway: SafeUrl) -> bool {
939        Self::remove_gateway(self.db.clone(), gateway).await
940    }
941
942    pub async fn gateways_ui(&self) -> Vec<SafeUrl> {
943        Self::gateways(self.db.clone()).await
944    }
945}