Skip to main content

fedimint_wallet_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_possible_wrap)]
4#![allow(clippy::default_trait_access)]
5#![allow(clippy::missing_errors_doc)]
6#![allow(clippy::missing_panics_doc)]
7#![allow(clippy::module_name_repetitions)]
8#![allow(clippy::must_use_candidate)]
9#![allow(clippy::needless_lifetimes)]
10#![allow(clippy::too_many_lines)]
11
12pub mod db;
13pub mod envs;
14
15use std::clone::Clone;
16use std::cmp::min;
17use std::collections::{BTreeMap, BTreeSet, HashMap};
18use std::convert::Infallible;
19use std::sync::Arc;
20#[cfg(not(target_family = "wasm"))]
21use std::time::Duration;
22
23use anyhow::{Context, bail, ensure, format_err};
24use bitcoin::absolute::LockTime;
25use bitcoin::address::NetworkUnchecked;
26use bitcoin::ecdsa::Signature as EcdsaSig;
27use bitcoin::hashes::{Hash as BitcoinHash, HashEngine, Hmac, HmacEngine, sha256};
28use bitcoin::policy::DEFAULT_MIN_RELAY_TX_FEE;
29use bitcoin::psbt::{Input, Psbt};
30use bitcoin::secp256k1::{self, All, Message, Scalar, Secp256k1, Verification};
31use bitcoin::sighash::{EcdsaSighashType, SighashCache};
32use bitcoin::{Address, BlockHash, Network, ScriptBuf, Sequence, Transaction, TxIn, TxOut, Txid};
33use common::config::WalletConfigConsensus;
34use common::{
35    DEPRECATED_RBF_ERROR, PegOutFees, PegOutSignatureItem, ProcessPegOutSigError, SpendableUTXO,
36    TxOutputSummary, WalletCommonInit, WalletConsensusItem, WalletInput, WalletModuleTypes,
37    WalletOutput, WalletOutputOutcome, WalletSummary, proprietary_tweak_key,
38};
39use db::{
40    BlockHashByHeightKey, BlockHashByHeightKeyPrefix, BlockHashByHeightValue, RecoveryItemKey,
41    RecoveryItemKeyPrefix,
42};
43use envs::get_feerate_multiplier;
44use fedimint_api_client::api::{DynModuleApi, FederationApiExt};
45use fedimint_core::config::{
46    ServerModuleConfig, ServerModuleConsensusConfig, TypedServerModuleConfig,
47    TypedServerModuleConsensusConfig,
48};
49use fedimint_core::core::ModuleInstanceId;
50use fedimint_core::db::{
51    Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
52};
53use fedimint_core::encoding::btc::NetworkLegacyEncodingWrapper;
54use fedimint_core::encoding::{Decodable, Encodable};
55use fedimint_core::envs::{
56    BitcoinRpcConfig, FM_ENABLE_MODULE_WALLET_ENV, is_automatic_consensus_version_voting_disabled,
57    is_env_var_set_opt, is_rbf_withdrawal_enabled, is_running_in_test_env,
58};
59use fedimint_core::module::audit::Audit;
60use fedimint_core::module::{
61    Amounts, ApiEndpoint, ApiError, ApiRequestErased, ApiVersion, CORE_CONSENSUS_VERSION,
62    CoreConsensusVersion, InputMeta, ModuleConsensusVersion, ModuleInit,
63    SupportedModuleApiVersions, TransactionItemAmounts, api_endpoint,
64};
65use fedimint_core::net::auth::check_auth;
66use fedimint_core::task::TaskGroup;
67#[cfg(not(target_family = "wasm"))]
68use fedimint_core::task::sleep;
69use fedimint_core::util::{FmtCompact, FmtCompactAnyhow as _, backoff_util, retry};
70use fedimint_core::{
71    Feerate, InPoint, NumPeersExt, OutPoint, PeerId, apply, async_trait_maybe_send,
72    get_network_for_address, push_db_key_items, push_db_pair_items,
73};
74use fedimint_logging::LOG_MODULE_WALLET;
75use fedimint_server_core::bitcoin_rpc::ServerBitcoinRpcMonitor;
76use fedimint_server_core::config::{PeerHandleOps, PeerHandleOpsExt};
77use fedimint_server_core::migration::ServerModuleDbMigrationFn;
78use fedimint_server_core::{
79    ConfigGenModuleArgs, ServerModule, ServerModuleInit, ServerModuleInitArgs,
80};
81pub use fedimint_wallet_common as common;
82use fedimint_wallet_common::config::{FeeConsensus, WalletClientConfig, WalletConfig};
83use fedimint_wallet_common::endpoint_constants::{
84    ACTIVATE_CONSENSUS_VERSION_VOTING_ENDPOINT, BITCOIN_KIND_ENDPOINT, BITCOIN_RPC_CONFIG_ENDPOINT,
85    BLOCK_COUNT_ENDPOINT, BLOCK_COUNT_LOCAL_ENDPOINT, MODULE_CONSENSUS_VERSION_ENDPOINT,
86    PEG_OUT_FEES_ENDPOINT, RECOVERY_COUNT_ENDPOINT, RECOVERY_SLICE_ENDPOINT,
87    SUPPORTED_MODULE_CONSENSUS_VERSION_ENDPOINT, UTXO_CONFIRMED_ENDPOINT, WALLET_SUMMARY_ENDPOINT,
88};
89use fedimint_wallet_common::envs::FM_PORT_ESPLORA_ENV;
90use fedimint_wallet_common::keys::CompressedPublicKey;
91use fedimint_wallet_common::tweakable::Tweakable;
92use fedimint_wallet_common::{
93    MODULE_CONSENSUS_VERSION, Rbf, RecoveryItem, UnknownWalletInputVariantError, WalletInputError,
94    WalletOutputError, WalletOutputV0,
95};
96use futures::future::join_all;
97use futures::{FutureExt, StreamExt};
98use itertools::Itertools;
99use metrics::{
100    WALLET_INOUT_FEES_SATS, WALLET_INOUT_SATS, WALLET_PEGIN_FEES_SATS, WALLET_PEGIN_SATS,
101    WALLET_PEGOUT_FEES_SATS, WALLET_PEGOUT_SATS,
102};
103use miniscript::psbt::PsbtExt;
104use miniscript::{Descriptor, TranslatePk, translate_hash_fail};
105use rand::rngs::OsRng;
106use serde::Serialize;
107use strum::IntoEnumIterator;
108use tokio::sync::{Notify, watch};
109use tracing::{debug, info, instrument, trace, warn};
110
111use crate::db::{
112    BlockCountVoteKey, BlockCountVotePrefix, BlockHashKey, BlockHashKeyPrefix,
113    ClaimedPegInOutpointKey, ClaimedPegInOutpointPrefixKey, ConsensusVersionVoteKey,
114    ConsensusVersionVotePrefix, ConsensusVersionVotingActivationKey,
115    ConsensusVersionVotingActivationPrefix, DbKeyPrefix, FeeRateVoteKey, FeeRateVotePrefix,
116    PegOutBitcoinTransaction, PegOutBitcoinTransactionPrefix, PegOutNonceKey, PegOutTxSignatureCI,
117    PegOutTxSignatureCIPrefix, PendingTransactionKey, PendingTransactionPrefixKey, UTXOKey,
118    UTXOPrefixKey, UnsignedTransactionKey, UnsignedTransactionPrefixKey, UnspentTxOutKey,
119    UnspentTxOutPrefix, migrate_to_v1, migrate_to_v2,
120};
121use crate::metrics::WALLET_BLOCK_COUNT;
122
123mod metrics;
124
/// Initializer handle for the server-side wallet module.
///
/// A field-less marker type; the module-init traits in this file are
/// implemented on it.
#[derive(Clone, Debug)]
pub struct WalletInit;
127
128impl ModuleInit for WalletInit {
129    type Common = WalletCommonInit;
130
131    async fn dump_database(
132        &self,
133        dbtx: &mut DatabaseTransaction<'_>,
134        prefix_names: Vec<String>,
135    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
136        let mut wallet: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
137        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
138            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
139        });
140        for table in filtered_prefixes {
141            match table {
142                DbKeyPrefix::BlockHash => {
143                    push_db_key_items!(dbtx, BlockHashKeyPrefix, BlockHashKey, wallet, "Blocks");
144                }
145                DbKeyPrefix::BlockHashByHeight => {
146                    push_db_key_items!(
147                        dbtx,
148                        BlockHashByHeightKeyPrefix,
149                        BlockHashByHeightKey,
150                        wallet,
151                        "Blocks by height"
152                    );
153                }
154                DbKeyPrefix::PegOutBitcoinOutPoint => {
155                    push_db_pair_items!(
156                        dbtx,
157                        PegOutBitcoinTransactionPrefix,
158                        PegOutBitcoinTransaction,
159                        WalletOutputOutcome,
160                        wallet,
161                        "Peg Out Bitcoin Transaction"
162                    );
163                }
164                DbKeyPrefix::PegOutTxSigCi => {
165                    push_db_pair_items!(
166                        dbtx,
167                        PegOutTxSignatureCIPrefix,
168                        PegOutTxSignatureCI,
169                        Vec<secp256k1::ecdsa::Signature>,
170                        wallet,
171                        "Peg Out Transaction Signatures"
172                    );
173                }
174                DbKeyPrefix::PendingTransaction => {
175                    push_db_pair_items!(
176                        dbtx,
177                        PendingTransactionPrefixKey,
178                        PendingTransactionKey,
179                        PendingTransaction,
180                        wallet,
181                        "Pending Transactions"
182                    );
183                }
184                DbKeyPrefix::PegOutNonce => {
185                    if let Some(nonce) = dbtx.get_value(&PegOutNonceKey).await {
186                        wallet.insert("Peg Out Nonce".to_string(), Box::new(nonce));
187                    }
188                }
189                DbKeyPrefix::UnsignedTransaction => {
190                    push_db_pair_items!(
191                        dbtx,
192                        UnsignedTransactionPrefixKey,
193                        UnsignedTransactionKey,
194                        UnsignedTransaction,
195                        wallet,
196                        "Unsigned Transactions"
197                    );
198                }
199                DbKeyPrefix::Utxo => {
200                    push_db_pair_items!(
201                        dbtx,
202                        UTXOPrefixKey,
203                        UTXOKey,
204                        SpendableUTXO,
205                        wallet,
206                        "UTXOs"
207                    );
208                }
209                DbKeyPrefix::BlockCountVote => {
210                    push_db_pair_items!(
211                        dbtx,
212                        BlockCountVotePrefix,
213                        BlockCountVoteKey,
214                        u32,
215                        wallet,
216                        "Block Count Votes"
217                    );
218                }
219                DbKeyPrefix::FeeRateVote => {
220                    push_db_pair_items!(
221                        dbtx,
222                        FeeRateVotePrefix,
223                        FeeRateVoteKey,
224                        Feerate,
225                        wallet,
226                        "Fee Rate Votes"
227                    );
228                }
229                DbKeyPrefix::ClaimedPegInOutpoint => {
230                    push_db_pair_items!(
231                        dbtx,
232                        ClaimedPegInOutpointPrefixKey,
233                        PeggedInOutpointKey,
234                        (),
235                        wallet,
236                        "Claimed Peg-in Outpoint"
237                    );
238                }
239                DbKeyPrefix::ConsensusVersionVote => {
240                    push_db_pair_items!(
241                        dbtx,
242                        ConsensusVersionVotePrefix,
243                        ConsensusVersionVoteKey,
244                        ModuleConsensusVersion,
245                        wallet,
246                        "Consensus Version Votes"
247                    );
248                }
249                DbKeyPrefix::UnspentTxOut => {
250                    push_db_pair_items!(
251                        dbtx,
252                        UnspentTxOutPrefix,
253                        UnspentTxOutKey,
254                        TxOut,
255                        wallet,
256                        "Consensus Version Votes"
257                    );
258                }
259                DbKeyPrefix::ConsensusVersionVotingActivation => {
260                    push_db_pair_items!(
261                        dbtx,
262                        ConsensusVersionVotingActivationPrefix,
263                        ConsensusVersionVotingActivationKey,
264                        (),
265                        wallet,
266                        "Consensus Version Voting Activation Key"
267                    );
268                }
269                DbKeyPrefix::RecoveryItem => {
270                    push_db_pair_items!(
271                        dbtx,
272                        RecoveryItemKeyPrefix,
273                        RecoveryItemKey,
274                        RecoveryItem,
275                        wallet,
276                        "Recovery Items"
277                    );
278                }
279            }
280        }
281
282        Box::new(wallet.into_iter())
283    }
284}
285
286/// Default finality delay based on network
287fn default_finality_delay(network: Network) -> u32 {
288    match network {
289        Network::Bitcoin | Network::Regtest => 10,
290        Network::Testnet | Network::Signet | Network::Testnet4 => 2,
291    }
292}
293
294/// Default Bitcoin RPC config for clients
295fn default_client_bitcoin_rpc(network: Network) -> BitcoinRpcConfig {
296    let url = match network {
297        Network::Bitcoin => "https://mempool.space/api/".to_string(),
298        Network::Testnet => "https://mempool.space/testnet/api/".to_string(),
299        Network::Testnet4 => "https://mempool.space/testnet4/api/".to_string(),
300        Network::Signet => "https://mutinynet.com/api/".to_string(),
301        Network::Regtest => format!(
302            "http://127.0.0.1:{}/",
303            std::env::var(FM_PORT_ESPLORA_ENV).unwrap_or_else(|_| String::from("50002"))
304        ),
305    };
306
307    BitcoinRpcConfig {
308        kind: "esplora".to_string(),
309        url: fedimint_core::util::SafeUrl::parse(&url).expect("hardcoded URL is valid"),
310    }
311}
312
313#[apply(async_trait_maybe_send!)]
314impl ServerModuleInit for WalletInit {
315    type Module = Wallet;
316
317    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
318        &[MODULE_CONSENSUS_VERSION]
319    }
320
321    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
322        SupportedModuleApiVersions::from_raw(
323            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
324            (
325                MODULE_CONSENSUS_VERSION.major,
326                MODULE_CONSENSUS_VERSION.minor,
327            ),
328            &[(0, 2)],
329        )
330    }
331
332    fn is_enabled_by_default(&self) -> bool {
333        is_env_var_set_opt(FM_ENABLE_MODULE_WALLET_ENV).unwrap_or(true)
334    }
335
336    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
337        for direction in ["incoming", "outgoing"] {
338            WALLET_INOUT_FEES_SATS
339                .with_label_values(&[direction])
340                .get_sample_count();
341            WALLET_INOUT_SATS
342                .with_label_values(&[direction])
343                .get_sample_count();
344        }
345        // Eagerly initialize metrics that trigger infrequently
346        WALLET_PEGIN_FEES_SATS.get_sample_count();
347        WALLET_PEGIN_SATS.get_sample_count();
348        WALLET_PEGOUT_SATS.get_sample_count();
349        WALLET_PEGOUT_FEES_SATS.get_sample_count();
350
351        Ok(Wallet::new(
352            args.cfg().to_typed()?,
353            args.db(),
354            args.task_group(),
355            args.our_peer_id(),
356            args.module_api().clone(),
357            args.server_bitcoin_rpc_monitor(),
358        )
359        .await?)
360    }
361
362    fn trusted_dealer_gen(
363        &self,
364        peers: &[PeerId],
365        args: &ConfigGenModuleArgs,
366    ) -> BTreeMap<PeerId, ServerModuleConfig> {
367        let secp = bitcoin::secp256k1::Secp256k1::new();
368        let finality_delay = default_finality_delay(args.network);
369        let client_default_bitcoin_rpc = default_client_bitcoin_rpc(args.network);
370
371        let btc_pegin_keys = peers
372            .iter()
373            .map(|&id| (id, secp.generate_keypair(&mut OsRng)))
374            .collect::<Vec<_>>();
375
376        let wallet_cfg: BTreeMap<PeerId, WalletConfig> = btc_pegin_keys
377            .iter()
378            .map(|(id, (sk, _))| {
379                let cfg = WalletConfig::new(
380                    btc_pegin_keys
381                        .iter()
382                        .map(|(peer_id, (_, pk))| (*peer_id, CompressedPublicKey { key: *pk }))
383                        .collect(),
384                    *sk,
385                    peers.to_num_peers().threshold(),
386                    args.network,
387                    finality_delay,
388                    client_default_bitcoin_rpc.clone(),
389                    FeeConsensus::default(),
390                );
391                (*id, cfg)
392            })
393            .collect();
394
395        wallet_cfg
396            .into_iter()
397            .map(|(k, v)| (k, v.to_erased()))
398            .collect()
399    }
400
401    async fn distributed_gen(
402        &self,
403        peers: &(dyn PeerHandleOps + Send + Sync),
404        args: &ConfigGenModuleArgs,
405    ) -> anyhow::Result<ServerModuleConfig> {
406        let secp = secp256k1::Secp256k1::new();
407        let (sk, pk) = secp.generate_keypair(&mut OsRng);
408        let our_key = CompressedPublicKey { key: pk };
409        let peer_peg_in_keys: BTreeMap<PeerId, CompressedPublicKey> = peers
410            .exchange_encodable(our_key.key)
411            .await?
412            .into_iter()
413            .map(|(k, key)| (k, CompressedPublicKey { key }))
414            .collect();
415
416        let finality_delay = default_finality_delay(args.network);
417        let client_default_bitcoin_rpc = default_client_bitcoin_rpc(args.network);
418
419        let wallet_cfg = WalletConfig::new(
420            peer_peg_in_keys,
421            sk,
422            peers.num_peers().threshold(),
423            args.network,
424            finality_delay,
425            client_default_bitcoin_rpc,
426            FeeConsensus::default(),
427        );
428
429        Ok(wallet_cfg.to_erased())
430    }
431
432    fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
433        let config = config.to_typed::<WalletConfig>()?;
434        let pubkey = secp256k1::PublicKey::from_secret_key_global(&config.private.peg_in_key);
435
436        if config
437            .consensus
438            .peer_peg_in_keys
439            .get(identity)
440            .ok_or_else(|| format_err!("Secret key doesn't match any public key"))?
441            != &CompressedPublicKey::new(pubkey)
442        {
443            bail!(" Bitcoin wallet private key doesn't match multisig pubkey");
444        }
445
446        Ok(())
447    }
448
449    fn get_client_config(
450        &self,
451        config: &ServerModuleConsensusConfig,
452    ) -> anyhow::Result<WalletClientConfig> {
453        let config = WalletConfigConsensus::from_erased(config)?;
454        Ok(WalletClientConfig {
455            peg_in_descriptor: config.peg_in_descriptor,
456            network: config.network,
457            fee_consensus: config.fee_consensus,
458            finality_delay: config.finality_delay,
459            default_bitcoin_rpc: config.client_default_bitcoin_rpc,
460        })
461    }
462
463    /// DB migrations to move from old to newer versions
464    fn get_database_migrations(
465        &self,
466    ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Wallet>> {
467        let mut migrations: BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Wallet>> =
468            BTreeMap::new();
469        migrations.insert(
470            DatabaseVersion(0),
471            Box::new(|ctx| migrate_to_v1(ctx).boxed()),
472        );
473        migrations.insert(
474            DatabaseVersion(1),
475            Box::new(|ctx| migrate_to_v2(ctx).boxed()),
476        );
477        migrations
478    }
479
480    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
481        Some(DbKeyPrefix::iter().map(|p| p as u8).collect())
482    }
483}
484
485#[apply(async_trait_maybe_send!)]
486impl ServerModule for Wallet {
487    type Common = WalletModuleTypes;
488    type Init = WalletInit;
489
490    async fn consensus_proposal<'a>(
491        &'a self,
492        dbtx: &mut DatabaseTransaction<'_>,
493    ) -> Vec<WalletConsensusItem> {
494        let mut items = dbtx
495            .find_by_prefix(&PegOutTxSignatureCIPrefix)
496            .await
497            .map(|(key, val)| {
498                WalletConsensusItem::PegOutSignature(PegOutSignatureItem {
499                    txid: key.0,
500                    signature: val,
501                })
502            })
503            .collect::<Vec<WalletConsensusItem>>()
504            .await;
505
506        // If we are unable to get a block count from the node we skip adding a block
507        // count vote to consensus items.
508        //
509        // The potential impact of not including the latest block count from our peer's
510        // node is delayed processing of change outputs for the federation, which is an
511        // acceptable risk since subsequent rounds of consensus will reattempt to fetch
512        // the latest block count.
513        match self.get_block_count() {
514            Ok(block_count) => {
515                let mut block_count_vote =
516                    block_count.saturating_sub(self.cfg.consensus.finality_delay);
517
518                let current_consensus_block_count = self.consensus_block_count(dbtx).await;
519
520                // This will prevent that more then five blocks are synced in a single database
521                // transaction if the federation was offline for a prolonged period of time.
522                if current_consensus_block_count != 0 {
523                    block_count_vote = min(
524                        block_count_vote,
525                        current_consensus_block_count
526                            + if is_running_in_test_env() {
527                                // We have some tests that mine *a lot* of blocks (empty)
528                                // and need them processed fast, so we raise the max.
529                                100
530                            } else {
531                                5
532                            },
533                    );
534                }
535
536                let current_vote = dbtx
537                    .get_value(&BlockCountVoteKey(self.our_peer_id))
538                    .await
539                    .unwrap_or(0);
540
541                trace!(
542                    target: LOG_MODULE_WALLET,
543                    ?current_vote,
544                    ?block_count_vote,
545                    ?block_count,
546                    ?current_consensus_block_count,
547                    "Proposing block count"
548                );
549
550                WALLET_BLOCK_COUNT.set(i64::from(block_count_vote));
551                items.push(WalletConsensusItem::BlockCount(block_count_vote));
552            }
553            Err(err) => {
554                warn!(target: LOG_MODULE_WALLET, err = %err.fmt_compact_anyhow(), "Can't update block count");
555            }
556        }
557
558        let fee_rate_proposal = self.get_fee_rate_opt();
559
560        items.push(WalletConsensusItem::Feerate(fee_rate_proposal));
561
562        // Consensus upgrade activation voting
563        let manual_vote = dbtx
564            .get_value(&ConsensusVersionVotingActivationKey)
565            .await
566            .map(|()| {
567                // TODO: allow voting on any version between the currently active and max
568                // supported one in case we support a too high one already
569                MODULE_CONSENSUS_VERSION
570            });
571
572        let active_consensus_version = self.consensus_module_consensus_version(dbtx).await;
573        let automatic_vote = if is_automatic_consensus_version_voting_disabled() {
574            None
575        } else {
576            self.peer_supported_consensus_version
577                .borrow()
578                .and_then(|supported_consensus_version| {
579                    // Only automatically vote if the commonly supported version is higher than the
580                    // currently active one
581                    (active_consensus_version < supported_consensus_version)
582                        .then_some(supported_consensus_version)
583                })
584        };
585
586        // Prioritizing automatic vote for now since the manual vote never resets. Once
587        // that is fixed this should be switched around.
588        if let Some(vote_version) = automatic_vote.or(manual_vote) {
589            items.push(WalletConsensusItem::ModuleConsensusVersion(vote_version));
590        }
591
592        items
593    }
594
595    async fn process_consensus_item<'a, 'b>(
596        &'a self,
597        dbtx: &mut DatabaseTransaction<'b>,
598        consensus_item: WalletConsensusItem,
599        peer: PeerId,
600    ) -> anyhow::Result<()> {
601        trace!(target: LOG_MODULE_WALLET, ?consensus_item, "Processing consensus item proposal");
602
603        match consensus_item {
604            WalletConsensusItem::BlockCount(block_count_vote) => {
605                let current_vote = dbtx.get_value(&BlockCountVoteKey(peer)).await.unwrap_or(0);
606
607                if block_count_vote < current_vote {
608                    warn!(target: LOG_MODULE_WALLET, ?peer, ?block_count_vote, "Block count vote is outdated");
609                }
610
611                ensure!(
612                    block_count_vote > current_vote,
613                    "Block count vote is redundant"
614                );
615
616                let old_consensus_block_count = self.consensus_block_count(dbtx).await;
617
618                dbtx.insert_entry(&BlockCountVoteKey(peer), &block_count_vote)
619                    .await;
620
621                let new_consensus_block_count = self.consensus_block_count(dbtx).await;
622
623                debug!(
624                    target: LOG_MODULE_WALLET,
625                    ?peer,
626                    ?current_vote,
627                    ?block_count_vote,
628                    ?old_consensus_block_count,
629                    ?new_consensus_block_count,
630                    "Received block count vote"
631                );
632
633                assert!(old_consensus_block_count <= new_consensus_block_count);
634
635                if new_consensus_block_count != old_consensus_block_count {
636                    // We do not sync blocks that predate the federation itself
637                    if old_consensus_block_count != 0 {
638                        self.sync_up_to_consensus_count(
639                            dbtx,
640                            old_consensus_block_count,
641                            new_consensus_block_count,
642                        )
643                        .await;
644                    } else {
645                        info!(
646                            target: LOG_MODULE_WALLET,
647                            ?old_consensus_block_count,
648                            ?new_consensus_block_count,
649                            "Not syncing up to consensus block count because we are at block 0"
650                        );
651                    }
652                }
653            }
654            WalletConsensusItem::Feerate(feerate) => {
655                if Some(feerate) == dbtx.insert_entry(&FeeRateVoteKey(peer), &feerate).await {
656                    bail!("Fee rate vote is redundant");
657                }
658            }
659            WalletConsensusItem::PegOutSignature(peg_out_signature) => {
660                let txid = peg_out_signature.txid;
661
662                if dbtx.get_value(&PendingTransactionKey(txid)).await.is_some() {
663                    bail!("Already received a threshold of valid signatures");
664                }
665
666                let mut unsigned = dbtx
667                    .get_value(&UnsignedTransactionKey(txid))
668                    .await
669                    .context("Unsigned transaction does not exist")?;
670
671                self.sign_peg_out_psbt(&mut unsigned.psbt, peer, &peg_out_signature)
672                    .context("Peg out signature is invalid")?;
673
674                dbtx.insert_entry(&UnsignedTransactionKey(txid), &unsigned)
675                    .await;
676
677                if let Ok(pending_tx) = self.finalize_peg_out_psbt(unsigned) {
678                    // We were able to finalize the transaction, so we will delete the
679                    // PSBT and instead keep the extracted tx for periodic transmission
680                    // as well as to accept the change into our wallet eventually once
681                    // it confirms.
682                    dbtx.insert_new_entry(&PendingTransactionKey(txid), &pending_tx)
683                        .await;
684
685                    dbtx.remove_entry(&PegOutTxSignatureCI(txid)).await;
686                    dbtx.remove_entry(&UnsignedTransactionKey(txid)).await;
687                    let broadcast_pending = self.broadcast_pending.clone();
688                    dbtx.on_commit(move || {
689                        broadcast_pending.notify_one();
690                    });
691                }
692            }
693            WalletConsensusItem::ModuleConsensusVersion(module_consensus_version) => {
694                let current_vote = dbtx
695                    .get_value(&ConsensusVersionVoteKey(peer))
696                    .await
697                    .unwrap_or(ModuleConsensusVersion::new(2, 0));
698
699                ensure!(
700                    module_consensus_version > current_vote,
701                    "Module consensus version vote is redundant"
702                );
703
704                dbtx.insert_entry(&ConsensusVersionVoteKey(peer), &module_consensus_version)
705                    .await;
706
707                assert!(
708                    self.consensus_module_consensus_version(dbtx).await <= MODULE_CONSENSUS_VERSION,
709                    "Wallet module does not support new consensus version, please upgrade the module"
710                );
711            }
712            WalletConsensusItem::Default { variant, .. } => {
713                panic!("Received wallet consensus item with unknown variant {variant}");
714            }
715        }
716
717        Ok(())
718    }
719
720    async fn process_input<'a, 'b, 'c>(
721        &'a self,
722        dbtx: &mut DatabaseTransaction<'c>,
723        input: &'b WalletInput,
724        _in_point: InPoint,
725    ) -> Result<InputMeta, WalletInputError> {
726        let (outpoint, tx_out, pub_key) = match input {
727            WalletInput::V0(input) => {
728                if !self.block_is_known(dbtx, input.proof_block()).await {
729                    return Err(WalletInputError::UnknownPegInProofBlock(
730                        input.proof_block(),
731                    ));
732                }
733
734                input.verify(&self.secp, &self.cfg.consensus.peg_in_descriptor)?;
735
736                debug!(target: LOG_MODULE_WALLET, outpoint = %input.outpoint(), "Claiming peg-in");
737
738                (input.0.outpoint(), input.tx_output(), input.tweak_key())
739            }
740            WalletInput::V1(input) => {
741                let input_tx_out = dbtx
742                    .get_value(&UnspentTxOutKey(input.outpoint))
743                    .await
744                    .ok_or(WalletInputError::UnknownUTXO)?;
745
746                if input_tx_out.script_pubkey
747                    != self
748                        .cfg
749                        .consensus
750                        .peg_in_descriptor
751                        .tweak(&input.tweak_key, secp256k1::SECP256K1)
752                        .script_pubkey()
753                {
754                    return Err(WalletInputError::WrongOutputScript);
755                }
756
757                // Verifying this is not strictly necessary for the server as the tx_out is only
758                // used in backup and recovery.
759                if input.tx_out != input_tx_out {
760                    return Err(WalletInputError::WrongTxOut);
761                }
762
763                (input.outpoint, input_tx_out, input.tweak_key)
764            }
765            WalletInput::Default { variant, .. } => {
766                return Err(WalletInputError::UnknownInputVariant(
767                    UnknownWalletInputVariantError { variant: *variant },
768                ));
769            }
770        };
771
772        if dbtx
773            .insert_entry(&ClaimedPegInOutpointKey(outpoint), &())
774            .await
775            .is_some()
776        {
777            return Err(WalletInputError::PegInAlreadyClaimed);
778        }
779
780        dbtx.insert_new_entry(
781            &UTXOKey(outpoint),
782            &SpendableUTXO {
783                tweak: pub_key.serialize(),
784                amount: tx_out.value,
785            },
786        )
787        .await;
788
789        let next_index = get_recovery_count(dbtx).await;
790        dbtx.insert_new_entry(
791            &RecoveryItemKey(next_index),
792            &RecoveryItem::Input {
793                outpoint,
794                script: tx_out.script_pubkey,
795            },
796        )
797        .await;
798
799        let amount = tx_out.value.into();
800
801        let fee = self.cfg.consensus.fee_consensus.peg_in_abs;
802
803        calculate_pegin_metrics(dbtx, amount, fee);
804
805        Ok(InputMeta {
806            amount: TransactionItemAmounts {
807                amounts: Amounts::new_bitcoin(amount),
808                fees: Amounts::new_bitcoin(fee),
809            },
810            pub_key,
811        })
812    }
813
814    async fn process_output<'a, 'b>(
815        &'a self,
816        dbtx: &mut DatabaseTransaction<'b>,
817        output: &'a WalletOutput,
818        out_point: OutPoint,
819    ) -> Result<TransactionItemAmounts, WalletOutputError> {
820        let output = output.ensure_v0_ref()?;
821
822        // In 0.4.0 we began preventing RBF withdrawals. Once we reach EoL support
823        // for 0.4.0, we can safely remove RBF withdrawal logic.
824        // see: https://github.com/fedimint/fedimint/issues/5453
825        if let WalletOutputV0::Rbf(_) = output {
826            // This exists as an escape hatch for any federations that successfully
827            // processed an RBF withdrawal due to having a single UTXO owned by the
828            // federation. If a peer needs to resync the federation's history, they can
829            // enable this variable until they've successfully synced, then restart with
830            // this disabled.
831            if is_rbf_withdrawal_enabled() {
832                warn!(target: LOG_MODULE_WALLET, "processing rbf withdrawal");
833            } else {
834                return Err(DEPRECATED_RBF_ERROR);
835            }
836        }
837
838        let change_tweak = self.consensus_nonce(dbtx).await;
839
840        let mut tx = self.create_peg_out_tx(dbtx, output, &change_tweak).await?;
841
842        let fee_rate = self.consensus_fee_rate(dbtx).await;
843
844        StatelessWallet::validate_tx(&tx, output, fee_rate, self.cfg.consensus.network.0)?;
845
846        self.offline_wallet().sign_psbt(&mut tx.psbt);
847
848        let txid = tx.psbt.unsigned_tx.compute_txid();
849
850        info!(
851            target: LOG_MODULE_WALLET,
852            %txid,
853            "Signing peg out",
854        );
855
856        let sigs = tx
857            .psbt
858            .inputs
859            .iter_mut()
860            .map(|input| {
861                assert_eq!(
862                    input.partial_sigs.len(),
863                    1,
864                    "There was already more than one (our) or no signatures in input"
865                );
866
867                // TODO: don't put sig into PSBT in the first place
868                // We actually take out our own signature so everyone finalizes the tx in the
869                // same epoch.
870                let sig = std::mem::take(&mut input.partial_sigs)
871                    .into_values()
872                    .next()
873                    .expect("asserted previously");
874
875                // We drop SIGHASH_ALL, because we always use that and it is only present in the
876                // PSBT for compatibility with other tools.
877                secp256k1::ecdsa::Signature::from_der(&sig.to_vec()[..sig.to_vec().len() - 1])
878                    .expect("we serialized it ourselves that way")
879            })
880            .collect::<Vec<_>>();
881
882        // Delete used UTXOs
883        for input in &tx.psbt.unsigned_tx.input {
884            dbtx.remove_entry(&UTXOKey(input.previous_output)).await;
885        }
886
887        dbtx.insert_new_entry(&UnsignedTransactionKey(txid), &tx)
888            .await;
889
890        dbtx.insert_new_entry(&PegOutTxSignatureCI(txid), &sigs)
891            .await;
892
893        dbtx.insert_new_entry(
894            &PegOutBitcoinTransaction(out_point),
895            &WalletOutputOutcome::new_v0(txid),
896        )
897        .await;
898        let amount: fedimint_core::Amount = output.amount().into();
899        let fee = self.cfg.consensus.fee_consensus.peg_out_abs;
900        calculate_pegout_metrics(dbtx, amount, fee);
901        Ok(TransactionItemAmounts {
902            amounts: Amounts::new_bitcoin(amount),
903            fees: Amounts::new_bitcoin(fee),
904        })
905    }
906
907    async fn output_status(
908        &self,
909        dbtx: &mut DatabaseTransaction<'_>,
910        out_point: OutPoint,
911    ) -> Option<WalletOutputOutcome> {
912        dbtx.get_value(&PegOutBitcoinTransaction(out_point)).await
913    }
914
915    async fn audit(
916        &self,
917        dbtx: &mut DatabaseTransaction<'_>,
918        audit: &mut Audit,
919        module_instance_id: ModuleInstanceId,
920    ) {
921        audit
922            .add_items(dbtx, module_instance_id, &UTXOPrefixKey, |_, v| {
923                v.amount.to_sat() as i64 * 1000
924            })
925            .await;
926        audit
927            .add_items(
928                dbtx,
929                module_instance_id,
930                &UnsignedTransactionPrefixKey,
931                |_, v| match v.rbf {
932                    None => v.change.to_sat() as i64 * 1000,
933                    Some(rbf) => rbf.fees.amount().to_sat() as i64 * -1000,
934                },
935            )
936            .await;
937        audit
938            .add_items(
939                dbtx,
940                module_instance_id,
941                &PendingTransactionPrefixKey,
942                |_, v| match v.rbf {
943                    None => v.change.to_sat() as i64 * 1000,
944                    Some(rbf) => rbf.fees.amount().to_sat() as i64 * -1000,
945                },
946            )
947            .await;
948    }
949
950    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
951        vec![
952            api_endpoint! {
953                BLOCK_COUNT_ENDPOINT,
954                ApiVersion::new(0, 0),
955                async |module: &Wallet, context, _params: ()| -> u32 {
956                    let db = context.db();
957                    let mut dbtx = db.begin_transaction_nc().await;
958                    Ok(module.consensus_block_count(&mut dbtx).await)
959                }
960            },
961            api_endpoint! {
962                BLOCK_COUNT_LOCAL_ENDPOINT,
963                ApiVersion::new(0, 0),
964                async |module: &Wallet, _context, _params: ()| -> Option<u32> {
965                    Ok(module.get_block_count().ok())
966                }
967            },
968            api_endpoint! {
969                PEG_OUT_FEES_ENDPOINT,
970                ApiVersion::new(0, 0),
971                async |module: &Wallet, context, params: (Address<NetworkUnchecked>, u64)| -> Option<PegOutFees> {
972                    let (address, sats) = params;
973                    let db = context.db();
974                    let mut dbtx = db.begin_transaction_nc().await;
975                    let feerate = module.consensus_fee_rate(&mut dbtx).await;
976
977                    // Since we are only calculating the tx size we can use an arbitrary dummy nonce.
978                    let dummy_tweak = [0; 33];
979
980                    let tx = module.offline_wallet().create_tx(
981                        bitcoin::Amount::from_sat(sats),
982                        // Note: While calling `assume_checked()` is generally unwise, it's fine
983                        // here since we're only returning a fee estimate, and we would still
984                        // reject a transaction with the wrong network upon attempted peg-out.
985                        address.assume_checked().script_pubkey(),
986                        vec![],
987                        module.available_utxos(&mut dbtx).await,
988                        feerate,
989                        &dummy_tweak,
990                        None
991                    );
992
993                    match tx {
994                        Err(error) => {
995                            // Usually from not enough spendable UTXOs
996                            warn!(target: LOG_MODULE_WALLET, "Error returning peg-out fees {error}");
997                            Ok(None)
998                        }
999                        Ok(tx) => Ok(Some(tx.fees))
1000                    }
1001                }
1002            },
1003            api_endpoint! {
1004                BITCOIN_KIND_ENDPOINT,
1005                ApiVersion::new(0, 1),
1006                async |module: &Wallet, _context, _params: ()| -> String {
1007                    Ok(module.btc_rpc.get_bitcoin_rpc_config().kind)
1008                }
1009            },
1010            api_endpoint! {
1011                BITCOIN_RPC_CONFIG_ENDPOINT,
1012                ApiVersion::new(0, 1),
1013                async |module: &Wallet, context, _params: ()| -> BitcoinRpcConfig {
1014                    check_auth(context)?;
1015                    let config = module.btc_rpc.get_bitcoin_rpc_config();
1016
1017                    // we need to remove auth, otherwise we'll send over the wire
1018                    let without_auth = config.url.clone().without_auth().map_err(|()| {
1019                        ApiError::server_error("Unable to remove auth from bitcoin config URL".to_string())
1020                    })?;
1021
1022                    Ok(BitcoinRpcConfig {
1023                        url: without_auth,
1024                        ..config
1025                    })
1026                }
1027            },
1028            api_endpoint! {
1029                WALLET_SUMMARY_ENDPOINT,
1030                ApiVersion::new(0, 1),
1031                async |module: &Wallet, context, _params: ()| -> WalletSummary {
1032                    let db = context.db();
1033                    let mut dbtx = db.begin_transaction_nc().await;
1034                    Ok(module.get_wallet_summary(&mut dbtx).await)
1035                }
1036            },
1037            api_endpoint! {
1038                MODULE_CONSENSUS_VERSION_ENDPOINT,
1039                ApiVersion::new(0, 2),
1040                async |module: &Wallet, context, _params: ()| -> ModuleConsensusVersion {
1041                    let db = context.db();
1042                    let mut dbtx = db.begin_transaction_nc().await;
1043                    Ok(module.consensus_module_consensus_version(&mut dbtx).await)
1044                }
1045            },
1046            api_endpoint! {
1047                SUPPORTED_MODULE_CONSENSUS_VERSION_ENDPOINT,
1048                ApiVersion::new(0, 2),
1049                async |_module: &Wallet, _context, _params: ()| -> ModuleConsensusVersion {
1050                    Ok(MODULE_CONSENSUS_VERSION)
1051                }
1052            },
1053            api_endpoint! {
1054                ACTIVATE_CONSENSUS_VERSION_VOTING_ENDPOINT,
1055                ApiVersion::new(0, 2),
1056                async |_module: &Wallet, context, _params: ()| -> () {
1057                    check_auth(context)?;
1058
1059                    let db = context.db();
1060                    let mut dbtx = db.begin_transaction().await;
1061                    dbtx.to_ref().insert_entry(&ConsensusVersionVotingActivationKey, &()).await;
1062                    dbtx.commit_tx_result().await?;
1063                    Ok(())
1064                }
1065            },
1066            api_endpoint! {
1067                UTXO_CONFIRMED_ENDPOINT,
1068                ApiVersion::new(0, 2),
1069                async |module: &Wallet, context, outpoint: bitcoin::OutPoint| -> bool {
1070                    let db = context.db();
1071                    let mut dbtx = db.begin_transaction_nc().await;
1072                    Ok(module.is_utxo_confirmed(&mut dbtx, outpoint).await)
1073                }
1074            },
1075            api_endpoint! {
1076                RECOVERY_COUNT_ENDPOINT,
1077                ApiVersion::new(0, 1),
1078                async |_module: &Wallet, context, _params: ()| -> u64 {
1079                    let db = context.db();
1080                    let mut dbtx = db.begin_transaction_nc().await;
1081                    Ok(get_recovery_count(&mut dbtx).await)
1082                }
1083            },
1084            api_endpoint! {
1085                RECOVERY_SLICE_ENDPOINT,
1086                ApiVersion::new(0, 1),
1087                async |_module: &Wallet, context, range: (u64, u64)| -> Vec<RecoveryItem> {
1088                    let db = context.db();
1089                    let mut dbtx = db.begin_transaction_nc().await;
1090                    Ok(get_recovery_slice(&mut dbtx, range).await)
1091                }
1092            },
1093        ]
1094    }
1095}
1096
1097async fn get_recovery_count(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
1098    dbtx.find_by_prefix_sorted_descending(&RecoveryItemKeyPrefix)
1099        .await
1100        .next()
1101        .await
1102        .map_or(0, |entry| entry.0.0 + 1)
1103}
1104
1105async fn get_recovery_slice(
1106    dbtx: &mut DatabaseTransaction<'_>,
1107    range: (u64, u64),
1108) -> Vec<RecoveryItem> {
1109    dbtx.find_by_range(RecoveryItemKey(range.0)..RecoveryItemKey(range.1))
1110        .await
1111        .map(|entry| entry.1)
1112        .collect()
1113        .await
1114}
1115
1116fn calculate_pegin_metrics(
1117    dbtx: &mut DatabaseTransaction<'_>,
1118    amount: fedimint_core::Amount,
1119    fee: fedimint_core::Amount,
1120) {
1121    dbtx.on_commit(move || {
1122        WALLET_INOUT_SATS
1123            .with_label_values(&["incoming"])
1124            .observe(amount.sats_f64());
1125        WALLET_INOUT_FEES_SATS
1126            .with_label_values(&["incoming"])
1127            .observe(fee.sats_f64());
1128        WALLET_PEGIN_SATS.observe(amount.sats_f64());
1129        WALLET_PEGIN_FEES_SATS.observe(fee.sats_f64());
1130    });
1131}
1132
1133fn calculate_pegout_metrics(
1134    dbtx: &mut DatabaseTransaction<'_>,
1135    amount: fedimint_core::Amount,
1136    fee: fedimint_core::Amount,
1137) {
1138    dbtx.on_commit(move || {
1139        WALLET_INOUT_SATS
1140            .with_label_values(&["outgoing"])
1141            .observe(amount.sats_f64());
1142        WALLET_INOUT_FEES_SATS
1143            .with_label_values(&["outgoing"])
1144            .observe(fee.sats_f64());
1145        WALLET_PEGOUT_SATS.observe(amount.sats_f64());
1146        WALLET_PEGOUT_FEES_SATS.observe(fee.sats_f64());
1147    });
1148}
1149
/// Server-side state of the wallet module.
#[derive(Debug)]
pub struct Wallet {
    /// Module configuration; the consensus part provides the peg-in
    /// descriptor, peer keys, network, fee settings and default fee rate.
    cfg: WalletConfig,
    /// Clone of the database handle passed to [`Wallet::new`].
    db: Database,
    /// secp256k1 context used for peg-in proof verification, key tweaking and
    /// signature verification.
    secp: Secp256k1<All>,
    /// Bitcoin RPC monitor used to query backend status, block hashes and
    /// blocks.
    btc_rpc: ServerBitcoinRpcMonitor,
    /// Peer id of this server, as passed to [`Wallet::new`].
    our_peer_id: PeerId,
    /// Broadcasting pending txes can be triggered immediately with this
    broadcast_pending: Arc<Notify>,
    /// Clone of the task group passed to [`Wallet::new`].
    task_group: TaskGroup,
    /// Maximum consensus version supported by *all* our peers. Used to
    /// automatically activate new consensus versions as soon as everyone
    /// upgrades.
    peer_supported_consensus_version: watch::Receiver<Option<ModuleConsensusVersion>>,
}
1165
1166impl Wallet {
1167    pub async fn new(
1168        cfg: WalletConfig,
1169        db: &Database,
1170        task_group: &TaskGroup,
1171        our_peer_id: PeerId,
1172        module_api: DynModuleApi,
1173        server_bitcoin_rpc_monitor: ServerBitcoinRpcMonitor,
1174    ) -> anyhow::Result<Wallet> {
1175        let broadcast_pending = Arc::new(Notify::new());
1176        Self::spawn_broadcast_pending_task(
1177            task_group,
1178            &server_bitcoin_rpc_monitor,
1179            db,
1180            broadcast_pending.clone(),
1181        );
1182
1183        let peer_supported_consensus_version =
1184            Self::spawn_peer_supported_consensus_version_task(module_api, task_group, our_peer_id);
1185
1186        let status = retry("verify network", backoff_util::aggressive_backoff(), || {
1187            std::future::ready(
1188                server_bitcoin_rpc_monitor
1189                    .status()
1190                    .context("No connection to bitcoin rpc"),
1191            )
1192        })
1193        .await?;
1194
1195        ensure!(status.network == cfg.consensus.network.0, "Wrong Network");
1196
1197        let wallet = Wallet {
1198            cfg,
1199            db: db.clone(),
1200            secp: Default::default(),
1201            btc_rpc: server_bitcoin_rpc_monitor,
1202            our_peer_id,
1203            task_group: task_group.clone(),
1204            peer_supported_consensus_version,
1205            broadcast_pending,
1206        };
1207
1208        Ok(wallet)
1209    }
1210
1211    /// Try to attach signatures to a pending peg-out tx.
1212    fn sign_peg_out_psbt(
1213        &self,
1214        psbt: &mut Psbt,
1215        peer: PeerId,
1216        signature: &PegOutSignatureItem,
1217    ) -> Result<(), ProcessPegOutSigError> {
1218        let peer_key = self
1219            .cfg
1220            .consensus
1221            .peer_peg_in_keys
1222            .get(&peer)
1223            .expect("always called with valid peer id");
1224
1225        if psbt.inputs.len() != signature.signature.len() {
1226            return Err(ProcessPegOutSigError::WrongSignatureCount(
1227                psbt.inputs.len(),
1228                signature.signature.len(),
1229            ));
1230        }
1231
1232        let mut tx_hasher = SighashCache::new(&psbt.unsigned_tx);
1233        for (idx, (input, signature)) in psbt
1234            .inputs
1235            .iter_mut()
1236            .zip(signature.signature.iter())
1237            .enumerate()
1238        {
1239            let tx_hash = tx_hasher
1240                .p2wsh_signature_hash(
1241                    idx,
1242                    input
1243                        .witness_script
1244                        .as_ref()
1245                        .expect("Missing witness script"),
1246                    input.witness_utxo.as_ref().expect("Missing UTXO").value,
1247                    EcdsaSighashType::All,
1248                )
1249                .map_err(|_| ProcessPegOutSigError::SighashError)?;
1250
1251            let tweak = input
1252                .proprietary
1253                .get(&proprietary_tweak_key())
1254                .expect("we saved it with a tweak");
1255
1256            let tweaked_peer_key = peer_key.tweak(tweak, &self.secp);
1257            self.secp
1258                .verify_ecdsa(
1259                    &Message::from_digest_slice(&tx_hash[..]).unwrap(),
1260                    signature,
1261                    &tweaked_peer_key.key,
1262                )
1263                .map_err(|_| ProcessPegOutSigError::InvalidSignature)?;
1264
1265            if input
1266                .partial_sigs
1267                .insert(tweaked_peer_key.into(), EcdsaSig::sighash_all(*signature))
1268                .is_some()
1269            {
1270                // Should never happen since peers only sign a PSBT once
1271                return Err(ProcessPegOutSigError::DuplicateSignature);
1272            }
1273        }
1274        Ok(())
1275    }
1276
1277    fn finalize_peg_out_psbt(
1278        &self,
1279        mut unsigned: UnsignedTransaction,
1280    ) -> Result<PendingTransaction, ProcessPegOutSigError> {
1281        // We need to save the change output's tweak key to be able to access the funds
1282        // later on. The tweak is extracted here because the psbt is moved next
1283        // and not available anymore when the tweak is actually needed in the
1284        // end to be put into the batch on success.
1285        let change_tweak: [u8; 33] = unsigned
1286            .psbt
1287            .outputs
1288            .iter()
1289            .find_map(|output| output.proprietary.get(&proprietary_tweak_key()).cloned())
1290            .ok_or(ProcessPegOutSigError::MissingOrMalformedChangeTweak)?
1291            .try_into()
1292            .map_err(|_| ProcessPegOutSigError::MissingOrMalformedChangeTweak)?;
1293
1294        if let Err(error) = unsigned.psbt.finalize_mut(&self.secp) {
1295            return Err(ProcessPegOutSigError::ErrorFinalizingPsbt(error));
1296        }
1297
1298        let tx = unsigned.psbt.clone().extract_tx_unchecked_fee_rate();
1299
1300        Ok(PendingTransaction {
1301            tx,
1302            tweak: change_tweak,
1303            change: unsigned.change,
1304            destination: unsigned.destination,
1305            fees: unsigned.fees,
1306            selected_utxos: unsigned.selected_utxos,
1307            peg_out_amount: unsigned.peg_out_amount,
1308            rbf: unsigned.rbf,
1309        })
1310    }
1311
1312    fn get_block_count(&self) -> anyhow::Result<u32> {
1313        self.btc_rpc
1314            .status()
1315            .context("No bitcoin rpc connection")
1316            .and_then(|status| {
1317                status
1318                    .block_count
1319                    .try_into()
1320                    .map_err(|_| format_err!("Block count exceeds u32 limits"))
1321            })
1322    }
1323
1324    pub fn get_fee_rate_opt(&self) -> Feerate {
1325        // `get_feerate_multiplier` is clamped and can't be negative
1326        // feerate sources as clamped and can't be negative or too large
1327        #[allow(clippy::cast_precision_loss)]
1328        #[allow(clippy::cast_sign_loss)]
1329        Feerate {
1330            sats_per_kvb: ((self
1331                .btc_rpc
1332                .status()
1333                .map_or(self.cfg.consensus.default_fee, |status| status.fee_rate)
1334                .sats_per_kvb as f64
1335                * get_feerate_multiplier())
1336            .round()) as u64,
1337        }
1338    }
1339
1340    pub async fn consensus_block_count(&self, dbtx: &mut DatabaseTransaction<'_>) -> u32 {
1341        let peer_count = self.cfg.consensus.peer_peg_in_keys.to_num_peers().total();
1342
1343        let mut counts = dbtx
1344            .find_by_prefix(&BlockCountVotePrefix)
1345            .await
1346            .map(|entry| entry.1)
1347            .collect::<Vec<u32>>()
1348            .await;
1349
1350        assert!(counts.len() <= peer_count);
1351
1352        while counts.len() < peer_count {
1353            counts.push(0);
1354        }
1355
1356        counts.sort_unstable();
1357
1358        counts[peer_count / 2]
1359    }
1360
1361    pub async fn consensus_fee_rate(&self, dbtx: &mut DatabaseTransaction<'_>) -> Feerate {
1362        let peer_count = self.cfg.consensus.peer_peg_in_keys.to_num_peers().total();
1363
1364        let mut rates = dbtx
1365            .find_by_prefix(&FeeRateVotePrefix)
1366            .await
1367            .map(|(.., rate)| rate)
1368            .collect::<Vec<_>>()
1369            .await;
1370
1371        assert!(rates.len() <= peer_count);
1372
1373        while rates.len() < peer_count {
1374            rates.push(self.cfg.consensus.default_fee);
1375        }
1376
1377        rates.sort_unstable();
1378
1379        rates[peer_count / 2]
1380    }
1381
1382    async fn consensus_module_consensus_version(
1383        &self,
1384        dbtx: &mut DatabaseTransaction<'_>,
1385    ) -> ModuleConsensusVersion {
1386        let num_peers = self.cfg.consensus.peer_peg_in_keys.to_num_peers();
1387
1388        let mut versions = dbtx
1389            .find_by_prefix(&ConsensusVersionVotePrefix)
1390            .await
1391            .map(|entry| entry.1)
1392            .collect::<Vec<ModuleConsensusVersion>>()
1393            .await;
1394
1395        while versions.len() < num_peers.total() {
1396            versions.push(ModuleConsensusVersion::new(2, 0));
1397        }
1398
1399        assert_eq!(versions.len(), num_peers.total());
1400
1401        versions.sort_unstable();
1402
1403        assert!(versions.first() <= versions.last());
1404
1405        versions[num_peers.max_evil()]
1406    }
1407
1408    pub async fn consensus_nonce(&self, dbtx: &mut DatabaseTransaction<'_>) -> [u8; 33] {
1409        let nonce_idx = dbtx.get_value(&PegOutNonceKey).await.unwrap_or(0);
1410        dbtx.insert_entry(&PegOutNonceKey, &(nonce_idx + 1)).await;
1411
1412        nonce_from_idx(nonce_idx)
1413    }
1414
1415    async fn sync_up_to_consensus_count(
1416        &self,
1417        dbtx: &mut DatabaseTransaction<'_>,
1418        old_count: u32,
1419        new_count: u32,
1420    ) {
1421        info!(
1422            target: LOG_MODULE_WALLET,
1423            old_count,
1424            new_count,
1425            blocks_to_go = new_count - old_count,
1426            "New block count consensus, initiating sync",
1427        );
1428
1429        // Before we can safely call our bitcoin backend to process the new consensus
1430        // count, we need to ensure we observed enough confirmations
1431        self.wait_for_finality_confs_or_shutdown(new_count).await;
1432
1433        for height in old_count..new_count {
1434            info!(
1435                target: LOG_MODULE_WALLET,
1436                height,
1437                "Processing block of height {height}",
1438            );
1439
1440            // TODO: use batching for mainnet syncing
1441            trace!(block = height, "Fetching block hash");
1442            let block_hash = retry("get_block_hash", backoff_util::background_backoff(), || {
1443                self.btc_rpc.get_block_hash(u64::from(height)) // TODO: use u64 for height everywhere
1444            })
1445            .await
1446            .expect("bitcoind rpc to get block hash");
1447
1448            let block = retry("get_block", backoff_util::background_backoff(), || {
1449                self.btc_rpc.get_block(&block_hash)
1450            })
1451            .await
1452            .expect("bitcoind rpc to get block");
1453
1454            if let Some(prev_block_height) = height.checked_sub(1) {
1455                if let Some(hash) = dbtx
1456                    .get_value(&BlockHashByHeightKey(prev_block_height))
1457                    .await
1458                {
1459                    assert_eq!(block.header.prev_blockhash, hash.0);
1460                } else {
1461                    warn!(
1462                        target: LOG_MODULE_WALLET,
1463                        %height,
1464                        %block_hash,
1465                        %prev_block_height,
1466                        prev_blockhash = %block.header.prev_blockhash,
1467                        "Missing previous block hash. This should only happen on the first processed block height."
1468                    );
1469                }
1470            }
1471
1472            if self.consensus_module_consensus_version(dbtx).await
1473                >= ModuleConsensusVersion::new(2, 2)
1474            {
1475                for transaction in &block.txdata {
1476                    // We maintain the subset of unspent P2WSH transaction outputs created
1477                    // since the module was running on the new consensus version, which might be
1478                    // the same time as the genesis session.
1479
1480                    for tx_in in &transaction.input {
1481                        dbtx.remove_entry(&UnspentTxOutKey(tx_in.previous_output))
1482                            .await;
1483                    }
1484
1485                    for (vout, tx_out) in transaction.output.iter().enumerate() {
1486                        let should_track_utxo = if self.cfg.consensus.peer_peg_in_keys.len() > 1 {
1487                            tx_out.script_pubkey.is_p2wsh()
1488                        } else {
1489                            tx_out.script_pubkey.is_p2wpkh()
1490                        };
1491
1492                        if should_track_utxo {
1493                            let outpoint = bitcoin::OutPoint {
1494                                txid: transaction.compute_txid(),
1495                                vout: vout as u32,
1496                            };
1497
1498                            dbtx.insert_new_entry(&UnspentTxOutKey(outpoint), tx_out)
1499                                .await;
1500                        }
1501                    }
1502                }
1503            }
1504
1505            let pending_transactions = dbtx
1506                .find_by_prefix(&PendingTransactionPrefixKey)
1507                .await
1508                .map(|(key, transaction)| (key.0, transaction))
1509                .collect::<HashMap<Txid, PendingTransaction>>()
1510                .await;
1511            let pending_transactions_len = pending_transactions.len();
1512
1513            debug!(
1514                target: LOG_MODULE_WALLET,
1515                ?height,
1516                ?pending_transactions_len,
1517                "Recognizing change UTXOs"
1518            );
1519            for (txid, tx) in &pending_transactions {
1520                let is_tx_in_block = block.txdata.iter().any(|tx| tx.compute_txid() == *txid);
1521
1522                if is_tx_in_block {
1523                    debug!(
1524                        target: LOG_MODULE_WALLET,
1525                        ?txid, ?height, ?block_hash, "Recognizing change UTXO"
1526                    );
1527                    self.recognize_change_utxo(dbtx, tx).await;
1528                } else {
1529                    debug!(
1530                        target: LOG_MODULE_WALLET,
1531                        ?txid,
1532                        ?height,
1533                        ?block_hash,
1534                        "Pending transaction not yet confirmed in this block"
1535                    );
1536                }
1537            }
1538
1539            dbtx.insert_new_entry(&BlockHashKey(block_hash), &()).await;
1540            dbtx.insert_new_entry(
1541                &BlockHashByHeightKey(height),
1542                &BlockHashByHeightValue(block_hash),
1543            )
1544            .await;
1545
1546            info!(
1547                target: LOG_MODULE_WALLET,
1548                height,
1549                ?block_hash,
1550                "Successfully processed block of height {height}",
1551            );
1552        }
1553    }
1554
1555    /// Add a change UTXO to our spendable UTXO database after it was included
1556    /// in a block that we got consensus on.
1557    async fn recognize_change_utxo(
1558        &self,
1559        dbtx: &mut DatabaseTransaction<'_>,
1560        pending_tx: &PendingTransaction,
1561    ) {
1562        self.remove_rbf_transactions(dbtx, pending_tx).await;
1563
1564        let script_pk = self
1565            .cfg
1566            .consensus
1567            .peg_in_descriptor
1568            .tweak(&pending_tx.tweak, &self.secp)
1569            .script_pubkey();
1570        for (idx, output) in pending_tx.tx.output.iter().enumerate() {
1571            if output.script_pubkey == script_pk {
1572                dbtx.insert_entry(
1573                    &UTXOKey(bitcoin::OutPoint {
1574                        txid: pending_tx.tx.compute_txid(),
1575                        vout: idx as u32,
1576                    }),
1577                    &SpendableUTXO {
1578                        tweak: pending_tx.tweak,
1579                        amount: output.value,
1580                    },
1581                )
1582                .await;
1583            }
1584        }
1585    }
1586
1587    /// Removes the `PendingTransaction` and any transactions tied to it via RBF
1588    async fn remove_rbf_transactions(
1589        &self,
1590        dbtx: &mut DatabaseTransaction<'_>,
1591        pending_tx: &PendingTransaction,
1592    ) {
1593        let mut all_transactions: BTreeMap<Txid, PendingTransaction> = dbtx
1594            .find_by_prefix(&PendingTransactionPrefixKey)
1595            .await
1596            .map(|(key, val)| (key.0, val))
1597            .collect::<BTreeMap<Txid, PendingTransaction>>()
1598            .await;
1599
1600        // We need to search and remove all `PendingTransactions` invalidated by RBF
1601        let mut pending_to_remove = vec![pending_tx.clone()];
1602        while let Some(removed) = pending_to_remove.pop() {
1603            all_transactions.remove(&removed.tx.compute_txid());
1604            dbtx.remove_entry(&PendingTransactionKey(removed.tx.compute_txid()))
1605                .await;
1606
1607            // Search for tx that this `removed` has as RBF
1608            if let Some(rbf) = &removed.rbf
1609                && let Some(tx) = all_transactions.get(&rbf.txid)
1610            {
1611                pending_to_remove.push(tx.clone());
1612            }
1613
1614            // Search for tx that wanted to RBF the `removed` one
1615            for tx in all_transactions.values() {
1616                if let Some(rbf) = &tx.rbf
1617                    && rbf.txid == removed.tx.compute_txid()
1618                {
1619                    pending_to_remove.push(tx.clone());
1620                }
1621            }
1622        }
1623    }
1624
1625    async fn block_is_known(
1626        &self,
1627        dbtx: &mut DatabaseTransaction<'_>,
1628        block_hash: BlockHash,
1629    ) -> bool {
1630        dbtx.get_value(&BlockHashKey(block_hash)).await.is_some()
1631    }
1632
1633    async fn create_peg_out_tx(
1634        &self,
1635        dbtx: &mut DatabaseTransaction<'_>,
1636        output: &WalletOutputV0,
1637        change_tweak: &[u8; 33],
1638    ) -> Result<UnsignedTransaction, WalletOutputError> {
1639        match output {
1640            WalletOutputV0::PegOut(peg_out) => self.offline_wallet().create_tx(
1641                peg_out.amount,
1642                // Note: While calling `assume_checked()` is generally unwise, checking the
1643                // network here could be a consensus-breaking change. Ignoring the network
1644                // is fine here since we validate it in `process_output()`.
1645                peg_out.recipient.clone().assume_checked().script_pubkey(),
1646                vec![],
1647                self.available_utxos(dbtx).await,
1648                peg_out.fees.fee_rate,
1649                change_tweak,
1650                None,
1651            ),
1652            WalletOutputV0::Rbf(rbf) => {
1653                let tx = dbtx
1654                    .get_value(&PendingTransactionKey(rbf.txid))
1655                    .await
1656                    .ok_or(WalletOutputError::RbfTransactionIdNotFound)?;
1657
1658                self.offline_wallet().create_tx(
1659                    tx.peg_out_amount,
1660                    tx.destination,
1661                    tx.selected_utxos,
1662                    self.available_utxos(dbtx).await,
1663                    tx.fees.fee_rate,
1664                    change_tweak,
1665                    Some(rbf.clone()),
1666                )
1667            }
1668        }
1669    }
1670
1671    async fn available_utxos(
1672        &self,
1673        dbtx: &mut DatabaseTransaction<'_>,
1674    ) -> Vec<(UTXOKey, SpendableUTXO)> {
1675        dbtx.find_by_prefix(&UTXOPrefixKey)
1676            .await
1677            .collect::<Vec<(UTXOKey, SpendableUTXO)>>()
1678            .await
1679    }
1680
1681    pub async fn get_wallet_value(&self, dbtx: &mut DatabaseTransaction<'_>) -> bitcoin::Amount {
1682        let sat_sum = self
1683            .available_utxos(dbtx)
1684            .await
1685            .into_iter()
1686            .map(|(_, utxo)| utxo.amount.to_sat())
1687            .sum();
1688        bitcoin::Amount::from_sat(sat_sum)
1689    }
1690
1691    async fn get_wallet_summary(&self, dbtx: &mut DatabaseTransaction<'_>) -> WalletSummary {
1692        fn partition_peg_out_and_change(
1693            transactions: Vec<Transaction>,
1694        ) -> (Vec<TxOutputSummary>, Vec<TxOutputSummary>) {
1695            let mut peg_out_txos: Vec<TxOutputSummary> = Vec::new();
1696            let mut change_utxos: Vec<TxOutputSummary> = Vec::new();
1697
1698            for tx in transactions {
1699                let txid = tx.compute_txid();
1700
1701                // to identify outputs for the peg_out (idx = 0) and change (idx = 1), we lean
1702                // on how the wallet constructs the transaction
1703                let peg_out_output = tx
1704                    .output
1705                    .first()
1706                    .expect("tx must contain withdrawal output");
1707
1708                let change_output = tx.output.last().expect("tx must contain change output");
1709
1710                peg_out_txos.push(TxOutputSummary {
1711                    outpoint: bitcoin::OutPoint { txid, vout: 0 },
1712                    amount: peg_out_output.value,
1713                });
1714
1715                change_utxos.push(TxOutputSummary {
1716                    outpoint: bitcoin::OutPoint { txid, vout: 1 },
1717                    amount: change_output.value,
1718                });
1719            }
1720
1721            (peg_out_txos, change_utxos)
1722        }
1723
1724        let spendable_utxos = self
1725            .available_utxos(dbtx)
1726            .await
1727            .iter()
1728            .map(|(utxo_key, spendable_utxo)| TxOutputSummary {
1729                outpoint: utxo_key.0,
1730                amount: spendable_utxo.amount,
1731            })
1732            .collect::<Vec<_>>();
1733
1734        // constructed peg-outs without threshold signatures
1735        let unsigned_transactions = dbtx
1736            .find_by_prefix(&UnsignedTransactionPrefixKey)
1737            .await
1738            .map(|(_tx_key, tx)| tx.psbt.unsigned_tx)
1739            .collect::<Vec<_>>()
1740            .await;
1741
1742        // peg-outs with threshold signatures, awaiting finality delay confirmations
1743        let unconfirmed_transactions = dbtx
1744            .find_by_prefix(&PendingTransactionPrefixKey)
1745            .await
1746            .map(|(_tx_key, tx)| tx.tx)
1747            .collect::<Vec<_>>()
1748            .await;
1749
1750        let (unsigned_peg_out_txos, unsigned_change_utxos) =
1751            partition_peg_out_and_change(unsigned_transactions);
1752
1753        let (unconfirmed_peg_out_txos, unconfirmed_change_utxos) =
1754            partition_peg_out_and_change(unconfirmed_transactions);
1755
1756        WalletSummary {
1757            spendable_utxos,
1758            unsigned_peg_out_txos,
1759            unsigned_change_utxos,
1760            unconfirmed_peg_out_txos,
1761            unconfirmed_change_utxos,
1762        }
1763    }
1764
1765    async fn is_utxo_confirmed(
1766        &self,
1767        dbtx: &mut DatabaseTransaction<'_>,
1768        outpoint: bitcoin::OutPoint,
1769    ) -> bool {
1770        dbtx.get_value(&UnspentTxOutKey(outpoint)).await.is_some()
1771    }
1772
1773    fn offline_wallet(&'_ self) -> StatelessWallet<'_> {
1774        StatelessWallet {
1775            descriptor: &self.cfg.consensus.peg_in_descriptor,
1776            secret_key: &self.cfg.private.peg_in_key,
1777            secp: &self.secp,
1778        }
1779    }
1780
1781    fn spawn_broadcast_pending_task(
1782        task_group: &TaskGroup,
1783        server_bitcoin_rpc_monitor: &ServerBitcoinRpcMonitor,
1784        db: &Database,
1785        broadcast_pending_notify: Arc<Notify>,
1786    ) {
1787        task_group.spawn_cancellable("broadcast pending", {
1788            let btc_rpc = server_bitcoin_rpc_monitor.clone();
1789            let db = db.clone();
1790            run_broadcast_pending_tx(db, btc_rpc, broadcast_pending_notify)
1791        });
1792    }
1793
1794    /// Get the bitcoin network for UI display
1795    pub fn network_ui(&self) -> Network {
1796        self.cfg.consensus.network.0
1797    }
1798
1799    /// Get the current consensus block count for UI display
1800    pub async fn consensus_block_count_ui(&self) -> u32 {
1801        self.consensus_block_count(&mut self.db.begin_transaction_nc().await)
1802            .await
1803    }
1804
1805    /// Get the current consensus fee rate for UI display
1806    pub async fn consensus_feerate_ui(&self) -> Feerate {
1807        self.consensus_fee_rate(&mut self.db.begin_transaction_nc().await)
1808            .await
1809    }
1810
1811    /// Get the current wallet summary for UI display
1812    pub async fn get_wallet_summary_ui(&self) -> WalletSummary {
1813        self.get_wallet_summary(&mut self.db.begin_transaction_nc().await)
1814            .await
1815    }
1816
1817    /// Shutdown the task group shared throughout fedimintd, giving 60 seconds
1818    /// for other services to gracefully shutdown.
1819    async fn graceful_shutdown(&self) {
1820        if let Err(e) = self
1821            .task_group
1822            .clone()
1823            .shutdown_join_all(Some(Duration::from_mins(1)))
1824            .await
1825        {
1826            panic!("Error while shutting down fedimintd task group: {e}");
1827        }
1828    }
1829
1830    /// Returns once our bitcoin backend observes finality delay confirmations
1831    /// of the consensus block count. If we don't observe enough confirmations
1832    /// after one hour, we gracefully shutdown fedimintd. This is necessary
1833    /// since we can no longer participate in consensus if our bitcoin backend
1834    /// is unable to observe the same chain tip as our peers.
1835    async fn wait_for_finality_confs_or_shutdown(&self, consensus_block_count: u32) {
1836        let backoff = if is_running_in_test_env() {
1837            // every 100ms for 60s
1838            backoff_util::custom_backoff(
1839                Duration::from_millis(100),
1840                Duration::from_millis(100),
1841                Some(10 * 60),
1842            )
1843        } else {
1844            // every max 10s for 1 hour
1845            backoff_util::fibonacci_max_one_hour()
1846        };
1847
1848        let wait_for_finality_confs = || async {
1849            let our_chain_tip_block_count = self.get_block_count()?;
1850            let consensus_chain_tip_block_count =
1851                consensus_block_count + self.cfg.consensus.finality_delay;
1852
1853            if consensus_chain_tip_block_count <= our_chain_tip_block_count {
1854                Ok(())
1855            } else {
1856                Err(anyhow::anyhow!("not enough confirmations"))
1857            }
1858        };
1859
1860        if retry("wait_for_finality_confs", backoff, wait_for_finality_confs)
1861            .await
1862            .is_err()
1863        {
1864            self.graceful_shutdown().await;
1865        }
1866    }
1867
    /// Spawns the cancellable "fetch-peer-consensus-versions" task and returns
    /// the receiving end of the watch channel it publishes to.
    ///
    /// In every round the task asks each peer other than `our_peer_id` for its
    /// supported module consensus version and adds our own
    /// `MODULE_CONSENSUS_VERSION`. If the number of collected versions equals
    /// the number of peers, the smallest version is published as `Some(..)`;
    /// otherwise `None` is published. The channel initially holds `None`.
    ///
    /// The task stops once sending to the watch channel fails. Between rounds
    /// it sleeps for 5 seconds in test environments and 10 minutes otherwise.
    fn spawn_peer_supported_consensus_version_task(
        api_client: DynModuleApi,
        task_group: &TaskGroup,
        our_peer_id: PeerId,
    ) -> watch::Receiver<Option<ModuleConsensusVersion>> {
        let (sender, receiver) = watch::channel(None);
        task_group.spawn_cancellable("fetch-peer-consensus-versions", async move {
            loop {
                // One request future per remote peer; our own id is skipped
                let request_futures = api_client.all_peers().iter().filter_map(|&peer| {
                    if peer == our_peer_id {
                        return None;
                    }

                    // Each request future owns its own clone of the client
                    let api_client_inner = api_client.clone();
                    Some(async move {
                        api_client_inner
                            .request_single_peer::<ModuleConsensusVersion>(
                                SUPPORTED_MODULE_CONSENSUS_VERSION_ENDPOINT.to_owned(),
                                ApiRequestErased::default(),
                                peer,
                            )
                            .await
                            .inspect(|res| debug!(
                                target: LOG_MODULE_WALLET,
                                %peer,
                                %our_peer_id,
                                ?res,
                                "Fetched supported module consensus version from peer"
                            ))
                            .inspect_err(|err| warn!(
                                target: LOG_MODULE_WALLET,
                                 %peer,
                                 err=%err.fmt_compact(),
                                "Failed to fetch consensus version from peer"
                            ))
                            // A failed request is logged above and then dropped
                            .ok()
                    })
                });

                // Await all requests and keep only the successful responses
                let peer_consensus_versions = join_all(request_futures)
                    .await
                    .into_iter()
                    .flatten()
                    .collect::<Vec<_>>();

                // Add our own version and sort, so the first element is the minimum
                let sorted_consensus_versions = peer_consensus_versions
                    .into_iter()
                    .chain(std::iter::once(MODULE_CONSENSUS_VERSION))
                    .sorted()
                    .collect::<Vec<_>>();
                // Only report a version once every peer (including us) is accounted for
                let all_peers_supported_version =
                    if sorted_consensus_versions.len() == api_client.all_peers().len() {
                        let min_supported_version = *sorted_consensus_versions
                            .first()
                            .expect("at least one element");

                        debug!(
                            target: LOG_MODULE_WALLET,
                            ?sorted_consensus_versions,
                            "Fetched supported consensus versions from peers"
                        );

                        Some(min_supported_version)
                    } else {
                        assert!(
                            sorted_consensus_versions.len() <= api_client.all_peers().len(),
                            "Too many peer responses",
                        );
                        trace!(
                            target: LOG_MODULE_WALLET,
                            ?sorted_consensus_versions,
                            "Not all peers have reported their consensus version yet"
                        );
                        None
                    };

                // A send error ends the task
                #[allow(clippy::disallowed_methods)]
                if sender.send(all_peers_supported_version).is_err() {
                    warn!(target: LOG_MODULE_WALLET, "Failed to send consensus version to watch channel, stopping task");
                    break;
                }

                if is_running_in_test_env() {
                    // Even in tests we don't want to spam the federation with requests about it
                    sleep(Duration::from_secs(5)).await;
                } else {
                    sleep(Duration::from_mins(10)).await;
                }
            }
        });
        receiver
    }
1960}
1961
1962#[instrument(target = LOG_MODULE_WALLET, level = "debug", skip_all)]
1963pub async fn run_broadcast_pending_tx(
1964    db: Database,
1965    rpc: ServerBitcoinRpcMonitor,
1966    broadcast: Arc<Notify>,
1967) {
1968    loop {
1969        // Unless something new happened, we broadcast once a minute
1970        let _ = tokio::time::timeout(Duration::from_mins(1), broadcast.notified()).await;
1971        broadcast_pending_tx(db.begin_transaction_nc().await, &rpc).await;
1972    }
1973}
1974
1975pub async fn broadcast_pending_tx(
1976    mut dbtx: DatabaseTransaction<'_>,
1977    rpc: &ServerBitcoinRpcMonitor,
1978) {
1979    let pending_tx: Vec<PendingTransaction> = dbtx
1980        .find_by_prefix(&PendingTransactionPrefixKey)
1981        .await
1982        .map(|(_, val)| val)
1983        .collect::<Vec<_>>()
1984        .await;
1985    let rbf_txids: BTreeSet<Txid> = pending_tx
1986        .iter()
1987        .filter_map(|tx| tx.rbf.clone().map(|rbf| rbf.txid))
1988        .collect();
1989    if !pending_tx.is_empty() {
1990        debug!(
1991            target: LOG_MODULE_WALLET,
1992            "Broadcasting pending transactions (total={}, rbf={})",
1993            pending_tx.len(),
1994            rbf_txids.len()
1995        );
1996    }
1997
1998    for PendingTransaction { tx, .. } in pending_tx {
1999        if !rbf_txids.contains(&tx.compute_txid()) {
2000            debug!(
2001                target: LOG_MODULE_WALLET,
2002                tx = %tx.compute_txid(),
2003                weight = tx.weight().to_wu(),
2004                output = ?tx.output,
2005                "Broadcasting peg-out",
2006            );
2007            trace!(transaction = ?tx);
2008            rpc.submit_transaction(tx).await;
2009        }
2010    }
2011}
2012
/// Borrowed view of the key material needed to build and sign peg-out
/// transactions. It holds no database handle; UTXOs are passed in by the
/// caller.
struct StatelessWallet<'a> {
    /// Descriptor that is tweaked to derive the scripts of the wallet's UTXOs
    /// and of its change outputs
    descriptor: &'a Descriptor<CompressedPublicKey>,
    /// Secret key that is tweaked per input when signing a PSBT
    secret_key: &'a secp256k1::SecretKey,
    /// secp256k1 context used for tweaking and signing
    secp: &'a secp256k1::Secp256k1<secp256k1::All>,
}
2018
2019impl StatelessWallet<'_> {
2020    /// Given a tx created from an `WalletOutput`, validate there will be no
2021    /// issues submitting the transaction to the Bitcoin network
2022    fn validate_tx(
2023        tx: &UnsignedTransaction,
2024        output: &WalletOutputV0,
2025        consensus_fee_rate: Feerate,
2026        network: Network,
2027    ) -> Result<(), WalletOutputError> {
2028        if let WalletOutputV0::PegOut(peg_out) = output
2029            && !peg_out.recipient.is_valid_for_network(network)
2030        {
2031            return Err(WalletOutputError::WrongNetwork(
2032                NetworkLegacyEncodingWrapper(network),
2033                NetworkLegacyEncodingWrapper(get_network_for_address(&peg_out.recipient)),
2034            ));
2035        }
2036
2037        // Validate the tx amount is over the dust limit
2038        if tx.peg_out_amount < tx.destination.minimal_non_dust() {
2039            return Err(WalletOutputError::PegOutUnderDustLimit);
2040        }
2041
2042        // Validate tx fee rate is above the consensus fee rate
2043        if tx.fees.fee_rate < consensus_fee_rate {
2044            return Err(WalletOutputError::PegOutFeeBelowConsensus(
2045                tx.fees.fee_rate,
2046                consensus_fee_rate,
2047            ));
2048        }
2049
2050        // Validate added fees are above the min relay tx fee
2051        // BIP-0125 requires 1 sat/vb for RBF by default (same as normal txs)
2052        let fees = match output {
2053            WalletOutputV0::PegOut(pegout) => pegout.fees,
2054            WalletOutputV0::Rbf(rbf) => rbf.fees,
2055        };
2056        if fees.fee_rate.sats_per_kvb < u64::from(DEFAULT_MIN_RELAY_TX_FEE) {
2057            return Err(WalletOutputError::BelowMinRelayFee);
2058        }
2059
2060        // Validate fees weight matches the actual weight
2061        if fees.total_weight != tx.fees.total_weight {
2062            return Err(WalletOutputError::TxWeightIncorrect(
2063                fees.total_weight,
2064                tx.fees.total_weight,
2065            ));
2066        }
2067
2068        Ok(())
2069    }
2070
    /// Attempts to create a tx ready to be signed from available UTXOs.
    ///
    /// The transaction always has two outputs: the peg-out to `destination`
    /// followed by a change output to the descriptor tweaked with
    /// `change_tweak`. Inputs are added until they cover the peg-out amount,
    /// the fees and a non-dust change output.
    ///
    /// * `peg_out_amount`: How much the peg-out should be
    /// * `destination`: The script the user is pegging-out to
    /// * `included_utxos`: UTXOs that must be included (for RBF)
    /// * `remaining_utxos`: All other spendable UTXOs
    /// * `fee_rate`: How much needs to be spent on fees
    /// * `change_tweak`: How the federation can recognize its change UTXO
    /// * `rbf`: If this is an RBF transaction; its fee rate is added on top of
    ///   `fee_rate`
    ///
    /// Returns `WalletOutputError::NotEnoughSpendableUTXO` if the UTXOs run
    /// out before the required amount is reached.
    ///
    /// NOTE(review): inputs are popped from the end of the combined list, so
    /// `remaining_utxos` are consumed (largest first) before any of
    /// `included_utxos` is reached. Confirm this is intended for the RBF case.
    #[allow(clippy::too_many_arguments)]
    fn create_tx(
        &self,
        peg_out_amount: bitcoin::Amount,
        destination: ScriptBuf,
        mut included_utxos: Vec<(UTXOKey, SpendableUTXO)>,
        mut remaining_utxos: Vec<(UTXOKey, SpendableUTXO)>,
        mut fee_rate: Feerate,
        change_tweak: &[u8; 33],
        rbf: Option<Rbf>,
    ) -> Result<UnsignedTransaction, WalletOutputError> {
        // Add the rbf fees to the existing tx fees
        if let Some(rbf) = &rbf {
            fee_rate.sats_per_kvb += rbf.fees.fee_rate.sats_per_kvb;
        }

        // When building a transaction we need to take care of two things:
        //  * We need enough input amount to fund all outputs
        //  * We need to keep an eye on the tx weight so we can factor the fees into our
        //    calculation
        // We then go on to calculate the base size of the transaction `total_weight`
        // and the maximum weight per added input which we will add every time
        // we select an input.
        let change_script = self.derive_script(change_tweak);
        let out_weight = (destination.len() * 4 + 1 + 32
            // Add change script weight, it's very likely to be needed; if not, we just overpay in fees
            + 1 // script len varint, 1 byte for all addresses we accept
            + change_script.len() * 4 // script len
            + 32) as u64; // value
        let mut total_weight = 16 + // version
            12 + // up to 2**16-1 inputs
            12 + // up to 2**16-1 outputs
            out_weight + // weight of all outputs
            16; // lock time
        // https://github.com/fedimint/fedimint/issues/4590
        #[allow(deprecated)]
        let max_input_weight = (self
            .descriptor
            .max_satisfaction_weight()
            .expect("is satisfyable") +
            128 + // TxOutHash
            16 + // TxOutIndex
            16) as u64; // sequence

        // Ensure deterministic ordering of UTXOs for all peers: both lists are sorted
        // ascending by amount, then `remaining_utxos` is appended to `included_utxos`
        included_utxos.sort_by_key(|(_, utxo)| utxo.amount);
        remaining_utxos.sort_by_key(|(_, utxo)| utxo.amount);
        included_utxos.extend(remaining_utxos);

        // Finally we initialize our accumulator for selected input amounts
        let mut total_selected_value = bitcoin::Amount::from_sat(0);
        let mut selected_utxos: Vec<(UTXOKey, SpendableUTXO)> = vec![];
        let mut fees = fee_rate.calculate_fee(total_weight);

        // Select inputs from the end of the list until the peg-out, a non-dust change
        // output and the fees are covered. The fees are recalculated after every
        // added input since each input increases the weight.
        while total_selected_value < peg_out_amount + change_script.minimal_non_dust() + fees {
            match included_utxos.pop() {
                Some((utxo_key, utxo)) => {
                    total_selected_value += utxo.amount;
                    total_weight += max_input_weight;
                    fees = fee_rate.calculate_fee(total_weight);
                    selected_utxos.push((utxo_key, utxo));
                }
                _ => return Err(WalletOutputError::NotEnoughSpendableUTXO), // Not enough UTXOs
            }
        }

        // We always pay ourselves change back to ensure that we don't lose anything due
        // to dust
        let change = total_selected_value - fees - peg_out_amount;
        let output: Vec<TxOut> = vec![
            TxOut {
                value: peg_out_amount,
                script_pubkey: destination.clone(),
            },
            TxOut {
                value: change,
                script_pubkey: change_script,
            },
        ];
        // PSBT metadata of the change output carries the tweak used to derive its script
        let mut change_out = bitcoin::psbt::Output::default();
        change_out
            .proprietary
            .insert(proprietary_tweak_key(), change_tweak.to_vec());

        info!(
            target: LOG_MODULE_WALLET,
            inputs = selected_utxos.len(),
            input_sats = total_selected_value.to_sat(),
            peg_out_sats = peg_out_amount.to_sat(),
            ?total_weight,
            fees_sats = fees.to_sat(),
            fee_rate = fee_rate.sats_per_kvb,
            change_sats = change.to_sat(),
            "Creating peg-out tx",
        );

        // Inputs are left unsigned here (empty script sig and witness)
        let transaction = Transaction {
            version: bitcoin::transaction::Version(2),
            lock_time: LockTime::ZERO,
            input: selected_utxos
                .iter()
                .map(|(utxo_key, _utxo)| TxIn {
                    previous_output: utxo_key.0,
                    script_sig: Default::default(),
                    sequence: Sequence::ENABLE_RBF_NO_LOCKTIME,
                    witness: bitcoin::Witness::new(),
                })
                .collect(),
            output,
        };
        info!(
            target: LOG_MODULE_WALLET,
            txid = %transaction.compute_txid(), "Creating peg-out tx"
        );

        // FIXME: use custom data structure that guarantees more invariants and only
        // convert to PSBT for finalization
        let psbt = Psbt {
            unsigned_tx: transaction,
            version: 0,
            xpub: Default::default(),
            proprietary: Default::default(),
            unknown: Default::default(),
            // One PSBT input per selected UTXO, in the same order as the tx inputs
            inputs: selected_utxos
                .iter()
                .map(|(_utxo_key, utxo)| {
                    let script_pubkey = self
                        .descriptor
                        .tweak(&utxo.tweak, self.secp)
                        .script_pubkey();
                    Input {
                        non_witness_utxo: None,
                        witness_utxo: Some(TxOut {
                            value: utxo.amount,
                            script_pubkey,
                        }),
                        partial_sigs: Default::default(),
                        sighash_type: None,
                        redeem_script: None,
                        witness_script: Some(
                            self.descriptor
                                .tweak(&utxo.tweak, self.secp)
                                .script_code()
                                .expect("Failed to tweak descriptor"),
                        ),
                        bip32_derivation: Default::default(),
                        final_script_sig: None,
                        final_script_witness: None,
                        ripemd160_preimages: Default::default(),
                        sha256_preimages: Default::default(),
                        hash160_preimages: Default::default(),
                        hash256_preimages: Default::default(),
                        // The tweak is read back from here when signing
                        proprietary: vec![(proprietary_tweak_key(), utxo.tweak.to_vec())]
                            .into_iter()
                            .collect(),
                        tap_key_sig: Default::default(),
                        tap_script_sigs: Default::default(),
                        tap_scripts: Default::default(),
                        tap_key_origins: Default::default(),
                        tap_internal_key: Default::default(),
                        tap_merkle_root: Default::default(),
                        unknown: Default::default(),
                    }
                })
                .collect(),
            // Peg-out output without metadata, then the change output with its tweak
            outputs: vec![Default::default(), change_out],
        };

        Ok(UnsignedTransaction {
            psbt,
            signatures: vec![],
            change,
            fees: PegOutFees {
                fee_rate,
                total_weight,
            },
            destination,
            selected_utxos,
            peg_out_amount,
            rbf,
        })
    }
