fedimint_wallet_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_possible_wrap)]
4#![allow(clippy::default_trait_access)]
5#![allow(clippy::missing_errors_doc)]
6#![allow(clippy::missing_panics_doc)]
7#![allow(clippy::module_name_repetitions)]
8#![allow(clippy::must_use_candidate)]
9#![allow(clippy::needless_lifetimes)]
10#![allow(clippy::too_many_lines)]
11
12pub mod db;
13pub mod envs;
14
15use std::clone::Clone;
16use std::cmp::min;
17use std::collections::{BTreeMap, BTreeSet, HashMap};
18use std::convert::Infallible;
19use std::sync::Arc;
20#[cfg(not(target_family = "wasm"))]
21use std::time::Duration;
22
23use anyhow::{Context, bail, ensure, format_err};
24use bitcoin::absolute::LockTime;
25use bitcoin::address::NetworkUnchecked;
26use bitcoin::ecdsa::Signature as EcdsaSig;
27use bitcoin::hashes::{Hash as BitcoinHash, HashEngine, Hmac, HmacEngine, sha256};
28use bitcoin::policy::DEFAULT_MIN_RELAY_TX_FEE;
29use bitcoin::psbt::{Input, Psbt};
30use bitcoin::secp256k1::{self, All, Message, Scalar, Secp256k1, Verification};
31use bitcoin::sighash::{EcdsaSighashType, SighashCache};
32use bitcoin::{Address, BlockHash, Network, ScriptBuf, Sequence, Transaction, TxIn, TxOut, Txid};
33use common::config::WalletConfigConsensus;
34use common::{
35    DEPRECATED_RBF_ERROR, PegOutFees, PegOutSignatureItem, ProcessPegOutSigError, SpendableUTXO,
36    TxOutputSummary, WalletCommonInit, WalletConsensusItem, WalletInput, WalletModuleTypes,
37    WalletOutput, WalletOutputOutcome, WalletSummary, proprietary_tweak_key,
38};
39use db::{
40    BlockHashByHeightKey, BlockHashByHeightKeyPrefix, BlockHashByHeightValue, RecoveryItemKey,
41    RecoveryItemKeyPrefix,
42};
43use envs::get_feerate_multiplier;
44use fedimint_api_client::api::{DynModuleApi, FederationApiExt};
45use fedimint_core::config::{
46    ServerModuleConfig, ServerModuleConsensusConfig, TypedServerModuleConfig,
47    TypedServerModuleConsensusConfig,
48};
49use fedimint_core::core::ModuleInstanceId;
50use fedimint_core::db::{
51    Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
52};
53use fedimint_core::encoding::btc::NetworkLegacyEncodingWrapper;
54use fedimint_core::encoding::{Decodable, Encodable};
55use fedimint_core::envs::{BitcoinRpcConfig, is_rbf_withdrawal_enabled, is_running_in_test_env};
56use fedimint_core::module::audit::Audit;
57use fedimint_core::module::{
58    Amounts, ApiEndpoint, ApiError, ApiRequestErased, ApiVersion, CORE_CONSENSUS_VERSION,
59    CoreConsensusVersion, InputMeta, ModuleConsensusVersion, ModuleInit,
60    SupportedModuleApiVersions, TransactionItemAmounts, api_endpoint,
61};
62use fedimint_core::net::auth::check_auth;
63use fedimint_core::task::TaskGroup;
64#[cfg(not(target_family = "wasm"))]
65use fedimint_core::task::sleep;
66use fedimint_core::util::{FmtCompact, FmtCompactAnyhow as _, backoff_util, retry};
67use fedimint_core::{
68    Feerate, InPoint, NumPeersExt, OutPoint, PeerId, apply, async_trait_maybe_send,
69    get_network_for_address, push_db_key_items, push_db_pair_items,
70};
71use fedimint_logging::LOG_MODULE_WALLET;
72use fedimint_server_core::bitcoin_rpc::ServerBitcoinRpcMonitor;
73use fedimint_server_core::config::{PeerHandleOps, PeerHandleOpsExt};
74use fedimint_server_core::migration::ServerModuleDbMigrationFn;
75use fedimint_server_core::{
76    ConfigGenModuleArgs, ServerModule, ServerModuleInit, ServerModuleInitArgs,
77};
78pub use fedimint_wallet_common as common;
79use fedimint_wallet_common::config::{FeeConsensus, WalletClientConfig, WalletConfig};
80use fedimint_wallet_common::endpoint_constants::{
81    ACTIVATE_CONSENSUS_VERSION_VOTING_ENDPOINT, BITCOIN_KIND_ENDPOINT, BITCOIN_RPC_CONFIG_ENDPOINT,
82    BLOCK_COUNT_ENDPOINT, BLOCK_COUNT_LOCAL_ENDPOINT, MODULE_CONSENSUS_VERSION_ENDPOINT,
83    PEG_OUT_FEES_ENDPOINT, RECOVERY_COUNT_ENDPOINT, RECOVERY_SLICE_ENDPOINT,
84    SUPPORTED_MODULE_CONSENSUS_VERSION_ENDPOINT, UTXO_CONFIRMED_ENDPOINT, WALLET_SUMMARY_ENDPOINT,
85};
86use fedimint_wallet_common::envs::FM_PORT_ESPLORA_ENV;
87use fedimint_wallet_common::keys::CompressedPublicKey;
88use fedimint_wallet_common::tweakable::Tweakable;
89use fedimint_wallet_common::{
90    MODULE_CONSENSUS_VERSION, Rbf, RecoveryItem, UnknownWalletInputVariantError, WalletInputError,
91    WalletOutputError, WalletOutputV0,
92};
93use futures::future::join_all;
94use futures::{FutureExt, StreamExt};
95use itertools::Itertools;
96use metrics::{
97    WALLET_INOUT_FEES_SATS, WALLET_INOUT_SATS, WALLET_PEGIN_FEES_SATS, WALLET_PEGIN_SATS,
98    WALLET_PEGOUT_FEES_SATS, WALLET_PEGOUT_SATS,
99};
100use miniscript::psbt::PsbtExt;
101use miniscript::{Descriptor, TranslatePk, translate_hash_fail};
102use rand::rngs::OsRng;
103use serde::Serialize;
104use strum::IntoEnumIterator;
105use tokio::sync::{Notify, watch};
106use tracing::{debug, info, instrument, trace, warn};
107
108use crate::db::{
109    BlockCountVoteKey, BlockCountVotePrefix, BlockHashKey, BlockHashKeyPrefix,
110    ClaimedPegInOutpointKey, ClaimedPegInOutpointPrefixKey, ConsensusVersionVoteKey,
111    ConsensusVersionVotePrefix, ConsensusVersionVotingActivationKey,
112    ConsensusVersionVotingActivationPrefix, DbKeyPrefix, FeeRateVoteKey, FeeRateVotePrefix,
113    PegOutBitcoinTransaction, PegOutBitcoinTransactionPrefix, PegOutNonceKey, PegOutTxSignatureCI,
114    PegOutTxSignatureCIPrefix, PendingTransactionKey, PendingTransactionPrefixKey, UTXOKey,
115    UTXOPrefixKey, UnsignedTransactionKey, UnsignedTransactionPrefixKey, UnspentTxOutKey,
116    UnspentTxOutPrefix, migrate_to_v1, migrate_to_v2,
117};
118use crate::metrics::WALLET_BLOCK_COUNT;
119
120mod metrics;
121
/// Stateless initializer for the server-side wallet module.
///
/// It carries no data of its own; the [`ModuleInit`] and [`ServerModuleInit`]
/// implementations in this file hang off of it.
#[derive(Clone, Debug)]
pub struct WalletInit;
124
125impl ModuleInit for WalletInit {
126    type Common = WalletCommonInit;
127
128    async fn dump_database(
129        &self,
130        dbtx: &mut DatabaseTransaction<'_>,
131        prefix_names: Vec<String>,
132    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
133        let mut wallet: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
134        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
135            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
136        });
137        for table in filtered_prefixes {
138            match table {
139                DbKeyPrefix::BlockHash => {
140                    push_db_key_items!(dbtx, BlockHashKeyPrefix, BlockHashKey, wallet, "Blocks");
141                }
142                DbKeyPrefix::BlockHashByHeight => {
143                    push_db_key_items!(
144                        dbtx,
145                        BlockHashByHeightKeyPrefix,
146                        BlockHashByHeightKey,
147                        wallet,
148                        "Blocks by height"
149                    );
150                }
151                DbKeyPrefix::PegOutBitcoinOutPoint => {
152                    push_db_pair_items!(
153                        dbtx,
154                        PegOutBitcoinTransactionPrefix,
155                        PegOutBitcoinTransaction,
156                        WalletOutputOutcome,
157                        wallet,
158                        "Peg Out Bitcoin Transaction"
159                    );
160                }
161                DbKeyPrefix::PegOutTxSigCi => {
162                    push_db_pair_items!(
163                        dbtx,
164                        PegOutTxSignatureCIPrefix,
165                        PegOutTxSignatureCI,
166                        Vec<secp256k1::ecdsa::Signature>,
167                        wallet,
168                        "Peg Out Transaction Signatures"
169                    );
170                }
171                DbKeyPrefix::PendingTransaction => {
172                    push_db_pair_items!(
173                        dbtx,
174                        PendingTransactionPrefixKey,
175                        PendingTransactionKey,
176                        PendingTransaction,
177                        wallet,
178                        "Pending Transactions"
179                    );
180                }
181                DbKeyPrefix::PegOutNonce => {
182                    if let Some(nonce) = dbtx.get_value(&PegOutNonceKey).await {
183                        wallet.insert("Peg Out Nonce".to_string(), Box::new(nonce));
184                    }
185                }
186                DbKeyPrefix::UnsignedTransaction => {
187                    push_db_pair_items!(
188                        dbtx,
189                        UnsignedTransactionPrefixKey,
190                        UnsignedTransactionKey,
191                        UnsignedTransaction,
192                        wallet,
193                        "Unsigned Transactions"
194                    );
195                }
196                DbKeyPrefix::Utxo => {
197                    push_db_pair_items!(
198                        dbtx,
199                        UTXOPrefixKey,
200                        UTXOKey,
201                        SpendableUTXO,
202                        wallet,
203                        "UTXOs"
204                    );
205                }
206                DbKeyPrefix::BlockCountVote => {
207                    push_db_pair_items!(
208                        dbtx,
209                        BlockCountVotePrefix,
210                        BlockCountVoteKey,
211                        u32,
212                        wallet,
213                        "Block Count Votes"
214                    );
215                }
216                DbKeyPrefix::FeeRateVote => {
217                    push_db_pair_items!(
218                        dbtx,
219                        FeeRateVotePrefix,
220                        FeeRateVoteKey,
221                        Feerate,
222                        wallet,
223                        "Fee Rate Votes"
224                    );
225                }
226                DbKeyPrefix::ClaimedPegInOutpoint => {
227                    push_db_pair_items!(
228                        dbtx,
229                        ClaimedPegInOutpointPrefixKey,
230                        PeggedInOutpointKey,
231                        (),
232                        wallet,
233                        "Claimed Peg-in Outpoint"
234                    );
235                }
236                DbKeyPrefix::ConsensusVersionVote => {
237                    push_db_pair_items!(
238                        dbtx,
239                        ConsensusVersionVotePrefix,
240                        ConsensusVersionVoteKey,
241                        ModuleConsensusVersion,
242                        wallet,
243                        "Consensus Version Votes"
244                    );
245                }
246                DbKeyPrefix::UnspentTxOut => {
247                    push_db_pair_items!(
248                        dbtx,
249                        UnspentTxOutPrefix,
250                        UnspentTxOutKey,
251                        TxOut,
252                        wallet,
253                        "Consensus Version Votes"
254                    );
255                }
256                DbKeyPrefix::ConsensusVersionVotingActivation => {
257                    push_db_pair_items!(
258                        dbtx,
259                        ConsensusVersionVotingActivationPrefix,
260                        ConsensusVersionVotingActivationKey,
261                        (),
262                        wallet,
263                        "Consensus Version Voting Activation Key"
264                    );
265                }
266                DbKeyPrefix::RecoveryItem => {
267                    push_db_pair_items!(
268                        dbtx,
269                        RecoveryItemKeyPrefix,
270                        RecoveryItemKey,
271                        RecoveryItem,
272                        wallet,
273                        "Recovery Items"
274                    );
275                }
276            }
277        }
278
279        Box::new(wallet.into_iter())
280    }
281}
282
283/// Default finality delay based on network
284fn default_finality_delay(network: Network) -> u32 {
285    match network {
286        Network::Bitcoin | Network::Regtest => 10,
287        Network::Testnet | Network::Signet | Network::Testnet4 => 2,
288        _ => panic!("Unsupported network"),
289    }
290}
291
292/// Default Bitcoin RPC config for clients
293fn default_client_bitcoin_rpc(network: Network) -> BitcoinRpcConfig {
294    let url = match network {
295        Network::Bitcoin => "https://mempool.space/api/".to_string(),
296        Network::Testnet => "https://mempool.space/testnet/api/".to_string(),
297        Network::Testnet4 => "https://mempool.space/testnet4/api/".to_string(),
298        Network::Signet => "https://mutinynet.com/api/".to_string(),
299        Network::Regtest => format!(
300            "http://127.0.0.1:{}/",
301            std::env::var(FM_PORT_ESPLORA_ENV).unwrap_or_else(|_| String::from("50002"))
302        ),
303        _ => panic!("Unsupported network"),
304    };
305
306    BitcoinRpcConfig {
307        kind: "esplora".to_string(),
308        url: fedimint_core::util::SafeUrl::parse(&url).expect("hardcoded URL is valid"),
309    }
310}
311
312#[apply(async_trait_maybe_send!)]
313impl ServerModuleInit for WalletInit {
314    type Module = Wallet;
315
316    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
317        &[MODULE_CONSENSUS_VERSION]
318    }
319
320    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
321        SupportedModuleApiVersions::from_raw(
322            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
323            (
324                MODULE_CONSENSUS_VERSION.major,
325                MODULE_CONSENSUS_VERSION.minor,
326            ),
327            &[(0, 2)],
328        )
329    }
330
331    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
332        for direction in ["incoming", "outgoing"] {
333            WALLET_INOUT_FEES_SATS
334                .with_label_values(&[direction])
335                .get_sample_count();
336            WALLET_INOUT_SATS
337                .with_label_values(&[direction])
338                .get_sample_count();
339        }
340        // Eagerly initialize metrics that trigger infrequently
341        WALLET_PEGIN_FEES_SATS.get_sample_count();
342        WALLET_PEGIN_SATS.get_sample_count();
343        WALLET_PEGOUT_SATS.get_sample_count();
344        WALLET_PEGOUT_FEES_SATS.get_sample_count();
345
346        Ok(Wallet::new(
347            args.cfg().to_typed()?,
348            args.db(),
349            args.task_group(),
350            args.our_peer_id(),
351            args.module_api().clone(),
352            args.server_bitcoin_rpc_monitor(),
353        )
354        .await?)
355    }
356
357    fn trusted_dealer_gen(
358        &self,
359        peers: &[PeerId],
360        args: &ConfigGenModuleArgs,
361    ) -> BTreeMap<PeerId, ServerModuleConfig> {
362        let secp = bitcoin::secp256k1::Secp256k1::new();
363        let finality_delay = default_finality_delay(args.network);
364        let client_default_bitcoin_rpc = default_client_bitcoin_rpc(args.network);
365
366        let btc_pegin_keys = peers
367            .iter()
368            .map(|&id| (id, secp.generate_keypair(&mut OsRng)))
369            .collect::<Vec<_>>();
370
371        let wallet_cfg: BTreeMap<PeerId, WalletConfig> = btc_pegin_keys
372            .iter()
373            .map(|(id, (sk, _))| {
374                let cfg = WalletConfig::new(
375                    btc_pegin_keys
376                        .iter()
377                        .map(|(peer_id, (_, pk))| (*peer_id, CompressedPublicKey { key: *pk }))
378                        .collect(),
379                    *sk,
380                    peers.to_num_peers().threshold(),
381                    args.network,
382                    finality_delay,
383                    client_default_bitcoin_rpc.clone(),
384                    FeeConsensus::default(),
385                );
386                (*id, cfg)
387            })
388            .collect();
389
390        wallet_cfg
391            .into_iter()
392            .map(|(k, v)| (k, v.to_erased()))
393            .collect()
394    }
395
396    async fn distributed_gen(
397        &self,
398        peers: &(dyn PeerHandleOps + Send + Sync),
399        args: &ConfigGenModuleArgs,
400    ) -> anyhow::Result<ServerModuleConfig> {
401        let secp = secp256k1::Secp256k1::new();
402        let (sk, pk) = secp.generate_keypair(&mut OsRng);
403        let our_key = CompressedPublicKey { key: pk };
404        let peer_peg_in_keys: BTreeMap<PeerId, CompressedPublicKey> = peers
405            .exchange_encodable(our_key.key)
406            .await?
407            .into_iter()
408            .map(|(k, key)| (k, CompressedPublicKey { key }))
409            .collect();
410
411        let finality_delay = default_finality_delay(args.network);
412        let client_default_bitcoin_rpc = default_client_bitcoin_rpc(args.network);
413
414        let wallet_cfg = WalletConfig::new(
415            peer_peg_in_keys,
416            sk,
417            peers.num_peers().threshold(),
418            args.network,
419            finality_delay,
420            client_default_bitcoin_rpc,
421            FeeConsensus::default(),
422        );
423
424        Ok(wallet_cfg.to_erased())
425    }
426
427    fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
428        let config = config.to_typed::<WalletConfig>()?;
429        let pubkey = secp256k1::PublicKey::from_secret_key_global(&config.private.peg_in_key);
430
431        if config
432            .consensus
433            .peer_peg_in_keys
434            .get(identity)
435            .ok_or_else(|| format_err!("Secret key doesn't match any public key"))?
436            != &CompressedPublicKey::new(pubkey)
437        {
438            bail!(" Bitcoin wallet private key doesn't match multisig pubkey");
439        }
440
441        Ok(())
442    }
443
444    fn get_client_config(
445        &self,
446        config: &ServerModuleConsensusConfig,
447    ) -> anyhow::Result<WalletClientConfig> {
448        let config = WalletConfigConsensus::from_erased(config)?;
449        Ok(WalletClientConfig {
450            peg_in_descriptor: config.peg_in_descriptor,
451            network: config.network,
452            fee_consensus: config.fee_consensus,
453            finality_delay: config.finality_delay,
454            default_bitcoin_rpc: config.client_default_bitcoin_rpc,
455        })
456    }
457
458    /// DB migrations to move from old to newer versions
459    fn get_database_migrations(
460        &self,
461    ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Wallet>> {
462        let mut migrations: BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Wallet>> =
463            BTreeMap::new();
464        migrations.insert(
465            DatabaseVersion(0),
466            Box::new(|ctx| migrate_to_v1(ctx).boxed()),
467        );
468        migrations.insert(
469            DatabaseVersion(1),
470            Box::new(|ctx| migrate_to_v2(ctx).boxed()),
471        );
472        migrations
473    }
474
475    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
476        Some(DbKeyPrefix::iter().map(|p| p as u8).collect())
477    }
478}
479
480#[apply(async_trait_maybe_send!)]
481impl ServerModule for Wallet {
482    type Common = WalletModuleTypes;
483    type Init = WalletInit;
484
485    async fn consensus_proposal<'a>(
486        &'a self,
487        dbtx: &mut DatabaseTransaction<'_>,
488    ) -> Vec<WalletConsensusItem> {
489        let mut items = dbtx
490            .find_by_prefix(&PegOutTxSignatureCIPrefix)
491            .await
492            .map(|(key, val)| {
493                WalletConsensusItem::PegOutSignature(PegOutSignatureItem {
494                    txid: key.0,
495                    signature: val,
496                })
497            })
498            .collect::<Vec<WalletConsensusItem>>()
499            .await;
500
501        // If we are unable to get a block count from the node we skip adding a block
502        // count vote to consensus items.
503        //
504        // The potential impact of not including the latest block count from our peer's
505        // node is delayed processing of change outputs for the federation, which is an
506        // acceptable risk since subsequent rounds of consensus will reattempt to fetch
507        // the latest block count.
508        match self.get_block_count() {
509            Ok(block_count) => {
510                let mut block_count_vote =
511                    block_count.saturating_sub(self.cfg.consensus.finality_delay);
512
513                let current_consensus_block_count = self.consensus_block_count(dbtx).await;
514
515                // This will prevent that more then five blocks are synced in a single database
516                // transaction if the federation was offline for a prolonged period of time.
517                if current_consensus_block_count != 0 {
518                    block_count_vote = min(
519                        block_count_vote,
520                        current_consensus_block_count
521                            + if is_running_in_test_env() {
522                                // We have some tests that mine *a lot* of blocks (empty)
523                                // and need them processed fast, so we raise the max.
524                                100
525                            } else {
526                                5
527                            },
528                    );
529                }
530
531                let current_vote = dbtx
532                    .get_value(&BlockCountVoteKey(self.our_peer_id))
533                    .await
534                    .unwrap_or(0);
535
536                trace!(
537                    target: LOG_MODULE_WALLET,
538                    ?current_vote,
539                    ?block_count_vote,
540                    ?block_count,
541                    ?current_consensus_block_count,
542                    "Proposing block count"
543                );
544
545                WALLET_BLOCK_COUNT.set(i64::from(block_count_vote));
546                items.push(WalletConsensusItem::BlockCount(block_count_vote));
547            }
548            Err(err) => {
549                warn!(target: LOG_MODULE_WALLET, err = %err.fmt_compact_anyhow(), "Can't update block count");
550            }
551        }
552
553        let fee_rate_proposal = self.get_fee_rate_opt();
554
555        items.push(WalletConsensusItem::Feerate(fee_rate_proposal));
556
557        // Consensus upgrade activation voting
558        let manual_vote = dbtx
559            .get_value(&ConsensusVersionVotingActivationKey)
560            .await
561            .map(|()| {
562                // TODO: allow voting on any version between the currently active and max
563                // supported one in case we support a too high one already
564                MODULE_CONSENSUS_VERSION
565            });
566
567        let active_consensus_version = self.consensus_module_consensus_version(dbtx).await;
568        let automatic_vote = self.peer_supported_consensus_version.borrow().and_then(
569            |supported_consensus_version| {
570                // Only automatically vote if the commonly supported version is higher than the
571                // currently active one
572                (active_consensus_version < supported_consensus_version)
573                    .then_some(supported_consensus_version)
574            },
575        );
576
577        // Prioritizing automatic vote for now since the manual vote never resets. Once
578        // that is fixed this should be switched around.
579        if let Some(vote_version) = automatic_vote.or(manual_vote) {
580            items.push(WalletConsensusItem::ModuleConsensusVersion(vote_version));
581        }
582
583        items
584    }
585
586    async fn process_consensus_item<'a, 'b>(
587        &'a self,
588        dbtx: &mut DatabaseTransaction<'b>,
589        consensus_item: WalletConsensusItem,
590        peer: PeerId,
591    ) -> anyhow::Result<()> {
592        trace!(target: LOG_MODULE_WALLET, ?consensus_item, "Processing consensus item proposal");
593
594        match consensus_item {
595            WalletConsensusItem::BlockCount(block_count_vote) => {
596                let current_vote = dbtx.get_value(&BlockCountVoteKey(peer)).await.unwrap_or(0);
597
598                if block_count_vote < current_vote {
599                    warn!(target: LOG_MODULE_WALLET, ?peer, ?block_count_vote, "Block count vote is outdated");
600                }
601
602                ensure!(
603                    block_count_vote > current_vote,
604                    "Block count vote is redundant"
605                );
606
607                let old_consensus_block_count = self.consensus_block_count(dbtx).await;
608
609                dbtx.insert_entry(&BlockCountVoteKey(peer), &block_count_vote)
610                    .await;
611
612                let new_consensus_block_count = self.consensus_block_count(dbtx).await;
613
614                debug!(
615                    target: LOG_MODULE_WALLET,
616                    ?peer,
617                    ?current_vote,
618                    ?block_count_vote,
619                    ?old_consensus_block_count,
620                    ?new_consensus_block_count,
621                    "Received block count vote"
622                );
623
624                assert!(old_consensus_block_count <= new_consensus_block_count);
625
626                if new_consensus_block_count != old_consensus_block_count {
627                    // We do not sync blocks that predate the federation itself
628                    if old_consensus_block_count != 0 {
629                        self.sync_up_to_consensus_count(
630                            dbtx,
631                            old_consensus_block_count,
632                            new_consensus_block_count,
633                        )
634                        .await;
635                    } else {
636                        info!(
637                            target: LOG_MODULE_WALLET,
638                            ?old_consensus_block_count,
639                            ?new_consensus_block_count,
640                            "Not syncing up to consensus block count because we are at block 0"
641                        );
642                    }
643                }
644            }
645            WalletConsensusItem::Feerate(feerate) => {
646                if Some(feerate) == dbtx.insert_entry(&FeeRateVoteKey(peer), &feerate).await {
647                    bail!("Fee rate vote is redundant");
648                }
649            }
650            WalletConsensusItem::PegOutSignature(peg_out_signature) => {
651                let txid = peg_out_signature.txid;
652
653                if dbtx.get_value(&PendingTransactionKey(txid)).await.is_some() {
654                    bail!("Already received a threshold of valid signatures");
655                }
656
657                let mut unsigned = dbtx
658                    .get_value(&UnsignedTransactionKey(txid))
659                    .await
660                    .context("Unsigned transaction does not exist")?;
661
662                self.sign_peg_out_psbt(&mut unsigned.psbt, peer, &peg_out_signature)
663                    .context("Peg out signature is invalid")?;
664
665                dbtx.insert_entry(&UnsignedTransactionKey(txid), &unsigned)
666                    .await;
667
668                if let Ok(pending_tx) = self.finalize_peg_out_psbt(unsigned) {
669                    // We were able to finalize the transaction, so we will delete the
670                    // PSBT and instead keep the extracted tx for periodic transmission
671                    // as well as to accept the change into our wallet eventually once
672                    // it confirms.
673                    dbtx.insert_new_entry(&PendingTransactionKey(txid), &pending_tx)
674                        .await;
675
676                    dbtx.remove_entry(&PegOutTxSignatureCI(txid)).await;
677                    dbtx.remove_entry(&UnsignedTransactionKey(txid)).await;
678                    let broadcast_pending = self.broadcast_pending.clone();
679                    dbtx.on_commit(move || {
680                        broadcast_pending.notify_one();
681                    });
682                }
683            }
684            WalletConsensusItem::ModuleConsensusVersion(module_consensus_version) => {
685                let current_vote = dbtx
686                    .get_value(&ConsensusVersionVoteKey(peer))
687                    .await
688                    .unwrap_or(ModuleConsensusVersion::new(2, 0));
689
690                ensure!(
691                    module_consensus_version > current_vote,
692                    "Module consensus version vote is redundant"
693                );
694
695                dbtx.insert_entry(&ConsensusVersionVoteKey(peer), &module_consensus_version)
696                    .await;
697
698                assert!(
699                    self.consensus_module_consensus_version(dbtx).await <= MODULE_CONSENSUS_VERSION,
700                    "Wallet module does not support new consensus version, please upgrade the module"
701                );
702            }
703            WalletConsensusItem::Default { variant, .. } => {
704                panic!("Received wallet consensus item with unknown variant {variant}");
705            }
706        }
707
708        Ok(())
709    }
710
711    async fn process_input<'a, 'b, 'c>(
712        &'a self,
713        dbtx: &mut DatabaseTransaction<'c>,
714        input: &'b WalletInput,
715        _in_point: InPoint,
716    ) -> Result<InputMeta, WalletInputError> {
717        let (outpoint, tx_out, pub_key) = match input {
718            WalletInput::V0(input) => {
719                if !self.block_is_known(dbtx, input.proof_block()).await {
720                    return Err(WalletInputError::UnknownPegInProofBlock(
721                        input.proof_block(),
722                    ));
723                }
724
725                input.verify(&self.secp, &self.cfg.consensus.peg_in_descriptor)?;
726
727                debug!(target: LOG_MODULE_WALLET, outpoint = %input.outpoint(), "Claiming peg-in");
728
729                (input.0.outpoint(), input.tx_output(), input.tweak_key())
730            }
731            WalletInput::V1(input) => {
732                let input_tx_out = dbtx
733                    .get_value(&UnspentTxOutKey(input.outpoint))
734                    .await
735                    .ok_or(WalletInputError::UnknownUTXO)?;
736
737                if input_tx_out.script_pubkey
738                    != self
739                        .cfg
740                        .consensus
741                        .peg_in_descriptor
742                        .tweak(&input.tweak_key, secp256k1::SECP256K1)
743                        .script_pubkey()
744                {
745                    return Err(WalletInputError::WrongOutputScript);
746                }
747
748                // Verifying this is not strictly necessary for the server as the tx_out is only
749                // used in backup and recovery.
750                if input.tx_out != input_tx_out {
751                    return Err(WalletInputError::WrongTxOut);
752                }
753
754                (input.outpoint, input_tx_out, input.tweak_key)
755            }
756            WalletInput::Default { variant, .. } => {
757                return Err(WalletInputError::UnknownInputVariant(
758                    UnknownWalletInputVariantError { variant: *variant },
759                ));
760            }
761        };
762
763        if dbtx
764            .insert_entry(&ClaimedPegInOutpointKey(outpoint), &())
765            .await
766            .is_some()
767        {
768            return Err(WalletInputError::PegInAlreadyClaimed);
769        }
770
771        dbtx.insert_new_entry(
772            &UTXOKey(outpoint),
773            &SpendableUTXO {
774                tweak: pub_key.serialize(),
775                amount: tx_out.value,
776            },
777        )
778        .await;
779
780        let next_index = get_recovery_count(dbtx).await;
781        dbtx.insert_new_entry(
782            &RecoveryItemKey(next_index),
783            &RecoveryItem::Input {
784                outpoint,
785                script: tx_out.script_pubkey,
786            },
787        )
788        .await;
789
790        let amount = tx_out.value.into();
791
792        let fee = self.cfg.consensus.fee_consensus.peg_in_abs;
793
794        calculate_pegin_metrics(dbtx, amount, fee);
795
796        Ok(InputMeta {
797            amount: TransactionItemAmounts {
798                amounts: Amounts::new_bitcoin(amount),
799                fees: Amounts::new_bitcoin(fee),
800            },
801            pub_key,
802        })
803    }
804
805    async fn process_output<'a, 'b>(
806        &'a self,
807        dbtx: &mut DatabaseTransaction<'b>,
808        output: &'a WalletOutput,
809        out_point: OutPoint,
810    ) -> Result<TransactionItemAmounts, WalletOutputError> {
811        let output = output.ensure_v0_ref()?;
812
813        // In 0.4.0 we began preventing RBF withdrawals. Once we reach EoL support
814        // for 0.4.0, we can safely remove RBF withdrawal logic.
815        // see: https://github.com/fedimint/fedimint/issues/5453
816        if let WalletOutputV0::Rbf(_) = output {
817            // This exists as an escape hatch for any federations that successfully
818            // processed an RBF withdrawal due to having a single UTXO owned by the
819            // federation. If a peer needs to resync the federation's history, they can
820            // enable this variable until they've successfully synced, then restart with
821            // this disabled.
822            if is_rbf_withdrawal_enabled() {
823                warn!(target: LOG_MODULE_WALLET, "processing rbf withdrawal");
824            } else {
825                return Err(DEPRECATED_RBF_ERROR);
826            }
827        }
828
829        let change_tweak = self.consensus_nonce(dbtx).await;
830
831        let mut tx = self.create_peg_out_tx(dbtx, output, &change_tweak).await?;
832
833        let fee_rate = self.consensus_fee_rate(dbtx).await;
834
835        StatelessWallet::validate_tx(&tx, output, fee_rate, self.cfg.consensus.network.0)?;
836
837        self.offline_wallet().sign_psbt(&mut tx.psbt);
838
839        let txid = tx.psbt.unsigned_tx.compute_txid();
840
841        info!(
842            target: LOG_MODULE_WALLET,
843            %txid,
844            "Signing peg out",
845        );
846
847        let sigs = tx
848            .psbt
849            .inputs
850            .iter_mut()
851            .map(|input| {
852                assert_eq!(
853                    input.partial_sigs.len(),
854                    1,
855                    "There was already more than one (our) or no signatures in input"
856                );
857
858                // TODO: don't put sig into PSBT in the first place
859                // We actually take out our own signature so everyone finalizes the tx in the
860                // same epoch.
861                let sig = std::mem::take(&mut input.partial_sigs)
862                    .into_values()
863                    .next()
864                    .expect("asserted previously");
865
866                // We drop SIGHASH_ALL, because we always use that and it is only present in the
867                // PSBT for compatibility with other tools.
868                secp256k1::ecdsa::Signature::from_der(&sig.to_vec()[..sig.to_vec().len() - 1])
869                    .expect("we serialized it ourselves that way")
870            })
871            .collect::<Vec<_>>();
872
873        // Delete used UTXOs
874        for input in &tx.psbt.unsigned_tx.input {
875            dbtx.remove_entry(&UTXOKey(input.previous_output)).await;
876        }
877
878        dbtx.insert_new_entry(&UnsignedTransactionKey(txid), &tx)
879            .await;
880
881        dbtx.insert_new_entry(&PegOutTxSignatureCI(txid), &sigs)
882            .await;
883
884        dbtx.insert_new_entry(
885            &PegOutBitcoinTransaction(out_point),
886            &WalletOutputOutcome::new_v0(txid),
887        )
888        .await;
889        let amount: fedimint_core::Amount = output.amount().into();
890        let fee = self.cfg.consensus.fee_consensus.peg_out_abs;
891        calculate_pegout_metrics(dbtx, amount, fee);
892        Ok(TransactionItemAmounts {
893            amounts: Amounts::new_bitcoin(amount),
894            fees: Amounts::new_bitcoin(fee),
895        })
896    }
897
898    async fn output_status(
899        &self,
900        dbtx: &mut DatabaseTransaction<'_>,
901        out_point: OutPoint,
902    ) -> Option<WalletOutputOutcome> {
903        dbtx.get_value(&PegOutBitcoinTransaction(out_point)).await
904    }
905
906    async fn audit(
907        &self,
908        dbtx: &mut DatabaseTransaction<'_>,
909        audit: &mut Audit,
910        module_instance_id: ModuleInstanceId,
911    ) {
912        audit
913            .add_items(dbtx, module_instance_id, &UTXOPrefixKey, |_, v| {
914                v.amount.to_sat() as i64 * 1000
915            })
916            .await;
917        audit
918            .add_items(
919                dbtx,
920                module_instance_id,
921                &UnsignedTransactionPrefixKey,
922                |_, v| match v.rbf {
923                    None => v.change.to_sat() as i64 * 1000,
924                    Some(rbf) => rbf.fees.amount().to_sat() as i64 * -1000,
925                },
926            )
927            .await;
928        audit
929            .add_items(
930                dbtx,
931                module_instance_id,
932                &PendingTransactionPrefixKey,
933                |_, v| match v.rbf {
934                    None => v.change.to_sat() as i64 * 1000,
935                    Some(rbf) => rbf.fees.amount().to_sat() as i64 * -1000,
936                },
937            )
938            .await;
939    }
940
    /// Returns the list of API endpoints served by the wallet module.
    ///
    /// Each entry is declared with the `api_endpoint!` macro as an endpoint
    /// path constant, the API version it is registered under, and an async
    /// handler. Read-only handlers open their database transaction with
    /// `begin_transaction_nc`; the only handler that commits is the one for
    /// `ACTIVATE_CONSENSUS_VERSION_VOTING_ENDPOINT`.
    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
        vec![
            // Block count as computed by `consensus_block_count`.
            api_endpoint! {
                BLOCK_COUNT_ENDPOINT,
                ApiVersion::new(0, 0),
                async |module: &Wallet, context, _params: ()| -> u32 {
                    let db = context.db();
                    let mut dbtx = db.begin_transaction_nc().await;
                    Ok(module.consensus_block_count(&mut dbtx).await)
                }
            },
            // Block count reported by `get_block_count`; `None` if that call fails.
            api_endpoint! {
                BLOCK_COUNT_LOCAL_ENDPOINT,
                ApiVersion::new(0, 0),
                async |module: &Wallet, _context, _params: ()| -> Option<u32> {
                    Ok(module.get_block_count().ok())
                }
            },
            // Fee estimate for pegging out `sats` to `address`, obtained by building
            // (but not storing) a transaction against the currently available UTXOs.
            // Returns `None` if the transaction cannot be constructed.
            api_endpoint! {
                PEG_OUT_FEES_ENDPOINT,
                ApiVersion::new(0, 0),
                async |module: &Wallet, context, params: (Address<NetworkUnchecked>, u64)| -> Option<PegOutFees> {
                    let (address, sats) = params;
                    let db = context.db();
                    let mut dbtx = db.begin_transaction_nc().await;
                    let feerate = module.consensus_fee_rate(&mut dbtx).await;

                    // Since we are only calculating the tx size we can use an arbitrary dummy nonce.
                    let dummy_tweak = [0; 33];

                    let tx = module.offline_wallet().create_tx(
                        bitcoin::Amount::from_sat(sats),
                        // Note: While calling `assume_checked()` is generally unwise, it's fine
                        // here since we're only returning a fee estimate, and we would still
                        // reject a transaction with the wrong network upon attempted peg-out.
                        address.assume_checked().script_pubkey(),
                        vec![],
                        module.available_utxos(&mut dbtx).await,
                        feerate,
                        &dummy_tweak,
                        None
                    );

                    match tx {
                        Err(error) => {
                            // Usually from not enough spendable UTXOs
                            warn!(target: LOG_MODULE_WALLET, "Error returning peg-out fees {error}");
                            Ok(None)
                        }
                        Ok(tx) => Ok(Some(tx.fees))
                    }
                }
            },
            // The `kind` field of the bitcoin RPC configuration.
            api_endpoint! {
                BITCOIN_KIND_ENDPOINT,
                ApiVersion::new(0, 1),
                async |module: &Wallet, _context, _params: ()| -> String {
                    Ok(module.btc_rpc.get_bitcoin_rpc_config().kind)
                }
            },
            // The bitcoin RPC configuration with credentials stripped from its URL.
            // Guarded by `check_auth`.
            api_endpoint! {
                BITCOIN_RPC_CONFIG_ENDPOINT,
                ApiVersion::new(0, 1),
                async |module: &Wallet, context, _params: ()| -> BitcoinRpcConfig {
                    check_auth(context)?;
                    let config = module.btc_rpc.get_bitcoin_rpc_config();

                    // we need to remove auth, otherwise we'll send over the wire
                    let without_auth = config.url.clone().without_auth().map_err(|()| {
                        ApiError::server_error("Unable to remove auth from bitcoin config URL".to_string())
                    })?;

                    Ok(BitcoinRpcConfig {
                        url: without_auth,
                        ..config
                    })
                }
            },
            // Wallet summary as produced by `get_wallet_summary`.
            api_endpoint! {
                WALLET_SUMMARY_ENDPOINT,
                ApiVersion::new(0, 1),
                async |module: &Wallet, context, _params: ()| -> WalletSummary {
                    let db = context.db();
                    let mut dbtx = db.begin_transaction_nc().await;
                    Ok(module.get_wallet_summary(&mut dbtx).await)
                }
            },
            // Module consensus version derived from the peers' stored votes.
            api_endpoint! {
                MODULE_CONSENSUS_VERSION_ENDPOINT,
                ApiVersion::new(0, 2),
                async |module: &Wallet, context, _params: ()| -> ModuleConsensusVersion {
                    let db = context.db();
                    let mut dbtx = db.begin_transaction_nc().await;
                    Ok(module.consensus_module_consensus_version(&mut dbtx).await)
                }
            },
            // The compile-time `MODULE_CONSENSUS_VERSION` constant of this build.
            api_endpoint! {
                SUPPORTED_MODULE_CONSENSUS_VERSION_ENDPOINT,
                ApiVersion::new(0, 2),
                async |_module: &Wallet, _context, _params: ()| -> ModuleConsensusVersion {
                    Ok(MODULE_CONSENSUS_VERSION)
                }
            },
            // Writes the `ConsensusVersionVotingActivationKey` marker and commits.
            // Guarded by `check_auth`.
            api_endpoint! {
                ACTIVATE_CONSENSUS_VERSION_VOTING_ENDPOINT,
                ApiVersion::new(0, 2),
                async |_module: &Wallet, context, _params: ()| -> () {
                    check_auth(context)?;

                    let db = context.db();
                    let mut dbtx = db.begin_transaction().await;
                    dbtx.to_ref().insert_entry(&ConsensusVersionVotingActivationKey, &()).await;
                    dbtx.commit_tx_result().await?;
                    Ok(())
                }
            },
            // Result of `is_utxo_confirmed` for the given outpoint.
            api_endpoint! {
                UTXO_CONFIRMED_ENDPOINT,
                ApiVersion::new(0, 2),
                async |module: &Wallet, context, outpoint: bitcoin::OutPoint| -> bool {
                    let db = context.db();
                    let mut dbtx = db.begin_transaction_nc().await;
                    Ok(module.is_utxo_confirmed(&mut dbtx, outpoint).await)
                }
            },
            // Number of recovery items, as computed by `get_recovery_count`.
            api_endpoint! {
                RECOVERY_COUNT_ENDPOINT,
                ApiVersion::new(0, 1),
                async |_module: &Wallet, context, _params: ()| -> u64 {
                    let db = context.db();
                    let mut dbtx = db.begin_transaction_nc().await;
                    Ok(get_recovery_count(&mut dbtx).await)
                }
            },
            // Recovery items whose index lies in the half-open range
            // `range.0..range.1`.
            api_endpoint! {
                RECOVERY_SLICE_ENDPOINT,
                ApiVersion::new(0, 1),
                async |_module: &Wallet, context, range: (u64, u64)| -> Vec<RecoveryItem> {
                    let db = context.db();
                    let mut dbtx = db.begin_transaction_nc().await;
                    Ok(get_recovery_slice(&mut dbtx, range).await)
                }
            },
        ]
    }
