fedimint_lnv2_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_wrap)]
3#![allow(clippy::module_name_repetitions)]
4
5mod db;
6
7use std::collections::{BTreeMap, BTreeSet};
8use std::time::Duration;
9
10use anyhow::{Context, anyhow, ensure};
11use bls12_381::{G1Projective, Scalar};
12use fedimint_core::bitcoin::hashes::sha256;
13use fedimint_core::config::{
14    ServerModuleConfig, ServerModuleConsensusConfig, TypedServerModuleConfig,
15    TypedServerModuleConsensusConfig,
16};
17use fedimint_core::core::ModuleInstanceId;
18use fedimint_core::db::{
19    Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
20};
21use fedimint_core::encoding::Encodable;
22use fedimint_core::module::audit::Audit;
23use fedimint_core::module::{
24    Amounts, ApiEndpoint, ApiError, ApiVersion, CORE_CONSENSUS_VERSION, CoreConsensusVersion,
25    InputMeta, ModuleConsensusVersion, ModuleInit, SupportedModuleApiVersions,
26    TransactionItemAmounts, api_endpoint,
27};
28use fedimint_core::net::auth::check_auth;
29use fedimint_core::task::timeout;
30use fedimint_core::time::duration_since_epoch;
31use fedimint_core::util::SafeUrl;
32use fedimint_core::{
33    BitcoinHash, InPoint, NumPeers, NumPeersExt, OutPoint, PeerId, apply, async_trait_maybe_send,
34    push_db_pair_items,
35};
36use fedimint_lnv2_common::config::{
37    FeeConsensus, LightningClientConfig, LightningConfig, LightningConfigConsensus,
38    LightningConfigPrivate,
39};
40use fedimint_lnv2_common::contracts::{IncomingContract, OutgoingContract};
41use fedimint_lnv2_common::endpoint_constants::{
42    ADD_GATEWAY_ENDPOINT, AWAIT_INCOMING_CONTRACT_ENDPOINT, AWAIT_INCOMING_CONTRACTS_ENDPOINT,
43    AWAIT_PREIMAGE_ENDPOINT, CONSENSUS_BLOCK_COUNT_ENDPOINT, DECRYPTION_KEY_SHARE_ENDPOINT,
44    GATEWAYS_ENDPOINT, OUTGOING_CONTRACT_EXPIRATION_ENDPOINT, REMOVE_GATEWAY_ENDPOINT,
45};
46use fedimint_lnv2_common::{
47    ContractId, LightningCommonInit, LightningConsensusItem, LightningInput, LightningInputError,
48    LightningInputV0, LightningModuleTypes, LightningOutput, LightningOutputError,
49    LightningOutputOutcome, LightningOutputV0, MODULE_CONSENSUS_VERSION, OutgoingWitness,
50};
51use fedimint_logging::LOG_MODULE_LNV2;
52use fedimint_server_core::bitcoin_rpc::ServerBitcoinRpcMonitor;
53use fedimint_server_core::config::{PeerHandleOps, eval_poly_g1};
54use fedimint_server_core::migration::ServerModuleDbMigrationFn;
55use fedimint_server_core::{
56    ConfigGenModuleArgs, ServerModule, ServerModuleInit, ServerModuleInitArgs,
57};
58use futures::StreamExt;
59use group::Curve;
60use group::ff::Field;
61use rand::SeedableRng;
62use rand_chacha::ChaChaRng;
63use strum::IntoEnumIterator;
64use tpe::{
65    AggregatePublicKey, DecryptionKeyShare, PublicKeyShare, SecretKeyShare, derive_pk_share,
66};
67use tracing::trace;
68
69use crate::db::{
70    BlockCountVoteKey, BlockCountVotePrefix, DbKeyPrefix, DecryptionKeyShareKey,
71    DecryptionKeySharePrefix, GatewayKey, GatewayPrefix, IncomingContractIndexKey,
72    IncomingContractIndexPrefix, IncomingContractKey, IncomingContractOutpointKey,
73    IncomingContractOutpointPrefix, IncomingContractPrefix, IncomingContractStreamIndexKey,
74    IncomingContractStreamKey, IncomingContractStreamPrefix, OutgoingContractKey,
75    OutgoingContractPrefix, PreimageKey, PreimagePrefix, UnixTimeVoteKey, UnixTimeVotePrefix,
76};
77
78#[derive(Debug, Clone)]
79pub struct LightningInit;
80
81impl ModuleInit for LightningInit {
82    type Common = LightningCommonInit;
83
84    #[allow(clippy::too_many_lines)]
85    async fn dump_database(
86        &self,
87        dbtx: &mut DatabaseTransaction<'_>,
88        prefix_names: Vec<String>,
89    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
90        let mut lightning: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> =
91            BTreeMap::new();
92
93        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
94            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
95        });
96
97        for table in filtered_prefixes {
98            match table {
99                DbKeyPrefix::BlockCountVote => {
100                    push_db_pair_items!(
101                        dbtx,
102                        BlockCountVotePrefix,
103                        BlockCountVoteKey,
104                        u64,
105                        lightning,
106                        "Lightning Block Count Votes"
107                    );
108                }
109                DbKeyPrefix::UnixTimeVote => {
110                    push_db_pair_items!(
111                        dbtx,
112                        UnixTimeVotePrefix,
113                        UnixTimeVoteKey,
114                        u64,
115                        lightning,
116                        "Lightning Unix Time Votes"
117                    );
118                }
119                DbKeyPrefix::OutgoingContract => {
120                    push_db_pair_items!(
121                        dbtx,
122                        OutgoingContractPrefix,
123                        LightningOutgoingContractKey,
124                        OutgoingContract,
125                        lightning,
126                        "Lightning Outgoing Contracts"
127                    );
128                }
129                DbKeyPrefix::IncomingContract => {
130                    push_db_pair_items!(
131                        dbtx,
132                        IncomingContractPrefix,
133                        LightningIncomingContractKey,
134                        IncomingContract,
135                        lightning,
136                        "Lightning Incoming Contracts"
137                    );
138                }
139                DbKeyPrefix::IncomingContractOutpoint => {
140                    push_db_pair_items!(
141                        dbtx,
142                        IncomingContractOutpointPrefix,
143                        LightningIncomingContractOutpointKey,
144                        OutPoint,
145                        lightning,
146                        "Lightning Incoming Contracts Outpoints"
147                    );
148                }
149                DbKeyPrefix::DecryptionKeyShare => {
150                    push_db_pair_items!(
151                        dbtx,
152                        DecryptionKeySharePrefix,
153                        DecryptionKeyShareKey,
154                        DecryptionKeyShare,
155                        lightning,
156                        "Lightning Decryption Key Share"
157                    );
158                }
159                DbKeyPrefix::Preimage => {
160                    push_db_pair_items!(
161                        dbtx,
162                        PreimagePrefix,
163                        LightningPreimageKey,
164                        [u8; 32],
165                        lightning,
166                        "Lightning Preimages"
167                    );
168                }
169                DbKeyPrefix::Gateway => {
170                    push_db_pair_items!(
171                        dbtx,
172                        GatewayPrefix,
173                        GatewayKey,
174                        (),
175                        lightning,
176                        "Lightning Gateways"
177                    );
178                }
179                DbKeyPrefix::IncomingContractStreamIndex => {
180                    push_db_pair_items!(
181                        dbtx,
182                        IncomingContractStreamIndexKey,
183                        IncomingContractStreamIndexKey,
184                        u64,
185                        lightning,
186                        "Lightning Incoming Contract Stream Index"
187                    );
188                }
189                DbKeyPrefix::IncomingContractStream => {
190                    push_db_pair_items!(
191                        dbtx,
192                        IncomingContractStreamPrefix(0),
193                        IncomingContractStreamKey,
194                        IncomingContract,
195                        lightning,
196                        "Lightning Incoming Contract Stream"
197                    );
198                }
199                DbKeyPrefix::IncomingContractIndex => {
200                    push_db_pair_items!(
201                        dbtx,
202                        IncomingContractIndexPrefix,
203                        IncomingContractIndexKey,
204                        u64,
205                        lightning,
206                        "Lightning Incoming Contract Index"
207                    );
208                }
209            }
210        }
211
212        Box::new(lightning.into_iter())
213    }
214}
215
216#[apply(async_trait_maybe_send!)]
217impl ServerModuleInit for LightningInit {
218    type Module = Lightning;
219
220    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
221        &[MODULE_CONSENSUS_VERSION]
222    }
223
224    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
225        SupportedModuleApiVersions::from_raw(
226            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
227            (
228                MODULE_CONSENSUS_VERSION.major,
229                MODULE_CONSENSUS_VERSION.minor,
230            ),
231            &[(0, 0)],
232        )
233    }
234
235    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
236        Ok(Lightning {
237            cfg: args.cfg().to_typed()?,
238            db: args.db().clone(),
239            server_bitcoin_rpc_monitor: args.server_bitcoin_rpc_monitor(),
240        })
241    }
242
243    fn trusted_dealer_gen(
244        &self,
245        peers: &[PeerId],
246        args: &ConfigGenModuleArgs,
247    ) -> BTreeMap<PeerId, ServerModuleConfig> {
248        let tpe_pks = peers
249            .iter()
250            .map(|peer| (*peer, dealer_pk(peers.to_num_peers(), *peer)))
251            .collect::<BTreeMap<PeerId, PublicKeyShare>>();
252
253        peers
254            .iter()
255            .map(|peer| {
256                let cfg = LightningConfig {
257                    consensus: LightningConfigConsensus {
258                        tpe_agg_pk: dealer_agg_pk(),
259                        tpe_pks: tpe_pks.clone(),
260                        fee_consensus: if args.disable_base_fees {
261                            FeeConsensus::zero()
262                        } else {
263                            FeeConsensus::new(0).expect("Relative fee is within range")
264                        },
265                        network: args.network,
266                    },
267                    private: LightningConfigPrivate {
268                        sk: dealer_sk(peers.to_num_peers(), *peer),
269                    },
270                };
271
272                (*peer, cfg.to_erased())
273            })
274            .collect()
275    }
276
277    async fn distributed_gen(
278        &self,
279        peers: &(dyn PeerHandleOps + Send + Sync),
280        args: &ConfigGenModuleArgs,
281    ) -> anyhow::Result<ServerModuleConfig> {
282        let (polynomial, sks) = peers.run_dkg_g1().await?;
283
284        let server = LightningConfig {
285            consensus: LightningConfigConsensus {
286                tpe_agg_pk: tpe::AggregatePublicKey(polynomial[0].to_affine()),
287                tpe_pks: peers
288                    .num_peers()
289                    .peer_ids()
290                    .map(|peer| (peer, PublicKeyShare(eval_poly_g1(&polynomial, &peer))))
291                    .collect(),
292                fee_consensus: if args.disable_base_fees {
293                    FeeConsensus::zero()
294                } else {
295                    FeeConsensus::new(0).expect("Relative fee is within range")
296                },
297                network: args.network,
298            },
299            private: LightningConfigPrivate {
300                sk: SecretKeyShare(sks),
301            },
302        };
303
304        Ok(server.to_erased())
305    }
306
307    fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
308        let config = config.to_typed::<LightningConfig>()?;
309
310        ensure!(
311            tpe::derive_pk_share(&config.private.sk)
312                == *config
313                    .consensus
314                    .tpe_pks
315                    .get(identity)
316                    .context("Public key set has no key for our identity")?,
317            "Preimge encryption secret key share does not match our public key share"
318        );
319
320        Ok(())
321    }
322
323    fn get_client_config(
324        &self,
325        config: &ServerModuleConsensusConfig,
326    ) -> anyhow::Result<LightningClientConfig> {
327        let config = LightningConfigConsensus::from_erased(config)?;
328        Ok(LightningClientConfig {
329            tpe_agg_pk: config.tpe_agg_pk,
330            tpe_pks: config.tpe_pks,
331            fee_consensus: config.fee_consensus,
332            network: config.network,
333        })
334    }
335
336    fn get_database_migrations(
337        &self,
338    ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Lightning>> {
339        let mut migrations: BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Lightning>> =
340            BTreeMap::new();
341
342        migrations.insert(
343            DatabaseVersion(0),
344            Box::new(move |ctx| Box::pin(crate::db::migrate_to_v1(ctx))),
345        );
346
347        migrations
348    }
349
350    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
351        Some(DbKeyPrefix::iter().map(|p| p as u8).collect())
352    }
353}
354
355fn dealer_agg_pk() -> AggregatePublicKey {
356    AggregatePublicKey((G1Projective::generator() * coefficient(0)).to_affine())
357}
358
359fn dealer_pk(num_peers: NumPeers, peer: PeerId) -> PublicKeyShare {
360    derive_pk_share(&dealer_sk(num_peers, peer))
361}
362
363fn dealer_sk(num_peers: NumPeers, peer: PeerId) -> SecretKeyShare {
364    let x = Scalar::from(peer.to_usize() as u64 + 1);
365
366    // We evaluate the scalar polynomial of degree threshold - 1 at the point x
367    // using the Horner schema.
368
369    let y = (0..num_peers.threshold())
370        .map(|index| coefficient(index as u64))
371        .rev()
372        .reduce(|accumulator, c| accumulator * x + c)
373        .expect("We have at least one coefficient");
374
375    SecretKeyShare(y)
376}
377
378fn coefficient(index: u64) -> Scalar {
379    Scalar::random(&mut ChaChaRng::from_seed(
380        *index.consensus_hash::<sha256::Hash>().as_byte_array(),
381    ))
382}
383
384#[derive(Debug)]
385pub struct Lightning {
386    cfg: LightningConfig,
387    db: Database,
388    server_bitcoin_rpc_monitor: ServerBitcoinRpcMonitor,
389}
390
391#[apply(async_trait_maybe_send!)]
392impl ServerModule for Lightning {
393    type Common = LightningModuleTypes;
394    type Init = LightningInit;
395
396    async fn consensus_proposal(
397        &self,
398        _dbtx: &mut DatabaseTransaction<'_>,
399    ) -> Vec<LightningConsensusItem> {
400        // We reduce the time granularity to deduplicate votes more often and not save
401        // one consensus item every second.
402        let mut items = vec![LightningConsensusItem::UnixTimeVote(
403            60 * (duration_since_epoch().as_secs() / 60),
404        )];
405
406        if let Ok(block_count) = self.get_block_count() {
407            trace!(target: LOG_MODULE_LNV2, ?block_count, "Proposing block count");
408            items.push(LightningConsensusItem::BlockCountVote(block_count));
409        }
410
411        items
412    }
413
414    async fn process_consensus_item<'a, 'b>(
415        &'a self,
416        dbtx: &mut DatabaseTransaction<'b>,
417        consensus_item: LightningConsensusItem,
418        peer: PeerId,
419    ) -> anyhow::Result<()> {
420        trace!(target: LOG_MODULE_LNV2, ?consensus_item, "Processing consensus item proposal");
421
422        match consensus_item {
423            LightningConsensusItem::BlockCountVote(vote) => {
424                let current_vote = dbtx
425                    .insert_entry(&BlockCountVoteKey(peer), &vote)
426                    .await
427                    .unwrap_or(0);
428
429                ensure!(current_vote < vote, "Block count vote is redundant");
430
431                Ok(())
432            }
433            LightningConsensusItem::UnixTimeVote(vote) => {
434                let current_vote = dbtx
435                    .insert_entry(&UnixTimeVoteKey(peer), &vote)
436                    .await
437                    .unwrap_or(0);
438
439                ensure!(current_vote < vote, "Unix time vote is redundant");
440
441                Ok(())
442            }
443            LightningConsensusItem::Default { variant, .. } => Err(anyhow!(
444                "Received lnv2 consensus item with unknown variant {variant}"
445            )),
446        }
447    }
448
449    async fn process_input<'a, 'b, 'c>(
450        &'a self,
451        dbtx: &mut DatabaseTransaction<'c>,
452        input: &'b LightningInput,
453        _in_point: InPoint,
454    ) -> Result<InputMeta, LightningInputError> {
455        let (pub_key, amount) = match input.ensure_v0_ref()? {
456            LightningInputV0::Outgoing(outpoint, outgoing_witness) => {
457                let contract = dbtx
458                    .remove_entry(&OutgoingContractKey(*outpoint))
459                    .await
460                    .ok_or(LightningInputError::UnknownContract)?;
461
462                let pub_key = match outgoing_witness {
463                    OutgoingWitness::Claim(preimage) => {
464                        if contract.expiration <= self.consensus_block_count(dbtx).await {
465                            return Err(LightningInputError::Expired);
466                        }
467
468                        if !contract.verify_preimage(preimage) {
469                            return Err(LightningInputError::InvalidPreimage);
470                        }
471
472                        dbtx.insert_entry(&PreimageKey(*outpoint), preimage).await;
473
474                        contract.claim_pk
475                    }
476                    OutgoingWitness::Refund => {
477                        if contract.expiration > self.consensus_block_count(dbtx).await {
478                            return Err(LightningInputError::NotExpired);
479                        }
480
481                        contract.refund_pk
482                    }
483                    OutgoingWitness::Cancel(forfeit_signature) => {
484                        if !contract.verify_forfeit_signature(forfeit_signature) {
485                            return Err(LightningInputError::InvalidForfeitSignature);
486                        }
487
488                        contract.refund_pk
489                    }
490                };
491
492                (pub_key, contract.amount)
493            }
494            LightningInputV0::Incoming(outpoint, agg_decryption_key) => {
495                let contract = dbtx
496                    .remove_entry(&IncomingContractKey(*outpoint))
497                    .await
498                    .ok_or(LightningInputError::UnknownContract)?;
499
500                let index = dbtx
501                    .remove_entry(&IncomingContractIndexKey(*outpoint))
502                    .await
503                    .expect("Incoming contract index should exist");
504
505                dbtx.remove_entry(&IncomingContractStreamKey(index)).await;
506
507                if !contract
508                    .verify_agg_decryption_key(&self.cfg.consensus.tpe_agg_pk, agg_decryption_key)
509                {
510                    return Err(LightningInputError::InvalidDecryptionKey);
511                }
512
513                let pub_key = match contract.decrypt_preimage(agg_decryption_key) {
514                    Some(..) => contract.commitment.claim_pk,
515                    None => contract.commitment.refund_pk,
516                };
517
518                (pub_key, contract.commitment.amount)
519            }
520        };
521
522        Ok(InputMeta {
523            amount: TransactionItemAmounts {
524                amounts: Amounts::new_bitcoin(amount),
525                fees: Amounts::new_bitcoin(self.cfg.consensus.fee_consensus.fee(amount)),
526            },
527            pub_key,
528        })
529    }
530
531    async fn process_output<'a, 'b>(
532        &'a self,
533        dbtx: &mut DatabaseTransaction<'b>,
534        output: &'a LightningOutput,
535        outpoint: OutPoint,
536    ) -> Result<TransactionItemAmounts, LightningOutputError> {
537        let amount = match output.ensure_v0_ref()? {
538            LightningOutputV0::Outgoing(contract) => {
539                dbtx.insert_new_entry(&OutgoingContractKey(outpoint), contract)
540                    .await;
541
542                contract.amount
543            }
544            LightningOutputV0::Incoming(contract) => {
545                if !contract.verify() {
546                    return Err(LightningOutputError::InvalidContract);
547                }
548
549                if contract.commitment.expiration <= self.consensus_unix_time(dbtx).await {
550                    return Err(LightningOutputError::ContractExpired);
551                }
552
553                dbtx.insert_new_entry(&IncomingContractKey(outpoint), contract)
554                    .await;
555
556                dbtx.insert_entry(
557                    &IncomingContractOutpointKey(contract.contract_id()),
558                    &outpoint,
559                )
560                .await;
561
562                let stream_index = dbtx
563                    .get_value(&IncomingContractStreamIndexKey)
564                    .await
565                    .unwrap_or(0);
566
567                dbtx.insert_entry(&IncomingContractStreamKey(stream_index), contract)
568                    .await;
569
570                dbtx.insert_entry(&IncomingContractIndexKey(outpoint), &stream_index)
571                    .await;
572
573                dbtx.insert_entry(&IncomingContractStreamIndexKey, &(stream_index + 1))
574                    .await;
575
576                let dk_share = contract.create_decryption_key_share(&self.cfg.private.sk);
577
578                dbtx.insert_entry(&DecryptionKeyShareKey(outpoint), &dk_share)
579                    .await;
580
581                contract.commitment.amount
582            }
583        };
584
585        Ok(TransactionItemAmounts {
586            amounts: Amounts::new_bitcoin(amount),
587            fees: Amounts::new_bitcoin(self.cfg.consensus.fee_consensus.fee(amount)),
588        })
589    }
590
591    async fn output_status(
592        &self,
593        _dbtx: &mut DatabaseTransaction<'_>,
594        _out_point: OutPoint,
595    ) -> Option<LightningOutputOutcome> {
596        None
597    }
598
599    async fn audit(
600        &self,
601        dbtx: &mut DatabaseTransaction<'_>,
602        audit: &mut Audit,
603        module_instance_id: ModuleInstanceId,
604    ) {
605        // Both incoming and outgoing contracts represent liabilities to the federation
606        // since they are obligations to issue notes.
607        audit
608            .add_items(
609                dbtx,
610                module_instance_id,
611                &OutgoingContractPrefix,
612                |_, contract| -(contract.amount.msats as i64),
613            )
614            .await;
615
616        audit
617            .add_items(
618                dbtx,
619                module_instance_id,
620                &IncomingContractPrefix,
621                |_, contract| -(contract.commitment.amount.msats as i64),
622            )
623            .await;
624    }
625
626    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
627        vec![
628            api_endpoint! {
629                CONSENSUS_BLOCK_COUNT_ENDPOINT,
630                ApiVersion::new(0, 0),
631                async |module: &Lightning, context, _params : () | -> u64 {
632                    let db = context.db();
633                    let mut dbtx = db.begin_transaction_nc().await;
634
635                    Ok(module.consensus_block_count(&mut dbtx).await)
636                }
637            },
638            api_endpoint! {
639                AWAIT_INCOMING_CONTRACT_ENDPOINT,
640                ApiVersion::new(0, 0),
641                async |module: &Lightning, context, params: (ContractId, u64) | -> Option<OutPoint> {
642                    let db = context.db();
643
644                    Ok(module.await_incoming_contract(db, params.0, params.1).await)
645                }
646            },
647            api_endpoint! {
648                AWAIT_PREIMAGE_ENDPOINT,
649                ApiVersion::new(0, 0),
650                async |module: &Lightning, context, params: (OutPoint, u64)| -> Option<[u8; 32]> {
651                    let db = context.db();
652
653                    Ok(module.await_preimage(db, params.0, params.1).await)
654                }
655            },
656            api_endpoint! {
657                DECRYPTION_KEY_SHARE_ENDPOINT,
658                ApiVersion::new(0, 0),
659                async |_module: &Lightning, context, params: OutPoint| -> DecryptionKeyShare {
660                    let share = context
661                        .db()
662                        .begin_transaction_nc()
663                        .await
664                        .get_value(&DecryptionKeyShareKey(params))
665                        .await
666                        .ok_or(ApiError::bad_request("No decryption key share found".to_string()))?;
667
668                    Ok(share)
669                }
670            },
671            api_endpoint! {
672                OUTGOING_CONTRACT_EXPIRATION_ENDPOINT,
673                ApiVersion::new(0, 0),
674                async |module: &Lightning, context, outpoint: OutPoint| -> Option<(ContractId, u64)> {
675                    let db = context.db();
676
677                    Ok(module.outgoing_contract_expiration(db, outpoint).await)
678                }
679            },
680            api_endpoint! {
681                AWAIT_INCOMING_CONTRACTS_ENDPOINT,
682                ApiVersion::new(0, 0),
683                async |module: &Lightning, context, params: (u64, usize)| -> (Vec<IncomingContract>, u64) {
684                    let db = context.db();
685
686                    if params.1 == 0 {
687                        return Err(ApiError::bad_request("Batch size must be greater than 0".to_string()));
688                    }
689
690                    Ok(module.await_incoming_contracts(db, params.0, params.1).await)
691                }
692            },
693            api_endpoint! {
694                ADD_GATEWAY_ENDPOINT,
695                ApiVersion::new(0, 0),
696                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
697                    check_auth(context)?;
698
699                    let db = context.db();
700
701                    Ok(Lightning::add_gateway(db, gateway).await)
702                }
703            },
704            api_endpoint! {
705                REMOVE_GATEWAY_ENDPOINT,
706                ApiVersion::new(0, 0),
707                async |_module: &Lightning, context, gateway: SafeUrl| -> bool {
708                    check_auth(context)?;
709
710                    let db = context.db();
711
712                    Ok(Lightning::remove_gateway(db, gateway).await)
713                }
714            },
715            api_endpoint! {
716                GATEWAYS_ENDPOINT,
717                ApiVersion::new(0, 0),
718                async |_module: &Lightning, context, _params : () | -> Vec<SafeUrl> {
719                    let db = context.db();
720
721                    Ok(Lightning::gateways(db).await)
722                }
723            },
724        ]
725    }
726}
727
728impl Lightning {
729    fn get_block_count(&self) -> anyhow::Result<u64> {
730        self.server_bitcoin_rpc_monitor
731            .status()
732            .map(|status| status.block_count)
733            .context("Block count not available yet")
734    }
735
736    async fn consensus_block_count(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
737        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
738
739        let mut counts = dbtx
740            .find_by_prefix(&BlockCountVotePrefix)
741            .await
742            .map(|entry| entry.1)
743            .collect::<Vec<u64>>()
744            .await;
745
746        counts.sort_unstable();
747
748        counts.reverse();
749
750        assert!(counts.last() <= counts.first());
751
752        // The block count we select guarantees that any threshold of correct peers can
753        // increase the consensus block count and any consensus block count has been
754        // confirmed by a threshold of peers.
755
756        counts.get(num_peers.threshold() - 1).copied().unwrap_or(0)
757    }
758
759    async fn consensus_unix_time(&self, dbtx: &mut DatabaseTransaction<'_>) -> u64 {
760        let num_peers = self.cfg.consensus.tpe_pks.to_num_peers();
761
762        let mut times = dbtx
763            .find_by_prefix(&UnixTimeVotePrefix)
764            .await
765            .map(|entry| entry.1)
766            .collect::<Vec<u64>>()
767            .await;
768
769        times.sort_unstable();
770
771        times.reverse();
772
773        assert!(times.last() <= times.first());
774
775        // The unix time we select guarantees that any threshold of correct peers can
776        // advance the consensus unix time and any consensus unix time has been
777        // confirmed by a threshold of peers.
778
779        times.get(num_peers.threshold() - 1).copied().unwrap_or(0)
780    }
781
782    async fn await_incoming_contract(
783        &self,
784        db: Database,
785        contract_id: ContractId,
786        expiration: u64,
787    ) -> Option<OutPoint> {
788        loop {
789            timeout(
790                Duration::from_secs(10),
791                db.wait_key_exists(&IncomingContractOutpointKey(contract_id)),
792            )
793            .await
794            .ok();
795
796            // to avoid race conditions we have to check for the contract and
797            // its expiration in the same database transaction
798            let mut dbtx = db.begin_transaction_nc().await;
799
800            if let Some(outpoint) = dbtx
801                .get_value(&IncomingContractOutpointKey(contract_id))
802                .await
803            {
804                return Some(outpoint);
805            }
806
807            if expiration <= self.consensus_unix_time(&mut dbtx).await {
808                return None;
809            }
810        }
811    }
812
813    async fn await_preimage(
814        &self,
815        db: Database,
816        outpoint: OutPoint,
817        expiration: u64,
818    ) -> Option<[u8; 32]> {
819        loop {
820            timeout(
821                Duration::from_secs(10),
822                db.wait_key_exists(&PreimageKey(outpoint)),
823            )
824            .await
825            .ok();
826
827            // to avoid race conditions we have to check for the preimage and
828            // the contracts expiration in the same database transaction
829            let mut dbtx = db.begin_transaction_nc().await;
830
831            if let Some(preimage) = dbtx.get_value(&PreimageKey(outpoint)).await {
832                return Some(preimage);
833            }
834
835            if expiration <= self.consensus_block_count(&mut dbtx).await {
836                return None;
837            }
838        }
839    }
840
841    async fn outgoing_contract_expiration(
842        &self,
843        db: Database,
844        outpoint: OutPoint,
845    ) -> Option<(ContractId, u64)> {
846        let mut dbtx = db.begin_transaction_nc().await;
847
848        let contract = dbtx.get_value(&OutgoingContractKey(outpoint)).await?;
849
850        let consensus_block_count = self.consensus_block_count(&mut dbtx).await;
851
852        let expiration = contract.expiration.saturating_sub(consensus_block_count);
853
854        Some((contract.contract_id(), expiration))
855    }
856
857    async fn await_incoming_contracts(
858        &self,
859        db: Database,
860        start: u64,
861        n: usize,
862    ) -> (Vec<IncomingContract>, u64) {
863        let filter = |next_index: Option<u64>| next_index.filter(|i| *i > start);
864
865        let (mut next_index, mut dbtx) = db
866            .wait_key_check(&IncomingContractStreamIndexKey, filter)
867            .await;
868
869        let mut contracts = Vec::with_capacity(n);
870
871        let range = IncomingContractStreamKey(start)..IncomingContractStreamKey(u64::MAX);
872
873        for (key, contract) in dbtx
874            .find_by_range(range)
875            .await
876            .take(n)
877            .collect::<Vec<(IncomingContractStreamKey, IncomingContract)>>()
878            .await
879        {
880            contracts.push(contract.clone());
881            next_index = key.0 + 1;
882        }
883
884        (contracts, next_index)
885    }
886
887    async fn add_gateway(db: Database, gateway: SafeUrl) -> bool {
888        let mut dbtx = db.begin_transaction().await;
889
890        let is_new_entry = dbtx.insert_entry(&GatewayKey(gateway), &()).await.is_none();
891
892        dbtx.commit_tx().await;
893
894        is_new_entry
895    }
896
897    async fn remove_gateway(db: Database, gateway: SafeUrl) -> bool {
898        let mut dbtx = db.begin_transaction().await;
899
900        let entry_existed = dbtx.remove_entry(&GatewayKey(gateway)).await.is_some();
901
902        dbtx.commit_tx().await;
903
904        entry_existed
905    }
906
907    async fn gateways(db: Database) -> Vec<SafeUrl> {
908        db.begin_transaction_nc()
909            .await
910            .find_by_prefix(&GatewayPrefix)
911            .await
912            .map(|entry| entry.0.0)
913            .collect()
914            .await
915    }
916
917    pub async fn consensus_block_count_ui(&self) -> u64 {
918        self.consensus_block_count(&mut self.db.begin_transaction_nc().await)
919            .await
920    }
921
922    pub async fn consensus_unix_time_ui(&self) -> u64 {
923        self.consensus_unix_time(&mut self.db.begin_transaction_nc().await)
924            .await
925    }
926
927    pub async fn add_gateway_ui(&self, gateway: SafeUrl) -> bool {
928        Self::add_gateway(self.db.clone(), gateway).await
929    }
930
931    pub async fn remove_gateway_ui(&self, gateway: SafeUrl) -> bool {
932        Self::remove_gateway(self.db.clone(), gateway).await
933    }
934
935    pub async fn gateways_ui(&self) -> Vec<SafeUrl> {
936        Self::gateways(self.db.clone()).await
937    }
938}