2262
2263    fn sign_psbt(&self, psbt: &mut Psbt) {
2264        let mut tx_hasher = SighashCache::new(&psbt.unsigned_tx);
2265
2266        for (idx, (psbt_input, _tx_input)) in psbt
2267            .inputs
2268            .iter_mut()
2269            .zip(psbt.unsigned_tx.input.iter())
2270            .enumerate()
2271        {
2272            let tweaked_secret = {
2273                let tweak = psbt_input
2274                    .proprietary
2275                    .get(&proprietary_tweak_key())
2276                    .expect("Malformed PSBT: expected tweak");
2277
2278                self.secret_key.tweak(tweak, self.secp)
2279            };
2280
2281            let tx_hash = tx_hasher
2282                .p2wsh_signature_hash(
2283                    idx,
2284                    psbt_input
2285                        .witness_script
2286                        .as_ref()
2287                        .expect("Missing witness script"),
2288                    psbt_input
2289                        .witness_utxo
2290                        .as_ref()
2291                        .expect("Missing UTXO")
2292                        .value,
2293                    EcdsaSighashType::All,
2294                )
2295                .expect("Failed to create segwit sighash");
2296
2297            let signature = self.secp.sign_ecdsa(
2298                &Message::from_digest_slice(&tx_hash[..]).unwrap(),
2299                &tweaked_secret,
2300            );
2301
2302            psbt_input.partial_sigs.insert(
2303                bitcoin::PublicKey {
2304                    compressed: true,
2305                    inner: secp256k1::PublicKey::from_secret_key(self.secp, &tweaked_secret),
2306                },
2307                EcdsaSig::sighash_all(signature),
2308            );
2309        }
2310    }
2311
    /// Derives the script pubkey of the wallet descriptor with every public
    /// key tweaked by `tweak`.
    ///
    /// Each key `pk` in the descriptor is replaced by `pk` tweaked (via
    /// `add_exp_tweak`) with the HMAC-SHA256 of `tweak` keyed with the
    /// serialized `pk`.
    fn derive_script(&self, tweak: &[u8]) -> ScriptBuf {
        /// Descriptor key translator that applies the tweak to every key
        struct CompressedPublicKeyTranslator<'t, 's, Ctx: Verification> {
            tweak: &'t [u8],
            secp: &'s Secp256k1<Ctx>,
        }

        impl<Ctx: Verification>
            miniscript::Translator<CompressedPublicKey, CompressedPublicKey, Infallible>
            for CompressedPublicKeyTranslator<'_, '_, Ctx>
        {
            fn pk(&mut self, pk: &CompressedPublicKey) -> Result<CompressedPublicKey, Infallible> {
                // HMAC-SHA256 over the tweak, keyed with the serialized public key
                let hashed_tweak = {
                    let mut hasher = HmacEngine::<sha256::Hash>::new(&pk.key.serialize()[..]);
                    hasher.input(self.tweak);
                    Hmac::from_engine(hasher).to_byte_array()
                };

                // Apply the hashed tweak to the public key
                Ok(CompressedPublicKey {
                    key: pk
                        .key
                        .add_exp_tweak(
                            self.secp,
                            &Scalar::from_be_bytes(hashed_tweak).expect("can't fail"),
                        )
                        .expect("tweaking failed"),
                })
            }
            translate_hash_fail!(CompressedPublicKey, CompressedPublicKey, Infallible);
        }

        // Translate every key of the descriptor, then take the script of the result
        let descriptor = self
            .descriptor
            .translate_pk(&mut CompressedPublicKeyTranslator {
                tweak,
                secp: self.secp,
            })
            .expect("can't fail");

        descriptor.script_pubkey()
    }