1086}
1087
1088async fn get_recovery_count(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
1089    dbtx.find_by_prefix_sorted_descending(&RecoveryItemKeyPrefix)
1090        .await
1091        .next()
1092        .await
1093        .map_or(0, |entry| entry.0.0 + 1)
1094}
1095
1096async fn get_recovery_slice(
1097    dbtx: &mut DatabaseTransaction<'_>,
1098    range: (u64, u64),
1099) -> Vec<RecoveryItem> {
1100    dbtx.find_by_range(RecoveryItemKey(range.0)..RecoveryItemKey(range.1))
1101        .await
1102        .map(|entry| entry.1)
1103        .collect()
1104        .await
1105}
1106
1107fn calculate_pegin_metrics(
1108    dbtx: &mut DatabaseTransaction<'_>,
1109    amount: fedimint_core::Amount,
1110    fee: fedimint_core::Amount,
1111) {
1112    dbtx.on_commit(move || {
1113        WALLET_INOUT_SATS
1114            .with_label_values(&["incoming"])
1115            .observe(amount.sats_f64());
1116        WALLET_INOUT_FEES_SATS
1117            .with_label_values(&["incoming"])
1118            .observe(fee.sats_f64());
1119        WALLET_PEGIN_SATS.observe(amount.sats_f64());
1120        WALLET_PEGIN_FEES_SATS.observe(fee.sats_f64());
1121    });
1122}
1123
1124fn calculate_pegout_metrics(
1125    dbtx: &mut DatabaseTransaction<'_>,
1126    amount: fedimint_core::Amount,
1127    fee: fedimint_core::Amount,
1128) {
1129    dbtx.on_commit(move || {
1130        WALLET_INOUT_SATS
1131            .with_label_values(&["outgoing"])
1132            .observe(amount.sats_f64());
1133        WALLET_INOUT_FEES_SATS
1134            .with_label_values(&["outgoing"])
1135            .observe(fee.sats_f64());
1136        WALLET_PEGOUT_SATS.observe(amount.sats_f64());
1137        WALLET_PEGOUT_FEES_SATS.observe(fee.sats_f64());
1138    });
1139}
1140
/// Server-side state of the wallet module.
#[derive(Debug)]
pub struct Wallet {
    /// Module configuration; its consensus part holds, among others, the
    /// peg-in descriptor, the per-peer peg-in keys, the network and the fees.
    cfg: WalletConfig,
    /// Handle to the module database.
    db: Database,
    /// Secp256k1 context used for signature verification and key tweaking.
    secp: Secp256k1<All>,
    /// Bitcoin RPC monitor used to query status, block hashes and blocks.
    btc_rpc: ServerBitcoinRpcMonitor,
    /// Peer id of this federation member.
    our_peer_id: PeerId,
    /// Broadcasting pending txes can be triggered immediately with this
    broadcast_pending: Arc<Notify>,
    /// Task group the module's background tasks are spawned on.
    task_group: TaskGroup,
    /// Maximum consensus version supported by *all* our peers. Used to
    /// automatically activate new consensus versions as soon as everyone
    /// upgrades.
    peer_supported_consensus_version: watch::Receiver<Option<ModuleConsensusVersion>>,
}
1156
1157impl Wallet {
1158    pub async fn new(
1159        cfg: WalletConfig,
1160        db: &Database,
1161        task_group: &TaskGroup,
1162        our_peer_id: PeerId,
1163        module_api: DynModuleApi,
1164        server_bitcoin_rpc_monitor: ServerBitcoinRpcMonitor,
1165    ) -> anyhow::Result<Wallet> {
1166        let broadcast_pending = Arc::new(Notify::new());
1167        Self::spawn_broadcast_pending_task(
1168            task_group,
1169            &server_bitcoin_rpc_monitor,
1170            db,
1171            broadcast_pending.clone(),
1172        );
1173
1174        let peer_supported_consensus_version =
1175            Self::spawn_peer_supported_consensus_version_task(module_api, task_group, our_peer_id);
1176
1177        let status = retry("verify network", backoff_util::aggressive_backoff(), || {
1178            std::future::ready(
1179                server_bitcoin_rpc_monitor
1180                    .status()
1181                    .context("No connection to bitcoin rpc"),
1182            )
1183        })
1184        .await?;
1185
1186        ensure!(status.network == cfg.consensus.network.0, "Wrong Network");
1187
1188        let wallet = Wallet {
1189            cfg,
1190            db: db.clone(),
1191            secp: Default::default(),
1192            btc_rpc: server_bitcoin_rpc_monitor,
1193            our_peer_id,
1194            task_group: task_group.clone(),
1195            peer_supported_consensus_version,
1196            broadcast_pending,
1197        };
1198
1199        Ok(wallet)
1200    }
1201
1202    /// Try to attach signatures to a pending peg-out tx.
1203    fn sign_peg_out_psbt(
1204        &self,
1205        psbt: &mut Psbt,
1206        peer: PeerId,
1207        signature: &PegOutSignatureItem,
1208    ) -> Result<(), ProcessPegOutSigError> {
1209        let peer_key = self
1210            .cfg
1211            .consensus
1212            .peer_peg_in_keys
1213            .get(&peer)
1214            .expect("always called with valid peer id");
1215
1216        if psbt.inputs.len() != signature.signature.len() {
1217            return Err(ProcessPegOutSigError::WrongSignatureCount(
1218                psbt.inputs.len(),
1219                signature.signature.len(),
1220            ));
1221        }
1222
1223        let mut tx_hasher = SighashCache::new(&psbt.unsigned_tx);
1224        for (idx, (input, signature)) in psbt
1225            .inputs
1226            .iter_mut()
1227            .zip(signature.signature.iter())
1228            .enumerate()
1229        {
1230            let tx_hash = tx_hasher
1231                .p2wsh_signature_hash(
1232                    idx,
1233                    input
1234                        .witness_script
1235                        .as_ref()
1236                        .expect("Missing witness script"),
1237                    input.witness_utxo.as_ref().expect("Missing UTXO").value,
1238                    EcdsaSighashType::All,
1239                )
1240                .map_err(|_| ProcessPegOutSigError::SighashError)?;
1241
1242            let tweak = input
1243                .proprietary
1244                .get(&proprietary_tweak_key())
1245                .expect("we saved it with a tweak");
1246
1247            let tweaked_peer_key = peer_key.tweak(tweak, &self.secp);
1248            self.secp
1249                .verify_ecdsa(
1250                    &Message::from_digest_slice(&tx_hash[..]).unwrap(),
1251                    signature,
1252                    &tweaked_peer_key.key,
1253                )
1254                .map_err(|_| ProcessPegOutSigError::InvalidSignature)?;
1255
1256            if input
1257                .partial_sigs
1258                .insert(tweaked_peer_key.into(), EcdsaSig::sighash_all(*signature))
1259                .is_some()
1260            {
1261                // Should never happen since peers only sign a PSBT once
1262                return Err(ProcessPegOutSigError::DuplicateSignature);
1263            }
1264        }
1265        Ok(())
1266    }
1267
1268    fn finalize_peg_out_psbt(
1269        &self,
1270        mut unsigned: UnsignedTransaction,
1271    ) -> Result<PendingTransaction, ProcessPegOutSigError> {
1272        // We need to save the change output's tweak key to be able to access the funds
1273        // later on. The tweak is extracted here because the psbt is moved next
1274        // and not available anymore when the tweak is actually needed in the
1275        // end to be put into the batch on success.
1276        let change_tweak: [u8; 33] = unsigned
1277            .psbt
1278            .outputs
1279            .iter()
1280            .find_map(|output| output.proprietary.get(&proprietary_tweak_key()).cloned())
1281            .ok_or(ProcessPegOutSigError::MissingOrMalformedChangeTweak)?
1282            .try_into()
1283            .map_err(|_| ProcessPegOutSigError::MissingOrMalformedChangeTweak)?;
1284
1285        if let Err(error) = unsigned.psbt.finalize_mut(&self.secp) {
1286            return Err(ProcessPegOutSigError::ErrorFinalizingPsbt(error));
1287        }
1288
1289        let tx = unsigned.psbt.clone().extract_tx_unchecked_fee_rate();
1290
1291        Ok(PendingTransaction {
1292            tx,
1293            tweak: change_tweak,
1294            change: unsigned.change,
1295            destination: unsigned.destination,
1296            fees: unsigned.fees,
1297            selected_utxos: unsigned.selected_utxos,
1298            peg_out_amount: unsigned.peg_out_amount,
1299            rbf: unsigned.rbf,
1300        })
1301    }
1302
1303    fn get_block_count(&self) -> anyhow::Result<u32> {
1304        self.btc_rpc
1305            .status()
1306            .context("No bitcoin rpc connection")
1307            .and_then(|status| {
1308                status
1309                    .block_count
1310                    .try_into()
1311                    .map_err(|_| format_err!("Block count exceeds u32 limits"))
1312            })
1313    }
1314
1315    pub fn get_fee_rate_opt(&self) -> Feerate {
1316        // `get_feerate_multiplier` is clamped and can't be negative
1317        // feerate sources as clamped and can't be negative or too large
1318        #[allow(clippy::cast_precision_loss)]
1319        #[allow(clippy::cast_sign_loss)]
1320        Feerate {
1321            sats_per_kvb: ((self
1322                .btc_rpc
1323                .status()
1324                .map_or(self.cfg.consensus.default_fee, |status| status.fee_rate)
1325                .sats_per_kvb as f64
1326                * get_feerate_multiplier())
1327            .round()) as u64,
1328        }
1329    }
1330
1331    pub async fn consensus_block_count(&self, dbtx: &mut DatabaseTransaction<'_>) -> u32 {
1332        let peer_count = self.cfg.consensus.peer_peg_in_keys.to_num_peers().total();
1333
1334        let mut counts = dbtx
1335            .find_by_prefix(&BlockCountVotePrefix)
1336            .await
1337            .map(|entry| entry.1)
1338            .collect::<Vec<u32>>()
1339            .await;
1340
1341        assert!(counts.len() <= peer_count);
1342
1343        while counts.len() < peer_count {
1344            counts.push(0);
1345        }
1346
1347        counts.sort_unstable();
1348
1349        counts[peer_count / 2]
1350    }
1351
1352    pub async fn consensus_fee_rate(&self, dbtx: &mut DatabaseTransaction<'_>) -> Feerate {
1353        let peer_count = self.cfg.consensus.peer_peg_in_keys.to_num_peers().total();
1354
1355        let mut rates = dbtx
1356            .find_by_prefix(&FeeRateVotePrefix)
1357            .await
1358            .map(|(.., rate)| rate)
1359            .collect::<Vec<_>>()
1360            .await;
1361
1362        assert!(rates.len() <= peer_count);
1363
1364        while rates.len() < peer_count {
1365            rates.push(self.cfg.consensus.default_fee);
1366        }
1367
1368        rates.sort_unstable();
1369
1370        rates[peer_count / 2]
1371    }
1372
1373    async fn consensus_module_consensus_version(
1374        &self,
1375        dbtx: &mut DatabaseTransaction<'_>,
1376    ) -> ModuleConsensusVersion {
1377        let num_peers = self.cfg.consensus.peer_peg_in_keys.to_num_peers();
1378
1379        let mut versions = dbtx
1380            .find_by_prefix(&ConsensusVersionVotePrefix)
1381            .await
1382            .map(|entry| entry.1)
1383            .collect::<Vec<ModuleConsensusVersion>>()
1384            .await;
1385
1386        while versions.len() < num_peers.total() {
1387            versions.push(ModuleConsensusVersion::new(2, 0));
1388        }
1389
1390        assert_eq!(versions.len(), num_peers.total());
1391
1392        versions.sort_unstable();
1393
1394        assert!(versions.first() <= versions.last());
1395
1396        versions[num_peers.max_evil()]
1397    }
1398
1399    pub async fn consensus_nonce(&self, dbtx: &mut DatabaseTransaction<'_>) -> [u8; 33] {
1400        let nonce_idx = dbtx.get_value(&PegOutNonceKey).await.unwrap_or(0);
1401        dbtx.insert_entry(&PegOutNonceKey, &(nonce_idx + 1)).await;
1402
1403        nonce_from_idx(nonce_idx)
1404    }
1405
1406    async fn sync_up_to_consensus_count(
1407        &self,
1408        dbtx: &mut DatabaseTransaction<'_>,
1409        old_count: u32,
1410        new_count: u32,
1411    ) {
1412        info!(
1413            target: LOG_MODULE_WALLET,
1414            old_count,
1415            new_count,
1416            blocks_to_go = new_count - old_count,
1417            "New block count consensus, initiating sync",
1418        );
1419
1420        // Before we can safely call our bitcoin backend to process the new consensus
1421        // count, we need to ensure we observed enough confirmations
1422        self.wait_for_finality_confs_or_shutdown(new_count).await;
1423
1424        for height in old_count..new_count {
1425            info!(
1426                target: LOG_MODULE_WALLET,
1427                height,
1428                "Processing block of height {height}",
1429            );
1430
1431            // TODO: use batching for mainnet syncing
1432            trace!(block = height, "Fetching block hash");
1433            let block_hash = retry("get_block_hash", backoff_util::background_backoff(), || {
1434                self.btc_rpc.get_block_hash(u64::from(height)) // TODO: use u64 for height everywhere
1435            })
1436            .await
1437            .expect("bitcoind rpc to get block hash");
1438
1439            let block = retry("get_block", backoff_util::background_backoff(), || {
1440                self.btc_rpc.get_block(&block_hash)
1441            })
1442            .await
1443            .expect("bitcoind rpc to get block");
1444
1445            if let Some(prev_block_height) = height.checked_sub(1) {
1446                if let Some(hash) = dbtx
1447                    .get_value(&BlockHashByHeightKey(prev_block_height))
1448                    .await
1449                {
1450                    assert_eq!(block.header.prev_blockhash, hash.0);
1451                } else {
1452                    warn!(
1453                        target: LOG_MODULE_WALLET,
1454                        %height,
1455                        %block_hash,
1456                        %prev_block_height,
1457                        prev_blockhash = %block.header.prev_blockhash,
1458                        "Missing previous block hash. This should only happen on the first processed block height."
1459                    );
1460                }
1461            }
1462
1463            if self.consensus_module_consensus_version(dbtx).await
1464                >= ModuleConsensusVersion::new(2, 2)
1465            {
1466                for transaction in &block.txdata {
1467                    // We maintain the subset of unspent P2WSH transaction outputs created
1468                    // since the module was running on the new consensus version, which might be
1469                    // the same time as the genesis session.
1470
1471                    for tx_in in &transaction.input {
1472                        dbtx.remove_entry(&UnspentTxOutKey(tx_in.previous_output))
1473                            .await;
1474                    }
1475
1476                    for (vout, tx_out) in transaction.output.iter().enumerate() {
1477                        let should_track_utxo = if self.cfg.consensus.peer_peg_in_keys.len() > 1 {
1478                            tx_out.script_pubkey.is_p2wsh()
1479                        } else {
1480                            tx_out.script_pubkey.is_p2wpkh()
1481                        };
1482
1483                        if should_track_utxo {
1484                            let outpoint = bitcoin::OutPoint {
1485                                txid: transaction.compute_txid(),
1486                                vout: vout as u32,
1487                            };
1488
1489                            dbtx.insert_new_entry(&UnspentTxOutKey(outpoint), tx_out)
1490                                .await;
1491                        }
1492                    }
1493                }
1494            }
1495
1496            let pending_transactions = dbtx
1497                .find_by_prefix(&PendingTransactionPrefixKey)
1498                .await
1499                .map(|(key, transaction)| (key.0, transaction))
1500                .collect::<HashMap<Txid, PendingTransaction>>()
1501                .await;
1502            let pending_transactions_len = pending_transactions.len();
1503
1504            debug!(
1505                target: LOG_MODULE_WALLET,
1506                ?height,
1507                ?pending_transactions_len,
1508                "Recognizing change UTXOs"
1509            );
1510            for (txid, tx) in &pending_transactions {
1511                let is_tx_in_block = block.txdata.iter().any(|tx| tx.compute_txid() == *txid);
1512
1513                if is_tx_in_block {
1514                    debug!(
1515                        target: LOG_MODULE_WALLET,
1516                        ?txid, ?height, ?block_hash, "Recognizing change UTXO"
1517                    );
1518                    self.recognize_change_utxo(dbtx, tx).await;
1519                } else {
1520                    debug!(
1521                        target: LOG_MODULE_WALLET,
1522                        ?txid,
1523                        ?height,
1524                        ?block_hash,
1525                        "Pending transaction not yet confirmed in this block"
1526                    );
1527                }
1528            }
1529
1530            dbtx.insert_new_entry(&BlockHashKey(block_hash), &()).await;
1531            dbtx.insert_new_entry(
1532                &BlockHashByHeightKey(height),
1533                &BlockHashByHeightValue(block_hash),
1534            )
1535            .await;
1536
1537            info!(
1538                target: LOG_MODULE_WALLET,
1539                height,
1540                ?block_hash,
1541                "Successfully processed block of height {height}",
1542            );
1543        }
1544    }
1545
1546    /// Add a change UTXO to our spendable UTXO database after it was included
1547    /// in a block that we got consensus on.
1548    async fn recognize_change_utxo(
1549        &self,
1550        dbtx: &mut DatabaseTransaction<'_>,
1551        pending_tx: &PendingTransaction,
1552    ) {
1553        self.remove_rbf_transactions(dbtx, pending_tx).await;
1554
1555        let script_pk = self
1556            .cfg
1557            .consensus
1558            .peg_in_descriptor
1559            .tweak(&pending_tx.tweak, &self.secp)
1560            .script_pubkey();
1561        for (idx, output) in pending_tx.tx.output.iter().enumerate() {
1562            if output.script_pubkey == script_pk {
1563                dbtx.insert_entry(
1564                    &UTXOKey(bitcoin::OutPoint {
1565                        txid: pending_tx.tx.compute_txid(),
1566                        vout: idx as u32,
1567                    }),
1568                    &SpendableUTXO {
1569                        tweak: pending_tx.tweak,
1570                        amount: output.value,
1571                    },
1572                )
1573                .await;
1574            }
1575        }
1576    }
1577
1578    /// Removes the `PendingTransaction` and any transactions tied to it via RBF
1579    async fn remove_rbf_transactions(
1580        &self,
1581        dbtx: &mut DatabaseTransaction<'_>,
1582        pending_tx: &PendingTransaction,
1583    ) {
1584        let mut all_transactions: BTreeMap<Txid, PendingTransaction> = dbtx
1585            .find_by_prefix(&PendingTransactionPrefixKey)
1586            .await
1587            .map(|(key, val)| (key.0, val))
1588            .collect::<BTreeMap<Txid, PendingTransaction>>()
1589            .await;
1590
1591        // We need to search and remove all `PendingTransactions` invalidated by RBF
1592        let mut pending_to_remove = vec![pending_tx.clone()];
1593        while let Some(removed) = pending_to_remove.pop() {
1594            all_transactions.remove(&removed.tx.compute_txid());
1595            dbtx.remove_entry(&PendingTransactionKey(removed.tx.compute_txid()))
1596                .await;
1597
1598            // Search for tx that this `removed` has as RBF
1599            if let Some(rbf) = &removed.rbf
1600                && let Some(tx) = all_transactions.get(&rbf.txid)
1601            {
1602                pending_to_remove.push(tx.clone());
1603            }
1604
1605            // Search for tx that wanted to RBF the `removed` one
1606            for tx in all_transactions.values() {
1607                if let Some(rbf) = &tx.rbf
1608                    && rbf.txid == removed.tx.compute_txid()
1609                {
1610                    pending_to_remove.push(tx.clone());
1611                }
1612            }
1613        }
1614    }
1615
1616    async fn block_is_known(
1617        &self,
1618        dbtx: &mut DatabaseTransaction<'_>,
1619        block_hash: BlockHash,
1620    ) -> bool {
1621        dbtx.get_value(&BlockHashKey(block_hash)).await.is_some()
1622    }
1623
1624    async fn create_peg_out_tx(
1625        &self,
1626        dbtx: &mut DatabaseTransaction<'_>,
1627        output: &WalletOutputV0,
1628        change_tweak: &[u8; 33],
1629    ) -> Result<UnsignedTransaction, WalletOutputError> {
1630        match output {
1631            WalletOutputV0::PegOut(peg_out) => self.offline_wallet().create_tx(
1632                peg_out.amount,
1633                // Note: While calling `assume_checked()` is generally unwise, checking the
1634                // network here could be a consensus-breaking change. Ignoring the network
1635                // is fine here since we validate it in `process_output()`.
1636                peg_out.recipient.clone().assume_checked().script_pubkey(),
1637                vec![],
1638                self.available_utxos(dbtx).await,
1639                peg_out.fees.fee_rate,
1640                change_tweak,
1641                None,
1642            ),
1643            WalletOutputV0::Rbf(rbf) => {
1644                let tx = dbtx
1645                    .get_value(&PendingTransactionKey(rbf.txid))
1646                    .await
1647                    .ok_or(WalletOutputError::RbfTransactionIdNotFound)?;
1648
1649                self.offline_wallet().create_tx(
1650                    tx.peg_out_amount,
1651                    tx.destination,
1652                    tx.selected_utxos,
1653                    self.available_utxos(dbtx).await,
1654                    tx.fees.fee_rate,
1655                    change_tweak,
1656                    Some(rbf.clone()),
1657                )
1658            }
1659        }
1660    }
1661
1662    async fn available_utxos(
1663        &self,
1664        dbtx: &mut DatabaseTransaction<'_>,
1665    ) -> Vec<(UTXOKey, SpendableUTXO)> {
1666        dbtx.find_by_prefix(&UTXOPrefixKey)
1667            .await
1668            .collect::<Vec<(UTXOKey, SpendableUTXO)>>()
1669            .await
1670    }
1671
1672    pub async fn get_wallet_value(&self, dbtx: &mut DatabaseTransaction<'_>) -> bitcoin::Amount {
1673        let sat_sum = self
1674            .available_utxos(dbtx)
1675            .await
1676            .into_iter()
1677            .map(|(_, utxo)| utxo.amount.to_sat())
1678            .sum();
1679        bitcoin::Amount::from_sat(sat_sum)
1680    }
1681
1682    async fn get_wallet_summary(&self, dbtx: &mut DatabaseTransaction<'_>) -> WalletSummary {
1683        fn partition_peg_out_and_change(
1684            transactions: Vec<Transaction>,
1685        ) -> (Vec<TxOutputSummary>, Vec<TxOutputSummary>) {
1686            let mut peg_out_txos: Vec<TxOutputSummary> = Vec::new();
1687            let mut change_utxos: Vec<TxOutputSummary> = Vec::new();
1688
1689            for tx in transactions {
1690                let txid = tx.compute_txid();
1691
1692                // to identify outputs for the peg_out (idx = 0) and change (idx = 1), we lean
1693                // on how the wallet constructs the transaction
1694                let peg_out_output = tx
1695                    .output
1696                    .first()
1697                    .expect("tx must contain withdrawal output");
1698
1699                let change_output = tx.output.last().expect("tx must contain change output");
1700
1701                peg_out_txos.push(TxOutputSummary {
1702                    outpoint: bitcoin::OutPoint { txid, vout: 0 },
1703                    amount: peg_out_output.value,
1704                });
1705
1706                change_utxos.push(TxOutputSummary {
1707                    outpoint: bitcoin::OutPoint { txid, vout: 1 },
1708                    amount: change_output.value,
1709                });
1710            }
1711
1712            (peg_out_txos, change_utxos)
1713        }
1714
1715        let spendable_utxos = self
1716            .available_utxos(dbtx)
1717            .await
1718            .iter()
1719            .map(|(utxo_key, spendable_utxo)| TxOutputSummary {
1720                outpoint: utxo_key.0,
1721                amount: spendable_utxo.amount,
1722            })
1723            .collect::<Vec<_>>();
1724
1725        // constructed peg-outs without threshold signatures
1726        let unsigned_transactions = dbtx
1727            .find_by_prefix(&UnsignedTransactionPrefixKey)
1728            .await
1729            .map(|(_tx_key, tx)| tx.psbt.unsigned_tx)
1730            .collect::<Vec<_>>()
1731            .await;
1732
1733        // peg-outs with threshold signatures, awaiting finality delay confirmations
1734        let unconfirmed_transactions = dbtx
1735            .find_by_prefix(&PendingTransactionPrefixKey)
1736            .await
1737            .map(|(_tx_key, tx)| tx.tx)
1738            .collect::<Vec<_>>()
1739            .await;
1740
1741        let (unsigned_peg_out_txos, unsigned_change_utxos) =
1742            partition_peg_out_and_change(unsigned_transactions);
1743
1744        let (unconfirmed_peg_out_txos, unconfirmed_change_utxos) =
1745            partition_peg_out_and_change(unconfirmed_transactions);
1746
1747        WalletSummary {
1748            spendable_utxos,
1749            unsigned_peg_out_txos,
1750            unsigned_change_utxos,
1751            unconfirmed_peg_out_txos,
1752            unconfirmed_change_utxos,
1753        }
1754    }
1755
1756    async fn is_utxo_confirmed(
1757        &self,
1758        dbtx: &mut DatabaseTransaction<'_>,
1759        outpoint: bitcoin::OutPoint,
1760    ) -> bool {
1761        dbtx.get_value(&UnspentTxOutKey(outpoint)).await.is_some()
1762    }
1763
1764    fn offline_wallet(&'_ self) -> StatelessWallet<'_> {
1765        StatelessWallet {
1766            descriptor: &self.cfg.consensus.peg_in_descriptor,
1767            secret_key: &self.cfg.private.peg_in_key,
1768            secp: &self.secp,
1769        }
1770    }
1771
1772    fn spawn_broadcast_pending_task(
1773        task_group: &TaskGroup,
1774        server_bitcoin_rpc_monitor: &ServerBitcoinRpcMonitor,
1775        db: &Database,
1776        broadcast_pending_notify: Arc<Notify>,
1777    ) {
1778        task_group.spawn_cancellable("broadcast pending", {
1779            let btc_rpc = server_bitcoin_rpc_monitor.clone();
1780            let db = db.clone();
1781            run_broadcast_pending_tx(db, btc_rpc, broadcast_pending_notify)
1782        });
1783    }
1784
1785    /// Get the bitcoin network for UI display
1786    pub fn network_ui(&self) -> Network {
1787        self.cfg.consensus.network.0
1788    }
1789
1790    /// Get the current consensus block count for UI display
1791    pub async fn consensus_block_count_ui(&self) -> u32 {
1792        self.consensus_block_count(&mut self.db.begin_transaction_nc().await)
1793            .await
1794    }
1795
1796    /// Get the current consensus fee rate for UI display
1797    pub async fn consensus_feerate_ui(&self) -> Feerate {
1798        self.consensus_fee_rate(&mut self.db.begin_transaction_nc().await)
1799            .await
1800    }
1801
1802    /// Get the current wallet summary for UI display
1803    pub async fn get_wallet_summary_ui(&self) -> WalletSummary {
1804        self.get_wallet_summary(&mut self.db.begin_transaction_nc().await)
1805            .await
1806    }
1807
1808    /// Shutdown the task group shared throughout fedimintd, giving 60 seconds
1809    /// for other services to gracefully shutdown.
1810    async fn graceful_shutdown(&self) {
1811        if let Err(e) = self
1812            .task_group
1813            .clone()
1814            .shutdown_join_all(Some(Duration::from_secs(60)))
1815            .await
1816        {
1817            panic!("Error while shutting down fedimintd task group: {e}");
1818        }
1819    }
1820
1821    /// Returns once our bitcoin backend observes finality delay confirmations
1822    /// of the consensus block count. If we don't observe enough confirmations
1823    /// after one hour, we gracefully shutdown fedimintd. This is necessary
1824    /// since we can no longer participate in consensus if our bitcoin backend
1825    /// is unable to observe the same chain tip as our peers.
1826    async fn wait_for_finality_confs_or_shutdown(&self, consensus_block_count: u32) {
1827        let backoff = if is_running_in_test_env() {
1828            // every 100ms for 60s
1829            backoff_util::custom_backoff(
1830                Duration::from_millis(100),
1831                Duration::from_millis(100),
1832                Some(10 * 60),
1833            )
1834        } else {
1835            // every max 10s for 1 hour
1836            backoff_util::fibonacci_max_one_hour()
1837        };
1838
1839        let wait_for_finality_confs = || async {
1840            let our_chain_tip_block_count = self.get_block_count()?;
1841            let consensus_chain_tip_block_count =
1842                consensus_block_count + self.cfg.consensus.finality_delay;
1843
1844            if consensus_chain_tip_block_count <= our_chain_tip_block_count {
1845                Ok(())
1846            } else {
1847                Err(anyhow::anyhow!("not enough confirmations"))
1848            }
1849        };
1850
1851        if retry("wait_for_finality_confs", backoff, wait_for_finality_confs)
1852            .await
1853            .is_err()
1854        {
1855            self.graceful_shutdown().await;
1856        }
1857    }
1858
1859    fn spawn_peer_supported_consensus_version_task(
1860        api_client: DynModuleApi,
1861        task_group: &TaskGroup,
1862        our_peer_id: PeerId,
1863    ) -> watch::Receiver<Option<ModuleConsensusVersion>> {
1864        let (sender, receiver) = watch::channel(None);
1865        task_group.spawn_cancellable("fetch-peer-consensus-versions", async move {
1866            loop {
1867                let request_futures = api_client.all_peers().iter().filter_map(|&peer| {
1868                    if peer == our_peer_id {
1869                        return None;
1870                    }
1871
1872                    let api_client_inner = api_client.clone();
1873                    Some(async move {
1874                        api_client_inner
1875                            .request_single_peer::<ModuleConsensusVersion>(
1876                                SUPPORTED_MODULE_CONSENSUS_VERSION_ENDPOINT.to_owned(),
1877                                ApiRequestErased::default(),
1878                                peer,
1879                            )
1880                            .await
1881                            .inspect(|res| debug!(
1882                                target: LOG_MODULE_WALLET,
1883                                %peer,
1884                                %our_peer_id,
1885                                ?res,
1886                                "Fetched supported module consensus version from peer"
1887                            ))
1888                            .inspect_err(|err| warn!(
1889                                target: LOG_MODULE_WALLET,
1890                                 %peer,
1891                                 err=%err.fmt_compact(),
1892                                "Failed to fetch consensus version from peer"
1893                            ))
1894                            .ok()
1895                    })
1896                });
1897
1898                let peer_consensus_versions = join_all(request_futures)
1899                    .await
1900                    .into_iter()
1901                    .flatten()
1902                    .collect::<Vec<_>>();
1903
1904                let sorted_consensus_versions = peer_consensus_versions
1905                    .into_iter()
1906                    .chain(std::iter::once(MODULE_CONSENSUS_VERSION))
1907                    .sorted()
1908                    .collect::<Vec<_>>();
1909                let all_peers_supported_version =
1910                    if sorted_consensus_versions.len() == api_client.all_peers().len() {
1911                        let min_supported_version = *sorted_consensus_versions
1912                            .first()
1913                            .expect("at least one element");
1914
1915                        debug!(
1916                            target: LOG_MODULE_WALLET,
1917                            ?sorted_consensus_versions,
1918                            "Fetched supported consensus versions from peers"
1919                        );
1920
1921                        Some(min_supported_version)
1922                    } else {
1923                        assert!(
1924                            sorted_consensus_versions.len() <= api_client.all_peers().len(),
1925                            "Too many peer responses",
1926                        );
1927                        trace!(
1928                            target: LOG_MODULE_WALLET,
1929                            ?sorted_consensus_versions,
1930                            "Not all peers have reported their consensus version yet"
1931                        );
1932                        None
1933                    };
1934
1935                #[allow(clippy::disallowed_methods)]
1936                if sender.send(all_peers_supported_version).is_err() {
1937                    warn!(target: LOG_MODULE_WALLET, "Failed to send consensus version to watch channel, stopping task");
1938                    break;
1939                }
1940
1941                if is_running_in_test_env() {
1942                    // Even in tests we don't want to spam the federation with requests about it
1943                    sleep(Duration::from_secs(5)).await;
1944                } else {
1945                    sleep(Duration::from_secs(600)).await;
1946                }
1947            }
1948        });
1949        receiver
1950    }
1951}
1952
1953#[instrument(target = LOG_MODULE_WALLET, level = "debug", skip_all)]
1954pub async fn run_broadcast_pending_tx(
1955    db: Database,
1956    rpc: ServerBitcoinRpcMonitor,
1957    broadcast: Arc<Notify>,
1958) {
1959    loop {
1960        // Unless something new happened, we broadcast once a minute
1961        let _ = tokio::time::timeout(Duration::from_secs(60), broadcast.notified()).await;
1962        broadcast_pending_tx(db.begin_transaction_nc().await, &rpc).await;
1963    }
1964}
1965
1966pub async fn broadcast_pending_tx(
1967    mut dbtx: DatabaseTransaction<'_>,
1968    rpc: &ServerBitcoinRpcMonitor,
1969) {
1970    let pending_tx: Vec<PendingTransaction> = dbtx
1971        .find_by_prefix(&PendingTransactionPrefixKey)
1972        .await
1973        .map(|(_, val)| val)
1974        .collect::<Vec<_>>()
1975        .await;
1976    let rbf_txids: BTreeSet<Txid> = pending_tx
1977        .iter()
1978        .filter_map(|tx| tx.rbf.clone().map(|rbf| rbf.txid))
1979        .collect();
1980    if !pending_tx.is_empty() {
1981        debug!(
1982            target: LOG_MODULE_WALLET,
1983            "Broadcasting pending transactions (total={}, rbf={})",
1984            pending_tx.len(),
1985            rbf_txids.len()
1986        );
1987    }
1988
1989    for PendingTransaction { tx, .. } in pending_tx {
1990        if !rbf_txids.contains(&tx.compute_txid()) {
1991            debug!(
1992                target: LOG_MODULE_WALLET,
1993                tx = %tx.compute_txid(),
1994                weight = tx.weight().to_wu(),
1995                output = ?tx.output,
1996                "Broadcasting peg-out",
1997            );
1998            trace!(transaction = ?tx);
1999            rpc.submit_transaction(tx).await;
2000        }
2001    }
2002}
2003
/// Borrowed view of the key material needed to build and sign peg-out
/// transactions. It holds no state of its own; the UTXOs to spend are passed
/// in by the caller.
struct StatelessWallet<'a> {
    /// Descriptor that gets tweaked to derive the scripts of our UTXOs and of
    /// the change output.
    descriptor: &'a Descriptor<CompressedPublicKey>,
    /// Our secret key; it is tweaked per input before signing.
    secret_key: &'a secp256k1::SecretKey,
    /// Secp256k1 context used for tweaking and signing.
    secp: &'a secp256k1::Secp256k1<secp256k1::All>,
}
2009
2010impl StatelessWallet<'_> {
2011    /// Given a tx created from an `WalletOutput`, validate there will be no
2012    /// issues submitting the transaction to the Bitcoin network
2013    fn validate_tx(
2014        tx: &UnsignedTransaction,
2015        output: &WalletOutputV0,
2016        consensus_fee_rate: Feerate,
2017        network: Network,
2018    ) -> Result<(), WalletOutputError> {
2019        if let WalletOutputV0::PegOut(peg_out) = output
2020            && !peg_out.recipient.is_valid_for_network(network)
2021        {
2022            return Err(WalletOutputError::WrongNetwork(
2023                NetworkLegacyEncodingWrapper(network),
2024                NetworkLegacyEncodingWrapper(get_network_for_address(&peg_out.recipient)),
2025            ));
2026        }
2027
2028        // Validate the tx amount is over the dust limit
2029        if tx.peg_out_amount < tx.destination.minimal_non_dust() {
2030            return Err(WalletOutputError::PegOutUnderDustLimit);
2031        }
2032
2033        // Validate tx fee rate is above the consensus fee rate
2034        if tx.fees.fee_rate < consensus_fee_rate {
2035            return Err(WalletOutputError::PegOutFeeBelowConsensus(
2036                tx.fees.fee_rate,
2037                consensus_fee_rate,
2038            ));
2039        }
2040
2041        // Validate added fees are above the min relay tx fee
2042        // BIP-0125 requires 1 sat/vb for RBF by default (same as normal txs)
2043        let fees = match output {
2044            WalletOutputV0::PegOut(pegout) => pegout.fees,
2045            WalletOutputV0::Rbf(rbf) => rbf.fees,
2046        };
2047        if fees.fee_rate.sats_per_kvb < u64::from(DEFAULT_MIN_RELAY_TX_FEE) {
2048            return Err(WalletOutputError::BelowMinRelayFee);
2049        }
2050
2051        // Validate fees weight matches the actual weight
2052        if fees.total_weight != tx.fees.total_weight {
2053            return Err(WalletOutputError::TxWeightIncorrect(
2054                fees.total_weight,
2055                tx.fees.total_weight,
2056            ));
2057        }
2058
2059        Ok(())
2060    }
2061
    /// Attempts to create a tx ready to be signed from available UTXOs.
    ///
    /// The resulting transaction always has exactly two outputs: the peg-out
    /// to `destination` first and the change back to the federation second.
    ///
    /// * `peg_out_amount`: How much the peg-out should be
    /// * `destination`: The script the user is pegging-out to
    /// * `included_utxos`: UTXOs that must be included (for RBF)
    /// * `remaining_utxos`: All other spendable UTXOs
    /// * `fee_rate`: How much needs to be spent on fees
    /// * `change_tweak`: How the federation can recognize its change UTXO
    /// * `rbf`: If this is an RBF transaction
    ///
    /// # Errors
    ///
    /// Returns `WalletOutputError::NotEnoughSpendableUTXO` if the given UTXOs
    /// cannot cover the peg-out amount, the fees and a non-dust change output.
    #[allow(clippy::too_many_arguments)]
    fn create_tx(
        &self,
        peg_out_amount: bitcoin::Amount,
        destination: ScriptBuf,
        mut included_utxos: Vec<(UTXOKey, SpendableUTXO)>,
        mut remaining_utxos: Vec<(UTXOKey, SpendableUTXO)>,
        mut fee_rate: Feerate,
        change_tweak: &[u8; 33],
        rbf: Option<Rbf>,
    ) -> Result<UnsignedTransaction, WalletOutputError> {
        // Add the rbf fees to the existing tx fees
        if let Some(rbf) = &rbf {
            fee_rate.sats_per_kvb += rbf.fees.fee_rate.sats_per_kvb;
        }

        // When building a transaction we need to take care of two things:
        //  * We need enough input amount to fund all outputs
        //  * We need to keep an eye on the tx weight so we can factor the fees into our
        //    calculation
        // We then go on to calculate the base size of the transaction `total_weight`
        // and the maximum weight per added input which we will add every time
        // we select an input.
        let change_script = self.derive_script(change_tweak);
        let out_weight = (destination.len() * 4 + 1 + 32
            // Add change script weight, it's very likely to be needed if not we just overpay in fees
            + 1 // script len varint, 1 byte for all addresses we accept
            + change_script.len() * 4 // script len
            + 32) as u64; // value
        let mut total_weight = 16 + // version
            12 + // up to 2**16-1 inputs
            12 + // up to 2**16-1 outputs
            out_weight + // weight of all outputs
            16; // lock time
        // https://github.com/fedimint/fedimint/issues/4590
        #[allow(deprecated)]
        let max_input_weight = (self
            .descriptor
            .max_satisfaction_weight()
            .expect("is satisfyable") +
            128 + // TxOutHash
            16 + // TxOutIndex
            16) as u64; // sequence

        // Ensure deterministic ordering of UTXOs for all peers
        // NOTE(review): after `extend`, `pop()` below takes from the end, so
        // `remaining_utxos` are consumed (largest first) before any of the
        // `included_utxos` — confirm this is the intended order for RBF.
        included_utxos.sort_by_key(|(_, utxo)| utxo.amount);
        remaining_utxos.sort_by_key(|(_, utxo)| utxo.amount);
        included_utxos.extend(remaining_utxos);

        // Finally we initialize our accumulator for selected input amounts
        let mut total_selected_value = bitcoin::Amount::from_sat(0);
        let mut selected_utxos: Vec<(UTXOKey, SpendableUTXO)> = vec![];
        let mut fees = fee_rate.calculate_fee(total_weight);

        // Keep selecting inputs until they cover the peg-out, a non-dust change
        // output and the fees, which grow with every input added.
        while total_selected_value < peg_out_amount + change_script.minimal_non_dust() + fees {
            match included_utxos.pop() {
                Some((utxo_key, utxo)) => {
                    total_selected_value += utxo.amount;
                    total_weight += max_input_weight;
                    fees = fee_rate.calculate_fee(total_weight);
                    selected_utxos.push((utxo_key, utxo));
                }
                _ => return Err(WalletOutputError::NotEnoughSpendableUTXO), // Not enough UTXOs
            }
        }

        // We always pay ourselves change back to ensure that we don't lose anything due
        // to dust
        let change = total_selected_value - fees - peg_out_amount;
        let output: Vec<TxOut> = vec![
            TxOut {
                value: peg_out_amount,
                script_pubkey: destination.clone(),
            },
            TxOut {
                value: change,
                script_pubkey: change_script,
            },
        ];
        // PSBT metadata for the change output: it carries the change tweak as
        // a proprietary field
        let mut change_out = bitcoin::psbt::Output::default();
        change_out
            .proprietary
            .insert(proprietary_tweak_key(), change_tweak.to_vec());

        info!(
            target: LOG_MODULE_WALLET,
            inputs = selected_utxos.len(),
            input_sats = total_selected_value.to_sat(),
            peg_out_sats = peg_out_amount.to_sat(),
            ?total_weight,
            fees_sats = fees.to_sat(),
            fee_rate = fee_rate.sats_per_kvb,
            change_sats = change.to_sat(),
            "Creating peg-out tx",
        );

        let transaction = Transaction {
            version: bitcoin::transaction::Version(2),
            lock_time: LockTime::ZERO,
            input: selected_utxos
                .iter()
                .map(|(utxo_key, _utxo)| TxIn {
                    previous_output: utxo_key.0,
                    script_sig: Default::default(),
                    sequence: Sequence::ENABLE_RBF_NO_LOCKTIME,
                    witness: bitcoin::Witness::new(),
                })
                .collect(),
            output,
        };
        info!(
            target: LOG_MODULE_WALLET,
            txid = %transaction.compute_txid(), "Creating peg-out tx"
        );

        // FIXME: use custom data structure that guarantees more invariants and only
        // convert to PSBT for finalization
        let psbt = Psbt {
            unsigned_tx: transaction,
            version: 0,
            xpub: Default::default(),
            proprietary: Default::default(),
            unknown: Default::default(),
            // One PSBT input per selected UTXO, in the same order as the
            // transaction inputs above
            inputs: selected_utxos
                .iter()
                .map(|(_utxo_key, utxo)| {
                    let script_pubkey = self
                        .descriptor
                        .tweak(&utxo.tweak, self.secp)
                        .script_pubkey();
                    Input {
                        non_witness_utxo: None,
                        witness_utxo: Some(TxOut {
                            value: utxo.amount,
                            script_pubkey,
                        }),
                        partial_sigs: Default::default(),
                        sighash_type: None,
                        redeem_script: None,
                        witness_script: Some(
                            self.descriptor
                                .tweak(&utxo.tweak, self.secp)
                                .script_code()
                                .expect("Failed to tweak descriptor"),
                        ),
                        bip32_derivation: Default::default(),
                        final_script_sig: None,
                        final_script_witness: None,
                        ripemd160_preimages: Default::default(),
                        sha256_preimages: Default::default(),
                        hash160_preimages: Default::default(),
                        hash256_preimages: Default::default(),
                        // The tweak is read back from here when signing the input
                        proprietary: vec![(proprietary_tweak_key(), utxo.tweak.to_vec())]
                            .into_iter()
                            .collect(),
                        tap_key_sig: Default::default(),
                        tap_script_sigs: Default::default(),
                        tap_scripts: Default::default(),
                        tap_key_origins: Default::default(),
                        tap_internal_key: Default::default(),
                        tap_merkle_root: Default::default(),
                        unknown: Default::default(),
                    }
                })
                .collect(),
            // Same order as the transaction outputs: peg-out first, change second
            outputs: vec![Default::default(), change_out],
        };

        Ok(UnsignedTransaction {
            psbt,
            signatures: vec![],
            change,
            fees: PegOutFees {
                fee_rate,
                total_weight,
            },
            destination,
            selected_utxos,
            peg_out_amount,
            rbf,
        })
    }
