fedimint_lnv2_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_wrap)]
3#![allow(clippy::module_name_repetitions)]
4
5mod db;
6
7use std::collections::{BTreeMap, BTreeSet};
8use std::time::Duration;
9
10use anyhow::{Context, anyhow, ensure};
11use bls12_381::{G1Projective, Scalar};
12use fedimint_core::bitcoin::hashes::sha256;
13use fedimint_core::config::{
14    ConfigGenModuleParams, ServerModuleConfig, ServerModuleConsensusConfig,
15    TypedServerModuleConfig, TypedServerModuleConsensusConfig,
16};
17use fedimint_core::core::ModuleInstanceId;
18use fedimint_core::db::{
19    Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
20};
21use fedimint_core::encoding::Encodable;
22use fedimint_core::module::audit::Audit;
23use fedimint_core::module::{
24    Amounts, ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion,
25    InputMeta, ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions,
26    TransactionItemAmounts, api_endpoint,
27};
28use fedimint_core::task::timeout;
29use fedimint_core::time::duration_since_epoch;
30use fedimint_core::util::SafeUrl;
31use fedimint_core::{
32    BitcoinHash, InPoint, NumPeers, NumPeersExt, OutPoint, PeerId, apply, async_trait_maybe_send,
33    push_db_pair_items,
34};
35use fedimint_lnv2_common::config::{
36    FeeConsensus, LightningClientConfig, LightningConfig, LightningConfigConsensus,
37    LightningConfigPrivate, LightningGenParams,
38};
39use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract};
40use fedimint_lnv2_common::endpoint_constants::{
41    ADD_GATEWAY_ENDPOINT, AWAIT_INCOMING_CONTRACT_ENDPOINT, AWAIT_INCOMING_CONTRACTS_ENDPOINT,
42    AWAIT_PREIMAGE_ENDPOINT, CONSENSUS_BLOCK_COUNT_ENDPOINT, DECRYPTION_KEY_SHARE_ENDPOINT,
43    GATEWAYS_ENDPOINT, OUTGOING_CONTRACT_EXPIRATION_ENDPOINT, REMOVE_GATEWAY_ENDPOINT,
44};
45use fedimint_lnv2_common::{
46    ContractId, LightningCommonInit, LightningConsensusItem, LightningInput, LightningInputError,
47    LightningInputV0, LightningModuleTypes, LightningOutput, LightningOutputError,
48    LightningOutputOutcome, LightningOutputV0, MODULE_CONSENSUS_VERSION, OutgoingWitness,
49};
50use fedimint_logging::LOG_MODULE_LNV2;
51use fedimint_server_core::bitcoin_rpc::ServerBitcoinRpcMonitor;
52use fedimint_server_core::config::{PeerHandleOps, eval_poly_g1};
53use fedimint_server_core::migration::ServerModuleDbMigrationFn;
54use fedimint_server_core::net::check_auth;
55use fedimint_server_core::{ServerModule, ServerModuleInit, ServerModuleInitArgs};
56use futures::StreamExt;
57use group::Curve;
58use group::ff::Field;
59use rand::SeedableRng;
60use rand_chacha::ChaChaRng;
61use strum::IntoEnumIterator;
62use tpe::{
63    AggregatePublicKey, DecryptionKeyShare, PublicKeyShare, SecretKeyShare, derive_pk_share,
64};
65use tracing::trace;
66
67use crate::db::{
68    BlockCountVoteKey, BlockCountVotePrefix, DbKeyPrefix, DecryptionKeyShareKey,
69    DecryptionKeySharePrefix, GatewayKey, GatewayPrefix, IncomingContractIndexKey,
70    IncomingContractIndexPrefix, IncomingContractKey, IncomingContractOutpointKey,
71    IncomingContractOutpointPrefix, IncomingContractPrefix, IncomingContractStreamIndexKey,
72    IncomingContractStreamKey, IncomingContractStreamPrefix, OutgoingContractKey,
73    OutgoingContractPrefix, PreimageKey, PreimagePrefix, UnixTimeVoteKey, UnixTimeVotePrefix,
74};
75
/// Stateless initializer for the lnv2 server module.
///
/// The `ModuleInit` and `ServerModuleInit` implementations in this file are
/// attached to this unit type.
#[derive(Clone, Debug)]
pub struct LightningInit;
78
79impl ModuleInit for LightningInit {
80    type Common = LightningCommonInit;
81
82    #[allow(clippy::too_many_lines)]
83    async fn dump_database(
84        &self,
85        dbtx: &mut DatabaseTransaction<'_>,
86        prefix_names: Vec<String>,
87    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
88        let mut lightning: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
89            BTreeMap::new();
90
91        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
92            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
93        });
94
95        for table in filtered_prefixes {
96            match table {
97                DbKeyPrefix::BlockCountVote => {
98                    push_db_pair_items!(
99                        dbtx,
100                        BlockCountVotePrefix,
101                        BlockCountVoteKey,
102                        u64,
103                        lightning,
104                        "Lightning Block Count Votes"
105                    );
106                }
107                DbKeyPrefix::UnixTimeVote => {
108                    push_db_pair_items!(
109                        dbtx,
110                        UnixTimeVotePrefix,
111                        UnixTimeVoteKey,
112                        u64,
113                        lightning,
114                        "Lightning Unix Time Votes"
115                    );
116                }
117                DbKeyPrefix::OutgoingContract => {
118                    push_db_pair_items!(
119                        dbtx,
120                        OutgoingContractPrefix,
121                        LightningOutgoingContractKey,
122                        OutgoingContract,
123                        lightning,
124                        "Lightning Outgoing Contracts"
125                    );
126                }
127                DbKeyPrefix::IncomingContract => {
128                    push_db_pair_items!(
129                        dbtx,
130                        IncomingContractPrefix,
131                        LightningIncomingContractKey,
132                        IncomingContract,
133                        lightning,
134                        "Lightning Incoming Contracts"
135                    );
136                }
137                DbKeyPrefix::IncomingContractOutpoint => {
138                    push_db_pair_items!(
139                        dbtx,
140                        IncomingContractOutpointPrefix,
141                        LightningIncomingContractOutpointKey,
142                        OutPoint,
143                        lightning,
144                        "Lightning Incoming Contracts Outpoints"
145                    );
146                }
147                DbKeyPrefix::DecryptionKeyShare => {
148                    push_db_pair_items!(
149                        dbtx,
150                        DecryptionKeySharePrefix,
151                        DecryptionKeyShareKey,
152                        DecryptionKeyShare,
153                        lightning,
154                        "Lightning Decryption Key Share"
155                    );
156                }
157                DbKeyPrefix::Preimage => {
158                    push_db_pair_items!(
159                        dbtx,
160                        PreimagePrefix,
161                        LightningPreimageKey,
162                        [u8; 32],
163                        lightning,
164                        "Lightning Preimages"
165                    );
166                }
167                DbKeyPrefix::Gateway => {
168                    push_db_pair_items!(
169                        dbtx,
170                        GatewayPrefix,
171                        GatewayKey,
172                        (),
173                        lightning,
174                        "Lightning Gateways"
175                    );
176                }
177                DbKeyPrefix::IncomingContractStreamIndex => {
178                    push_db_pair_items!(
179                        dbtx,
180                        IncomingContractStreamIndexKey,
181                        IncomingContractStreamIndexKey,
182                        u64,
183                        lightning,
184                        "Lightning Incoming Contract Stream Index"
185                    );
186                }
187                DbKeyPrefix::IncomingContractStream => {
188                    push_db_pair_items!(
189                        dbtx,
190                        IncomingContractStreamPrefix(0),
191                        IncomingContractStreamKey,
192                        IncomingContract,
193                        lightning,
194                        "Lightning Incoming Contract Stream"
195                    );
196                }
197                DbKeyPrefix::IncomingContractIndex => {
198                    push_db_pair_items!(
199                        dbtx,
200                        IncomingContractIndexPrefix,
201                        IncomingContractIndexKey,
202                        u64,
203                        lightning,
204                        "Lightning Incoming Contract Index"
205                    );
206                }
207            }
208        }
209
210        Box::new(lightning.into_iter())
211    }
212}
213
214#[apply(async_trait_maybe_send!)]
215impl ServerModuleInit for LightningInit {
216    type Module = Lightning;
217    type Params = LightningGenParams;
218
219    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
220        &[MODULE_CONSENSUS_VERSION]
221    }
222
223    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
224        SupportedModuleApiVersions::from_raw(
225            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
226            (
227                MODULE_CONSENSUS_VERSION.major,
228                MODULE_CONSENSUS_VERSION.minor,
229            ),
230            &[(0, 0)],
231        )
232    }
233
234    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
235        Ok(Lightning {
236            cfg: args.cfg().to_typed()?,
237            db: args.db().clone(),
238            server_bitcoin_rpc_monitor: args.server_bitcoin_rpc_monitor(),
239        })
240    }
241
242    fn trusted_dealer_gen(
243        &self,
244        peers: &[PeerId],
245        params: &ConfigGenModuleParams,
246        disable_base_fees: bool,
247    ) -> BTreeMap<PeerId, ServerModuleConfig> {
248        let params = self
249            .parse_params(params)
250            .expect("Failed tp parse lnv2 config gen params");
251
252        let tpe_pks = peers
253            .iter()
254            .map(|peer| (*peer, dealer_pk(peers.to_num_peers(), *peer)))
255            .collect::<BTreeMap<PeerId, PublicKeyShare>>();
256
257        peers
258            .iter()
259            .map(|peer| {
260                let cfg = LightningConfig {
261                    consensus: LightningConfigConsensus {
262                        tpe_agg_pk: dealer_agg_pk(),
263                        tpe_pks: tpe_pks.clone(),
264                        fee_consensus: params.consensus.fee_consensus.clone().unwrap_or(
265                            if disable_base_fees {
266                                FeeConsensus::zero()
267                            } else {
268                                FeeConsensus::new(0).expect("Relative fee is within range")
269                            },
270                        ),
271                        network: params.consensus.network,
272                    },
273                    private: LightningConfigPrivate {
274                        sk: dealer_sk(peers.to_num_peers(), *peer),
275                    },
276                };
277
278                (*peer, cfg.to_erased())
279            })
280            .collect()
281    }
282
283    async fn distributed_gen(
284        &self,
285        peers: &(dyn PeerHandleOps + Send + Sync),
286        params: &ConfigGenModuleParams,
287        disable_base_fees: bool,
288    ) -> anyhow::Result<ServerModuleConfig> {
289        let params = self.parse_params(params).unwrap();
290        let (polynomial, sks) = peers.run_dkg_g1().await?;
291
292        let server = LightningConfig {
293            consensus: LightningConfigConsensus {
294                tpe_agg_pk: tpe::AggregatePublicKey(polynomial[0].to_affine()),
295                tpe_pks: peers
296                    .num_peers()
297                    .peer_ids()
298                    .map(|peer| (peer, PublicKeyShare(eval_poly_g1(&polynomial, &peer))))
299                    .collect(),
300                fee_consensus: params.consensus.fee_consensus.clone().unwrap_or(
301                    if disable_base_fees {
302                        FeeConsensus::zero()
303                    } else {
304                        FeeConsensus::new(0).expect("Relative fee is within range")
305                    },
306                ),
307                network: params.consensus.network,
308            },
309            private: LightningConfigPrivate {
310                sk: SecretKeyShare(sks),
311            },
312        };
313
314        Ok(server.to_erased())
315    }
316
317    fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
318        let config = config.to_typed::<LightningConfig>()?;
319
320        ensure!(
321            tpe::derive_pk_share(&config.private.sk)
322                == *config
323                    .consensus
324                    .tpe_pks
325                    .get(identity)
326                    .context("Public key set has no key for our identity")?,
327            "Preimge encryption secret key share does not match our public key share"
328        );
329
330        Ok(())
331    }
332
333    fn get_client_config(
334        &self,
335        config: &ServerModuleConsensusConfig,
336    ) -> anyhow::Result<LightningClientConfig> {
337        let config = LightningConfigConsensus::from_erased(config)?;
338        Ok(LightningClientConfig {
339            tpe_agg_pk: config.tpe_agg_pk,
340            tpe_pks: config.tpe_pks,
341            fee_consensus: config.fee_consensus,
342            network: config.network,
343        })
344    }
345
346    fn get_database_migrations(
347        &self,
348    ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Lightning>> {
349        let mut migrations: BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Lightning>> =
350            BTreeMap::new();
351
352        migrations.insert(
353            DatabaseVersion(0),
354            Box::new(move |ctx| Box::pin(crate::db::migrate_to_v1(ctx))),
355        );
356
357        migrations
358    }
359
360    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
361        Some(DbKeyPrefix::iter().map(|p| p as u8).collect())
362    }
363}
364
365fn dealer_agg_pk() -> AggregatePublicKey {
366    AggregatePublicKey((G1Projective::generator() * coefficient(0)).to_affine())
367}
368
369fn dealer_pk(num_peers: NumPeers, peer: PeerId) -> PublicKeyShare {
370    derive_pk_share(&dealer_sk(num_peers, peer))
371}
372
373fn dealer_sk(num_peers: NumPeers, peer: PeerId) -> SecretKeyShare {
374    let x = Scalar::from(peer.to_usize() as u64 + 1);
375
376    // We evaluate the scalar polynomial of degree threshold - 1 at the point x
377    // using the Horner schema.
378
379    let y = (0..num_peers.threshold())
380        .map(|index| coefficient(index as u64))
381        .rev()
382        .reduce(|accumulator, c| accumulator * x + c)
383        .expect("We have at least one coefficient");
384
385    SecretKeyShare(y)
386}
387
388fn coefficient(index: u64) -> Scalar {
389    Scalar::random(&mut ChaChaRng::from_seed(
390        *index.consensus_hash::<sha256::Hash>().as_byte_array(),
391    ))
392}
393
394#[derive(Debug)]
395pub struct Lightning {
396    cfg: LightningConfig,
397    db: Database,
398    server_bitcoin_rpc_monitor: ServerBitcoinRpcMonitor,
399}
400
401#[apply(async_trait_maybe_send!)]
402impl ServerModule for Lightning {
403    type Common = LightningModuleTypes;
404    type Init = LightningInit;
405
406    async fn consensus_proposal(
407        &self,
408        _dbtx: &mut DatabaseTransaction<'_>,
409    ) -> Vec<LightningConsensusItem> {
410        // We reduce the time granularity to deduplicate votes more often and not save
411        // one consensus item every second.
412        let mut items = vec![LightningConsensusItem::UnixTimeVote(
413            60 * (duration_since_epoch().as_secs() / 60),
414        )];
415
416        if let Ok(block_count) = self.get_block_count() {
417            trace!(target: LOG_MODULE_LNV2, ?block_count, "Proposing block count");
418            items.push(LightningConsensusItem::BlockCountVote(block_count));
419        }
420
421        items
422    }
423
424    async fn process_consensus_item<'a, 'b>(
425        &'a self,
426        dbtx: &mut DatabaseTransaction<'b>,
427        consensus_item: LightningConsensusItem,
428        peer: PeerId,
429    ) -> anyhow::Result<()> {
430        trace!(target: LOG_MODULE_LNV2, ?consensus_item, "Processing consensus item proposal");
431
432        match consensus_item {
433            LightningConsensusItem::BlockCountVote(vote) => {
434                let current_vote = dbtx
435                    .insert_entry(&BlockCountVoteKey(peer), &vote)
436                    .await
437                    .unwrap_or(0);
438
439                ensure!(current_vote < vote, "Block count vote is redundant");
440
441                Ok(())
442            }
443            LightningConsensusItem::UnixTimeVote(vote) => {
444                let current_vote = dbtx
445                    .insert_entry(&UnixTimeVoteKey(peer), &vote)
446                    .await
447                    .unwrap_or(0);
448
449                ensure!(current_vote < vote, "Unix time vote is redundant");
450
451                Ok(())
452            }
453            LightningConsensusItem::Default { variant, .. } => Err(anyhow!(
454                "Received lnv2 consensus item with unknown variant {variant}"
455            )),
456        }
457    }
458
459    async fn process_input<'a, 'b, 'c>(
460        &'a self,
461        dbtx: &mut DatabaseTransaction<'c>,
462        input: &'b LightningInput,
463        _in_point: InPoint,
464    ) -> Result<InputMeta, LightningInputError> {
465        let (pub_key, amount) = match input.ensure_v0_ref()? {
466            LightningInputV0::Outgoing(outpoint, outgoing_witness) => {
467                let contract = dbtx
468                    .remove_entry(&OutgoingContractKey(*outpoint))
469                    .await
470                    .ok_or(LightningInputError::UnknownContract)?;
471
472                let pub_key = match outgoing_witness {
473                    OutgoingWitness::Claim(preimage) => {
474                        if contract.expiration <= self.consensus_block_count(dbtx).await {
475                            return Err(LightningInputError::Expired);
476                        }
477
478                        if !contract.verify_preimage(preimage) {
479                            return Err(LightningInputError::InvalidPreimage);
480                        }
481
482                        dbtx.insert_entry(&PreimageKey(*outpoint), preimage).await;
483
484                        contract.claim_pk
485                    }
486                    OutgoingWitness::Refund => {
487                        if contract.expiration > self.consensus_block_count(dbtx).await {
488                            return Err(LightningInputError::NotExpired);
489                        }
490
491                        contract.refund_pk
492                    }
493                    OutgoingWitness::Cancel(forfeit_signature) => {
494                        if !contract.verify_forfeit_signature(forfeit_signature) {
495                            return Err(LightningInputError::InvalidForfeitSignature);
496                        }
497
498                        contract.refund_pk
499                    }
500                };
501
502                (pub_key, contract.amount)
503            }
504            LightningInputV0::Incoming(outpoint, agg_decryption_key) => {
505                let contract = dbtx
506                    .remove_entry(&IncomingContractKey(*outpoint))
507                    .await
508                    .ok_or(LightningInputError::UnknownContract)?;
509
510                let index = dbtx
511                    .remove_entry(&IncomingContractIndexKey(*outpoint))
512                    .await
513                    .expect("Incoming contract index should exist");
514
515                dbtx.remove_entry(&IncomingContractStreamKey(index)).await;
516
517                if !contract
518                    .verify_agg_decryption_key(&self.cfg.consensus.tpe_agg_pk, agg_decryption_key)
519                {
520                    return Err(LightningInputError::InvalidDecryptionKey);
521                }
522
523                let pub_key = match contract.decrypt_preimage(agg_decryption_key) {
524                    Some(..) => contract.commitment.claim_pk,
525                    None => contract.commitment.refund_pk,
526                };
527
528                (pub_key, contract.commitment.amount)
529            }
530        };
531
532        Ok(InputMeta {
533            amount: TransactionItemAmounts {
534                amounts: Amounts::new_bitcoin(amount),
535                fees: Amounts::new_bitcoin(self.cfg.consensus.fee_consensus.fee(amount)),
536            },
537            pub_key,
538        })
539    }
540
541    async fn process_output<'a, 'b>(
542        &'a self,
543        dbtx: &mut DatabaseTransaction<'b>,
544        output: &'a LightningOutput,
545        outpoint: OutPoint,
546    ) -> Result<TransactionItemAmounts, LightningOutputError> {
547        let amount = match output.ensure_v0_ref()? {
548            LightningOutputV0::Outgoing(contract) => {
549                dbtx.insert_new_entry(&OutgoingContractKey(outpoint), contract)
550                    .await;
551
552                contract.amount
553            }
554            LightningOutputV0::Incoming(contract) => {
555                if !contract.verify() {
556                    return Err(LightningOutputError::InvalidContract);
557                }
558
559                if contract.commitment.expiration <= self.consensus_unix_time(dbtx).await {
560                    return Err(LightningOutputError::ContractExpired);
561                }
562
563                dbtx.insert_new_entry(&IncomingContractKey(outpoint), contract)
564                    .await;
565
566                dbtx.insert_entry(
567                    &IncomingContractOutpointKey(contract.contract_id()),
568                    &outpoint,
569                )
570                .await;
571
572                let stream_index = dbtx
573                    .get_value(&IncomingContractStreamIndexKey)
574                    .await
575                    .unwrap_or(0);
576
577                dbtx.insert_entry(&IncomingContractStreamKey(stream_index), contract)
578                    .await;
579
580                dbtx.insert_entry(&IncomingContractIndexKey(outpoint), &stream_index)
581                    .await;
582
583                dbtx.insert_entry(&IncomingContractStreamIndexKey, &(stream_index + 1))
584                    .await;
585
586                let dk_share = contract.create_decryption_key_share(&self.cfg.private.sk);
587
588                dbtx.insert_entry(&DecryptionKeyShareKey(outpoint), &dk_share)
589                    .await;
590
591                contract.commitment.amount
592            }
593        };
594
595        Ok(TransactionItemAmounts {
596            amounts: Amounts::new_bitcoin(amount),
597            fees: Amounts::new_bitcoin(self.cfg.consensus.fee_consensus.fee(amount)),
598        })
599    }
600
601    async fn output_status(
602        &self,
603        _dbtx: &mut DatabaseTransaction<'_>,
604        _out_point: OutPoint,
605    ) -> Option<LightningOutputOutcome> {
606        None
607    }
608
609    async fn audit(
610        &self,
611        dbtx: &mut DatabaseTransaction<'_>,
612        audit: &mut Audit,
613        module_instance_id: ModuleInstanceId,
614    ) {
615        // Both incoming and outgoing contracts represent liabilities to the federation
616        // since they are obligations to issue notes.
617        audit
618            .add_items(
619                dbtx,
620                module_instance_id,
621                &OutgoingContractPrefix,
622                |_, contract| -(contract.amount.msats as i64),
623            )
624            .await;
625
626        audit
627            .add_items(
628                dbtx,
629                module_instance_id,
630                &IncomingContractPrefix,
631                |_, contract| -(contract.commitment.amount.msats as i64),
632            )
633            .await;
634    }
635
636    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
637        vec![
638            api_endpoint! {
639                CONSENSUS_BLOCK_COUNT_ENDPOINT,
640                ApiVersion::new(0, 0),
641                async |module: &Lightning, context, _params : () | -> u64 {
642                    let db = context.db();
643                    let mut dbtx = db.begin_transaction_nc().await;
644
645                    Ok(module.consensus_block_count(&mut dbtx).await)
646                }
647            },
648            api_endpoint! {
649                AWAIT_INCOMING_CONTRACT_ENDPOINT,
650                ApiVersion::new(0, 0),
651                async |module: &Lightning, context, params: (ContractId, u64) | -> Option<OutPoint> {
652                    let db = context.db();
653
654                    Ok(module.await_incoming_contract(db, params.0, params.1).await)
655                }
656            },
657            api_endpoint! {
658                AWAIT_PREIMAGE_ENDPOINT,
659                ApiVersion::new(0, 0),
660                async |module: &Lightning, context, params: (OutPoint, u64)| -> Option<[u8; 32]> {
661                    let db = context.db();
662
663                    Ok(module.await_preimage(db, params.0, params.1).await)
664                }
665            },
666            api_endpoint! {
667                DECRYPTION_KEY_SHARE_ENDPOINT,
668                ApiVersion::new(0, 0),
669                async |_module: &Lightning, context, params: OutPoint| -> DecryptionKeyShare {
670                    let share = context
671                        .db()
672                        .begin_transaction_nc()
673                        .await
674                        .get_value(&DecryptionKeyShareKey(params))
675                        .await
676                        .ok_or(ApiError::bad_request("No decryption key share found".to_string()))?;
677
678                    Ok(share)
679                }
680            },
681            api_endpoint! {
682                OUTGOING_CONTRACT_EXPIRATION_ENDPOINT,
683                ApiVersion::new(0, 0),
684                async |module: &Lightning, context, outpoint: OutPoint| -> Option<(ContractId, u64)> {
685                    let db = context.db();
686
687                    Ok(module.outgoing_contract_expiration(db, outpoint).await)
688                }
689            },
690            api_endpoint! {
691                AWAIT_INCOMING_CONTRACTS_ENDPOINT,
692                ApiVersion::new(0, 0),
693                async |module: &Lightning, context, params: (u64, usize)| -> (Vec<IncomingContract>, u64) {
694                    let db = context.db();
695
696                    if params.1 == 0 {
697                        return Err(ApiError::bad_request("Batch size must be greater than 0".to_string()));
698                    }
699
700                    Ok(module.await_incoming_contracts(db, params.0, params.1).await)
701                }
702            },
703            api_endpoint! {
704                ADD_GATEWAY_ENDPOINT,
705                ApiVersion::new(0, 0),
706                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
707                    check_auth(context)?;
708
709                    let db = context.db();
710
711                    Ok(Lightning::add_gateway(db, gateway).await)
712                }
713            },
714            api_endpoint! {
715                REMOVE_GATEWAY_ENDPOINT,
716                ApiVersion::new(0, 0),
717                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
718                    check_auth(context)?;
719
720                    let db = context.db();
721
722                    Ok(Lightning::remove_gateway(db, gateway).await)
723                }
724            },
725            api_endpoint! {
726                GATEWAYS_ENDPOINT,
727                ApiVersion::new(0, 0),
728                async |_module: &Lightning, context, _params : () | -> Vec<SafeUrl> {
729                    let db = context.db();
730
731                    Ok(Lightning::gateways(db).await)
732                }
733            },
734        ]
735    }
736}
737
738impl Lightning {
739    fn get_block_count(&self) -> anyhow::Result<u64> {
740        self.server_bitcoin_rpc_monitor
741            .status()
742            .map(|status| status.block_count)
743            .context("Block count not available yet")
744    }
745
746    async fn consensus_block_count(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
747        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
748
749        let mut counts = dbtx
750            .find_by_prefix(&BlockCountVotePrefix)
751            .await
752            .map(|entry| entry.1)
753            .collect::<Vec<u64>>()
754            .await;
755
756        counts.sort_unstable();
757
758        counts.reverse();
759
760        assert!(counts.last() <= counts.first());
761
762        // The block count we select guarantees that any threshold of correct peers can
763        // increase the consensus block count and any consensus block count has been
764        // confirmed by a threshold of peers.
765
766        counts.get(num_peers.threshold() - 1).copied().unwrap_or(0)
767    }
768
769    async fn consensus_unix_time(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
770        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
771
772        let mut times = dbtx
773            .find_by_prefix(&UnixTimeVotePrefix)
774            .await
775            .map(|entry| entry.1)
776            .collect::<Vec<u64>>()
777            .await;
778
779        times.sort_unstable();
780
781        times.reverse();
782
783        assert!(times.last() <= times.first());
784
785        // The unix time we select guarantees that any threshold of correct peers can
786        // advance the consensus unix time and any consensus unix time has been
787        // confirmed by a threshold of peers.
788
789        times.get(num_peers.threshold() - 1).copied().unwrap_or(0)
790    }
791
792    async fn await_incoming_contract(
793        &self,
794        db: Database,
795        contract_id: ContractId,
796        expiration: u64,
797    ) -> Option<OutPoint> {
798        loop {
799            timeout(
800                Duration::from_secs(10),
801                db.wait_key_exists(&IncomingContractOutpointKey(contract_id)),
802            )
803            .await
804            .ok();
805
806            // to avoid race conditions we have to check for the contract and
807            // its expiration in the same database transaction
808            let mut dbtx = db.begin_transaction_nc().await;
809
810            if let Some(outpoint) = dbtx
811                .get_value(&IncomingContractOutpointKey(contract_id))
812                .await
813            {
814                return Some(outpoint);
815            }
816
817            if expiration <= self.consensus_unix_time(&mut dbtx).await {
818                return None;
819            }
820        }
821    }
822
823    async fn await_preimage(
824        &self,
825        db: Database,
826        outpoint: OutPoint,
827        expiration: u64,
828    ) -> Option<[u8; 32]> {
829        loop {
830            timeout(
831                Duration::from_secs(10),
832                db.wait_key_exists(&PreimageKey(outpoint)),
833            )
834            .await
835            .ok();
836
837            // to avoid race conditions we have to check for the preimage and
838            // the contracts expiration in the same database transaction
839            let mut dbtx = db.begin_transaction_nc().await;
840
841            if let Some(preimage) = dbtx.get_value(&PreimageKey(outpoint)).await {
842                return Some(preimage);
843            }
844
845            if expiration <= self.consensus_block_count(&mut dbtx).await {
846                return None;
847            }
848        }
849    }
850
851    async fn outgoing_contract_expiration(
852        &self,
853        db: Database,
854        outpoint: OutPoint,
855    ) -> Option<(ContractId, u64)> {
856        let mut dbtx = db.begin_transaction_nc().await;
857
858        let contract = dbtx.get_value(&OutgoingContractKey(outpoint)).await?;
859
860        let consensus_block_count = self.consensus_block_count(&mut dbtx).await;
861
862        let expiration = contract.expiration.saturating_sub(consensus_block_count);
863
864        Some((contract.contract_id(), expiration))
865    }
866
867    async fn await_incoming_contracts(
868        &self,
869        db: Database,
870        start: u64,
871        n: usize,
872    ) -> (Vec<IncomingContract>, u64) {
873        let filter = |next_index: Option<u64>| next_index.filter(|i| *i > start);
874
875        let (mut next_index, mut dbtx) = db
876            .wait_key_check(&IncomingContractStreamIndexKey, filter)
877            .await;
878
879        let mut contracts = Vec::with_capacity(n);
880
881        let range = IncomingContractStreamKey(start)..IncomingContractStreamKey(u64::MAX);
882
883        for (key, contract) in dbtx
884            .find_by_range(range)
885            .await
886            .take(n)
887            .collect::<Vec<(IncomingContractStreamKey, IncomingContract)>>()
888            .await
889        {
890            contracts.push(contract.clone());
891            next_index = key.0 + 1;
892        }
893
894        (contracts, next_index)
895    }
896
897    async fn add_gateway(db: Database, gateway: SafeUrl) -> bool {
898        let mut dbtx = db.begin_transaction().await;
899
900        let is_new_entry = dbtx.insert_entry(&GatewayKey(gateway), &()).await.is_none();
901
902        dbtx.commit_tx().await;
903
904        is_new_entry
905    }
906
907    async fn remove_gateway(db: Database, gateway: SafeUrl) -> bool {
908        let mut dbtx = db.begin_transaction().await;
909
910        let entry_existed = dbtx.remove_entry(&GatewayKey(gateway)).await.is_some();
911
912        dbtx.commit_tx().await;
913
914        entry_existed
915    }
916
917    async fn gateways(db: Database) -> Vec<SafeUrl> {
918        db.begin_transaction_nc()
919            .await
920            .find_by_prefix(&GatewayPrefix)
921            .await
922            .map(|entry| entry.0.0)
923            .collect()
924            .await
925    }
926
927    pub async fn consensus_block_count_ui(&self) -> u64 {
928        self.consensus_block_count(&mut self.db.begin_transaction_nc().await)
929            .await
930    }
931
932    pub async fn consensus_unix_time_ui(&self) -> u64 {
933        self.consensus_unix_time(&mut self.db.begin_transaction_nc().await)
934            .await
935    }
936
937    pub async fn add_gateway_ui(&self, gateway: SafeUrl) -> bool {
938        Self::add_gateway(self.db.clone(), gateway).await
939    }
940
941    pub async fn remove_gateway_ui(&self, gateway: SafeUrl) -> bool {
942        Self::remove_gateway(self.db.clone(), gateway).await
943    }
944
945    pub async fn gateways_ui(&self) -> Vec<SafeUrl> {
946        Self::gateways(self.db.clone()).await
947    }
948}