2352}
2353
2354pub fn nonce_from_idx(nonce_idx: u64) -> [u8; 33] {
2355    let mut nonce: [u8; 33] = [0; 33];
2356    // Make it look like a compressed pubkey, has to be either 0x02 or 0x03
2357    nonce[0] = 0x02;
2358    nonce[1..].copy_from_slice(&nonce_idx.consensus_hash::<bitcoin::hashes::sha256::Hash>()[..]);
2359
2360    nonce
2361}
2362
2363/// A peg-out tx that is ready to be broadcast with a tweak for the change UTXO
2364#[derive(Clone, Debug, Encodable, Decodable)]
2365pub struct PendingTransaction {
2366    pub tx: bitcoin::Transaction,
2367    pub tweak: [u8; 33],
2368    pub change: bitcoin::Amount,
2369    pub destination: ScriptBuf,
2370    pub fees: PegOutFees,
2371    pub selected_utxos: Vec<(UTXOKey, SpendableUTXO)>,
2372    pub peg_out_amount: bitcoin::Amount,
2373    pub rbf: Option<Rbf>,
2374}
2375
2376impl Serialize for PendingTransaction {
2377    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2378    where
2379        S: serde::Serializer,
2380    {
2381        if serializer.is_human_readable() {
2382            serializer.serialize_str(&self.consensus_encode_to_hex())
2383        } else {
2384            serializer.serialize_bytes(&self.consensus_encode_to_vec())
2385        }
2386    }
2387}
2388
2389/// A PSBT that is awaiting enough signatures from the federation to becoming a
2390/// `PendingTransaction`
2391#[derive(Clone, Debug, Eq, PartialEq, Encodable, Decodable)]
2392pub struct UnsignedTransaction {
2393    pub psbt: Psbt,
2394    pub signatures: Vec<(PeerId, PegOutSignatureItem)>,
2395    pub change: bitcoin::Amount,
2396    pub fees: PegOutFees,
2397    pub destination: ScriptBuf,
2398    pub selected_utxos: Vec<(UTXOKey, SpendableUTXO)>,
2399    pub peg_out_amount: bitcoin::Amount,
2400    pub rbf: Option<Rbf>,
2401}
2402
2403impl Serialize for UnsignedTransaction {
2404    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2405    where
2406        S: serde::Serializer,
2407    {
2408        if serializer.is_human_readable() {
2409            serializer.serialize_str(&self.consensus_encode_to_hex())
2410        } else {
2411            serializer.serialize_bytes(&self.consensus_encode_to_vec())
2412        }
2413    }
2414}
2415
#[cfg(test)]
mod tests {