2253
2254    fn sign_psbt(&self, psbt: &mut Psbt) {
2255        let mut tx_hasher = SighashCache::new(&psbt.unsigned_tx);
2256
2257        for (idx, (psbt_input, _tx_input)) in psbt
2258            .inputs
2259            .iter_mut()
2260            .zip(psbt.unsigned_tx.input.iter())
2261            .enumerate()
2262        {
2263            let tweaked_secret = {
2264                let tweak = psbt_input
2265                    .proprietary
2266                    .get(&proprietary_tweak_key())
2267                    .expect("Malformed PSBT: expected tweak");
2268
2269                self.secret_key.tweak(tweak, self.secp)
2270            };
2271
2272            let tx_hash = tx_hasher
2273                .p2wsh_signature_hash(
2274                    idx,
2275                    psbt_input
2276                        .witness_script
2277                        .as_ref()
2278                        .expect("Missing witness script"),
2279                    psbt_input
2280                        .witness_utxo
2281                        .as_ref()
2282                        .expect("Missing UTXO")
2283                        .value,
2284                    EcdsaSighashType::All,
2285                )
2286                .expect("Failed to create segwit sighash");
2287
2288            let signature = self.secp.sign_ecdsa(
2289                &Message::from_digest_slice(&tx_hash[..]).unwrap(),
2290                &tweaked_secret,
2291            );
2292
2293            psbt_input.partial_sigs.insert(
2294                bitcoin::PublicKey {
2295                    compressed: true,
2296                    inner: secp256k1::PublicKey::from_secret_key(self.secp, &tweaked_secret),
2297                },
2298                EcdsaSig::sighash_all(signature),
2299            );
2300        }
2301    }
2302
2303    fn derive_script(&self, tweak: &[u8]) -> ScriptBuf {
2304        struct CompressedPublicKeyTranslator<'t, 's, Ctx: Verification> {
2305            tweak: &'t [u8],
2306            secp: &'s Secp256k1<Ctx>,
2307        }
2308
2309        impl<Ctx: Verification>
2310            miniscript::Translator<CompressedPublicKey, CompressedPublicKey, Infallible>
2311            for CompressedPublicKeyTranslator<'_, '_, Ctx>
2312        {
2313            fn pk(&mut self, pk: &CompressedPublicKey) -> Result<CompressedPublicKey, Infallible> {
2314                let hashed_tweak = {
2315                    let mut hasher = HmacEngine::<sha256::Hash>::new(&pk.key.serialize()[..]);
2316                    hasher.input(self.tweak);
2317                    Hmac::from_engine(hasher).to_byte_array()
2318                };
2319
2320                Ok(CompressedPublicKey {
2321                    key: pk
2322                        .key
2323                        .add_exp_tweak(
2324                            self.secp,
2325                            &Scalar::from_be_bytes(hashed_tweak).expect("can't fail"),
2326                        )
2327                        .expect("tweaking failed"),
2328                })
2329            }
2330            translate_hash_fail!(CompressedPublicKey, CompressedPublicKey, Infallible);
2331        }
2332
2333        let descriptor = self
2334            .descriptor
2335            .translate_pk(&mut CompressedPublicKeyTranslator {
2336                tweak,
2337                secp: self.secp,
2338            })
2339            .expect("can't fail");
2340
2341        descriptor.script_pubkey()
2342    }
2343}
2344
2345pub fn nonce_from_idx(nonce_idx: u64) -> [u8; 33] {
2346    let mut nonce: [u8; 33] = [0; 33];
2347    // Make it look like a compressed pubkey, has to be either 0x02 or 0x03
2348    nonce[0] = 0x02;
2349    nonce[1..].copy_from_slice(&nonce_idx.consensus_hash::<bitcoin::hashes::sha256::Hash>()[..]);
2350
2351    nonce
2352}
2353
/// A peg-out tx that is ready to be broadcast with a tweak for the change UTXO
///
/// NOTE(review): the derived `Encodable`/`Decodable` impls presumably encode
/// the fields in declaration order — confirm before reordering any field.
#[derive(Clone, Debug, Encodable, Decodable)]
pub struct PendingTransaction {
    /// The Bitcoin transaction.
    pub tx: bitcoin::Transaction,
    /// Tweak for the change UTXO.
    pub tweak: [u8; 33],
    /// Change amount; presumably the value of the change output of `tx` —
    /// TODO confirm.
    pub change: bitcoin::Amount,
    /// Destination script; presumably where the peg-out is paid — TODO
    /// confirm.
    pub destination: ScriptBuf,
    /// Fees recorded for this peg-out.
    pub fees: PegOutFees,
    /// UTXOs recorded as selected for this transaction, each with its key;
    /// presumably the inputs spent by `tx` — TODO confirm.
    pub selected_utxos: Vec<(UTXOKey, SpendableUTXO)>,
    /// Peg-out amount; presumably the value paid to `destination` — TODO
    /// confirm.
    pub peg_out_amount: bitcoin::Amount,
    /// Optional RBF data attached to this transaction.
    pub rbf: Option<Rbf>,
}
2366
2367impl Serialize for PendingTransaction {
2368    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2369    where
2370        S: serde::Serializer,
2371    {
2372        if serializer.is_human_readable() {
2373            serializer.serialize_str(&self.consensus_encode_to_hex())
2374        } else {
2375            serializer.serialize_bytes(&self.consensus_encode_to_vec())
2376        }
2377    }
2378}
2379
/// A PSBT that is awaiting enough signatures from the federation to become a
/// `PendingTransaction`
///
/// NOTE(review): the derived `Encodable`/`Decodable` impls presumably encode
/// the fields in declaration order — confirm before reordering any field.
#[derive(Clone, Debug, Eq, PartialEq, Encodable, Decodable)]
pub struct UnsignedTransaction {
    /// The partially signed Bitcoin transaction.
    pub psbt: Psbt,
    /// Signature items collected so far, each paired with a `PeerId`;
    /// presumably the peer that contributed it — TODO confirm.
    pub signatures: Vec<(PeerId, PegOutSignatureItem)>,
    /// Change amount; presumably the value of the change output — TODO
    /// confirm.
    pub change: bitcoin::Amount,
    /// Fees recorded for this peg-out.
    pub fees: PegOutFees,
    /// Destination script; presumably where the peg-out is paid — TODO
    /// confirm.
    pub destination: ScriptBuf,
    /// UTXOs recorded as selected for this transaction, each with its key;
    /// presumably the inputs spent by the PSBT — TODO confirm.
    pub selected_utxos: Vec<(UTXOKey, SpendableUTXO)>,
    /// Peg-out amount; presumably the value paid to `destination` — TODO
    /// confirm.
    pub peg_out_amount: bitcoin::Amount,
    /// Optional RBF data attached to this transaction.
    pub rbf: Option<Rbf>,
}
2393
2394impl Serialize for UnsignedTransaction {
2395    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2396    where
2397        S: serde::Serializer,
2398    {
2399        if serializer.is_human_readable() {
2400            serializer.serialize_str(&self.consensus_encode_to_hex())
2401        } else {
2402            serializer.serialize_bytes(&self.consensus_encode_to_vec())
2403        }
2404    }
2405}
2406
#[cfg(test)]
mod tests {