    use std::str::FromStr;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    use bitcoin::Network::{Bitcoin, Testnet};
    use bitcoin::hashes::Hash;
    use bitcoin::{Address, Amount, OutPoint, Txid, secp256k1};
    use fedimint_core::Feerate;
    use fedimint_core::encoding::btc::NetworkLegacyEncodingWrapper;
    use fedimint_core::envs::is_automatic_consensus_version_voting_disabled;
    use fedimint_wallet_common::{PegOut, PegOutFees, Rbf, WalletOutputV0};
    use miniscript::descriptor::Wsh;

    use crate::common::PegInDescriptor;
    use crate::{
        CompressedPublicKey, OsRng, SpendableUTXO, StatelessWallet, UTXOKey, WalletOutputError,
    };

    /// Environment variable toggled by the automatic-voting tests below.
    const DISABLE_VOTING_ENV: &str = "FM_WALLET_DISABLE_AUTOMATIC_CONSENSUS_VERSION_VOTING";

    /// Serializes the tests that mutate `DISABLE_VOTING_ENV`.
    ///
    /// The test harness runs tests on parallel threads by default and the
    /// environment is process-wide, so without this lock one test could remove
    /// the variable between another test's `set_var` and its assertion.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Acquires `ENV_LOCK`, ignoring poisoning so that one failed env test
    /// does not cascade into the other.
    fn lock_env() -> MutexGuard<'static, ()> {
        ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner)
    }

    /// Exercises `create_tx` and `validate_tx` with amounts, weights and fee
    /// rates that should each hit a specific `WalletOutputError` (or succeed).
    #[test]
    fn create_tx_should_validate_amounts() {
        let secp = secp256k1::Secp256k1::new();

        let descriptor = PegInDescriptor::Wsh(
            Wsh::new_sortedmulti(
                3,
                (0..4)
                    .map(|_| secp.generate_keypair(&mut OsRng))
                    .map(|(_, key)| CompressedPublicKey { key })
                    .collect(),
            )
            .unwrap(),
        );

        let (secret_key, _) = secp.generate_keypair(&mut OsRng);

        let wallet = StatelessWallet {
            descriptor: &descriptor,
            secret_key: &secret_key,
            secp: &secp,
        };

        let spendable = SpendableUTXO {
            tweak: [0; 33],
            amount: bitcoin::Amount::from_sat(3000),
        };

        let recipient = Address::from_str("32iVBEu4dxkUQk9dJbZUiBiQdmypcEyJRf").unwrap();

        let fee = Feerate { sats_per_kvb: 1000 };
        let weight = 875;

        // not enough SpendableUTXO
        // tx fee = ceil(875 / 4) * 1 sat/vb = 219
        // change script dust = 330
        // spendable sats = 3000 - 219 - 330 = 2451
        let tx = wallet.create_tx(
            Amount::from_sat(2452),
            recipient.clone().assume_checked().script_pubkey(),
            vec![],
            vec![(UTXOKey(OutPoint::null()), spendable.clone())],
            fee,
            &[0; 33],
            None,
        );
        assert_eq!(tx, Err(WalletOutputError::NotEnoughSpendableUTXO));

        // successful tx creation
        let mut tx = wallet
            .create_tx(
                Amount::from_sat(1000),
                recipient.clone().assume_checked().script_pubkey(),
                vec![],
                vec![(UTXOKey(OutPoint::null()), spendable)],
                fee,
                &[0; 33],
                None,
            )
            .expect("is ok");

        // peg out weight is incorrectly set to 0
        let res = StatelessWallet::validate_tx(&tx, &rbf(fee.sats_per_kvb, 0), fee, Bitcoin);
        assert_eq!(res, Err(WalletOutputError::TxWeightIncorrect(0, weight)));

        // fee rate set below min relay fee to 0
        let res = StatelessWallet::validate_tx(&tx, &rbf(0, weight), fee, Bitcoin);
        assert_eq!(res, Err(WalletOutputError::BelowMinRelayFee));

        // fees are okay
        let res = StatelessWallet::validate_tx(&tx, &rbf(fee.sats_per_kvb, weight), fee, Bitcoin);
        assert_eq!(res, Ok(()));

        // tx has fee below consensus
        tx.fees = PegOutFees::new(0, weight);
        let res = StatelessWallet::validate_tx(&tx, &rbf(fee.sats_per_kvb, weight), fee, Bitcoin);
        assert_eq!(
            res,
            Err(WalletOutputError::PegOutFeeBelowConsensus(
                Feerate { sats_per_kvb: 0 },
                fee
            ))
        );

        // tx has peg-out amount under dust limit
        tx.peg_out_amount = bitcoin::Amount::ZERO;
        let res = StatelessWallet::validate_tx(&tx, &rbf(fee.sats_per_kvb, weight), fee, Bitcoin);
        assert_eq!(res, Err(WalletOutputError::PegOutUnderDustLimit));

        // tx is invalid for network
        let output = WalletOutputV0::PegOut(PegOut {
            recipient,
            amount: bitcoin::Amount::from_sat(1000),
            fees: PegOutFees::new(100, weight),
        });
        let res = StatelessWallet::validate_tx(&tx, &output, fee, Testnet);
        assert_eq!(
            res,
            Err(WalletOutputError::WrongNetwork(
                NetworkLegacyEncodingWrapper(Testnet),
                NetworkLegacyEncodingWrapper(Bitcoin)
            ))
        );
    }

    /// Builds an RBF wallet output with the given fee rate and total weight
    /// and an all-zero txid.
    fn rbf(sats_per_kvb: u64, total_weight: u64) -> WalletOutputV0 {
        WalletOutputV0::Rbf(Rbf {
            fees: PegOutFees::new(sats_per_kvb, total_weight),
            txid: Txid::all_zeros(),
        })
    }

    #[test]
    fn automatic_vote_suppressed_when_env_set() {
        let _env_guard = lock_env();

        // SAFETY: every test in this module that mutates the environment holds
        // `ENV_LOCK`, so these writes cannot interleave with each other.
        // NOTE(review): code outside this module that touches the environment
        // concurrently is not covered by this lock.
        unsafe {
            std::env::set_var(DISABLE_VOTING_ENV, "1");
        }
        let disabled = is_automatic_consensus_version_voting_disabled();
        // Clean up before asserting so a failing assertion cannot leak the
        // variable into the rest of the test run.
        // SAFETY: see above; `ENV_LOCK` is still held.
        unsafe {
            std::env::remove_var(DISABLE_VOTING_ENV);
        }

        assert!(disabled);
    }

    #[test]
    fn automatic_vote_active_when_env_unset() {
        let _env_guard = lock_env();

        // SAFETY: every test in this module that mutates the environment holds
        // `ENV_LOCK`, so this write cannot interleave with the other env test.
        // NOTE(review): code outside this module that touches the environment
        // concurrently is not covered by this lock.
        unsafe {
            std::env::remove_var(DISABLE_VOTING_ENV);
        }
        assert!(!is_automatic_consensus_version_voting_disabled());
    }
}