    use std::str::FromStr;

    use bitcoin::Network::{Bitcoin, Testnet};
    use bitcoin::hashes::Hash;
    use bitcoin::{Address, Amount, OutPoint, Txid, secp256k1};
    use fedimint_core::Feerate;
    use fedimint_core::encoding::btc::NetworkLegacyEncodingWrapper;
    use fedimint_wallet_common::{PegOut, PegOutFees, Rbf, WalletOutputV0};
    use miniscript::descriptor::Wsh;

    use crate::common::PegInDescriptor;
    use crate::{
        CompressedPublicKey, OsRng, SpendableUTXO, StatelessWallet, UTXOKey, WalletOutputError,
    };

    /// Builds an RBF wallet output with the given fee rate and total weight,
    /// referring to the all-zero txid.
    fn rbf(sats_per_kvb: u64, total_weight: u64) -> WalletOutputV0 {
        let fees = PegOutFees::new(sats_per_kvb, total_weight);
        let txid = Txid::all_zeros();

        WalletOutputV0::Rbf(Rbf { fees, txid })
    }

    /// Exercises `create_tx` and `validate_tx` against a single 3000 sat UTXO
    /// and checks each validation error in turn.
    #[test]
    fn create_tx_should_validate_amounts() {
        let secp = secp256k1::Secp256k1::new();

        // 3-of-4 sorted multisig over freshly generated keys
        let federation_keys: Vec<_> = (0..4)
            .map(|_| {
                let (_, key) = secp.generate_keypair(&mut OsRng);
                CompressedPublicKey { key }
            })
            .collect();
        let descriptor = PegInDescriptor::Wsh(Wsh::new_sortedmulti(3, federation_keys).unwrap());

        let (secret_key, _) = secp.generate_keypair(&mut OsRng);
        let wallet = StatelessWallet {
            descriptor: &descriptor,
            secret_key: &secret_key,
            secp: &secp,
        };

        let utxo = SpendableUTXO {
            tweak: [0; 33],
            amount: bitcoin::Amount::from_sat(3000),
        };

        let recipient = Address::from_str("32iVBEu4dxkUQk9dJbZUiBiQdmypcEyJRf").unwrap();
        let destination = recipient.clone().assume_checked().script_pubkey();

        let feerate = Feerate { sats_per_kvb: 1000 };
        let weight = 875;

        // Asking for one sat more than is spendable must fail:
        //   tx fee            = ceil(875 / 4) * 1 sat/vb = 219
        //   change script dust = 330
        //   spendable sats     = 3000 - 219 - 330 = 2451
        let rejected = wallet.create_tx(
            Amount::from_sat(2452),
            destination.clone(),
            vec![],
            vec![(UTXOKey(OutPoint::null()), utxo.clone())],
            feerate,
            &[0; 33],
            None,
        );
        assert_eq!(rejected, Err(WalletOutputError::NotEnoughSpendableUTXO));

        // A smaller amount builds successfully
        let mut tx = wallet
            .create_tx(
                Amount::from_sat(1000),
                destination,
                vec![],
                vec![(UTXOKey(OutPoint::null()), utxo)],
                feerate,
                &[0; 33],
                None,
            )
            .expect("is ok");

        // Output claims a total weight of 0, which does not match the tx
        assert_eq!(
            StatelessWallet::validate_tx(&tx, &rbf(feerate.sats_per_kvb, 0), feerate, Bitcoin),
            Err(WalletOutputError::TxWeightIncorrect(0, weight))
        );

        // Output fee rate of 0 is below the min relay fee
        assert_eq!(
            StatelessWallet::validate_tx(&tx, &rbf(0, weight), feerate, Bitcoin),
            Err(WalletOutputError::BelowMinRelayFee)
        );

        // Matching fee rate and weight pass validation
        assert_eq!(
            StatelessWallet::validate_tx(
                &tx,
                &rbf(feerate.sats_per_kvb, weight),
                feerate,
                Bitcoin
            ),
            Ok(())
        );

        // Tx fee rate dropped below the consensus fee rate
        tx.fees = PegOutFees::new(0, weight);
        assert_eq!(
            StatelessWallet::validate_tx(
                &tx,
                &rbf(feerate.sats_per_kvb, weight),
                feerate,
                Bitcoin
            ),
            Err(WalletOutputError::PegOutFeeBelowConsensus(
                Feerate { sats_per_kvb: 0 },
                feerate
            ))
        );

        // Peg-out amount dropped under the dust limit
        tx.peg_out_amount = bitcoin::Amount::ZERO;
        assert_eq!(
            StatelessWallet::validate_tx(
                &tx,
                &rbf(feerate.sats_per_kvb, weight),
                feerate,
                Bitcoin
            ),
            Err(WalletOutputError::PegOutUnderDustLimit)
        );

        // Recipient address does not belong to the requested network
        let peg_out = WalletOutputV0::PegOut(PegOut {
            recipient,
            amount: bitcoin::Amount::from_sat(1000),
            fees: PegOutFees::new(100, weight),
        });
        assert_eq!(
            StatelessWallet::validate_tx(&tx, &peg_out, feerate, Testnet),
            Err(WalletOutputError::WrongNetwork(
                NetworkLegacyEncodingWrapper(Testnet),
                NetworkLegacyEncodingWrapper(Bitcoin)
            ))
        );
    }
}