Skip to main content

fedimint_wallet_server/
lib.rs

1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::cast_possible_wrap)]
4#![allow(clippy::default_trait_access)]
5#![allow(clippy::missing_errors_doc)]
6#![allow(clippy::missing_panics_doc)]
7#![allow(clippy::module_name_repetitions)]
8#![allow(clippy::must_use_candidate)]
9#![allow(clippy::needless_lifetimes)]
10#![allow(clippy::too_many_lines)]
11
12pub mod db;
13pub mod envs;
14
15use std::clone::Clone;
16use std::cmp::min;
17use std::collections::{BTreeMap, BTreeSet, HashMap};
18use std::convert::Infallible;
19use std::sync::Arc;
20#[cfg(not(target_family = "wasm"))]
21use std::time::Duration;
22
23use anyhow::{Context, bail, ensure, format_err};
24use bitcoin::absolute::LockTime;
25use bitcoin::address::NetworkUnchecked;
26use bitcoin::ecdsa::Signature as EcdsaSig;
27use bitcoin::hashes::{Hash as BitcoinHash, HashEngine, Hmac, HmacEngine, sha256};
28use bitcoin::policy::DEFAULT_MIN_RELAY_TX_FEE;
29use bitcoin::psbt::{Input, Psbt};
30use bitcoin::secp256k1::{self, All, Message, Scalar, Secp256k1, Verification};
31use bitcoin::sighash::{EcdsaSighashType, SighashCache};
32use bitcoin::{Address, BlockHash, Network, ScriptBuf, Sequence, Transaction, TxIn, TxOut, Txid};
33use common::config::WalletConfigConsensus;
34use common::{
35    DEPRECATED_RBF_ERROR, PegOutFees, PegOutSignatureItem, ProcessPegOutSigError, SpendableUTXO,
36    TxOutputSummary, WalletCommonInit, WalletConsensusItem, WalletInput, WalletModuleTypes,
37    WalletOutput, WalletOutputOutcome, WalletSummary, proprietary_tweak_key,
38};
39use db::{
40    BlockHashByHeightKey, BlockHashByHeightKeyPrefix, BlockHashByHeightValue, RecoveryItemKey,
41    RecoveryItemKeyPrefix,
42};
43use envs::get_feerate_multiplier;
44use fedimint_api_client::api::{DynModuleApi, FederationApiExt};
45use fedimint_core::config::{
46    ServerModuleConfig, ServerModuleConsensusConfig, TypedServerModuleConfig,
47    TypedServerModuleConsensusConfig,
48};
49use fedimint_core::core::ModuleInstanceId;
50use fedimint_core::db::{
51    Database, DatabaseTransaction, DatabaseVersion, IDatabaseTransactionOpsCoreTyped,
52};
53use fedimint_core::encoding::btc::NetworkLegacyEncodingWrapper;
54use fedimint_core::encoding::{Decodable, Encodable};
55use fedimint_core::envs::{
56    BitcoinRpcConfig, FM_ENABLE_MODULE_WALLET_ENV, is_env_var_set_opt, is_rbf_withdrawal_enabled,
57    is_running_in_test_env,
58};
59use fedimint_core::module::audit::Audit;
60use fedimint_core::module::{
61    Amounts, ApiEndpoint, ApiError, ApiRequestErased, ApiVersion, CORE_CONSENSUS_VERSION,
62    CoreConsensusVersion, InputMeta, ModuleConsensusVersion, ModuleInit,
63    SupportedModuleApiVersions, TransactionItemAmounts, api_endpoint,
64};
65use fedimint_core::net::auth::check_auth;
66use fedimint_core::task::TaskGroup;
67#[cfg(not(target_family = "wasm"))]
68use fedimint_core::task::sleep;
69use fedimint_core::util::{FmtCompact, FmtCompactAnyhow as _, backoff_util, retry};
70use fedimint_core::{
71    Feerate, InPoint, NumPeersExt, OutPoint, PeerId, apply, async_trait_maybe_send,
72    get_network_for_address, push_db_key_items, push_db_pair_items,
73};
74use fedimint_logging::LOG_MODULE_WALLET;
75use fedimint_server_core::bitcoin_rpc::ServerBitcoinRpcMonitor;
76use fedimint_server_core::config::{PeerHandleOps, PeerHandleOpsExt};
77use fedimint_server_core::migration::ServerModuleDbMigrationFn;
78use fedimint_server_core::{
79    ConfigGenModuleArgs, ServerModule, ServerModuleInit, ServerModuleInitArgs,
80};
81pub use fedimint_wallet_common as common;
82use fedimint_wallet_common::config::{FeeConsensus, WalletClientConfig, WalletConfig};
83use fedimint_wallet_common::endpoint_constants::{
84    ACTIVATE_CONSENSUS_VERSION_VOTING_ENDPOINT, BITCOIN_KIND_ENDPOINT, BITCOIN_RPC_CONFIG_ENDPOINT,
85    BLOCK_COUNT_ENDPOINT, BLOCK_COUNT_LOCAL_ENDPOINT, MODULE_CONSENSUS_VERSION_ENDPOINT,
86    PEG_OUT_FEES_ENDPOINT, RECOVERY_COUNT_ENDPOINT, RECOVERY_SLICE_ENDPOINT,
87    SUPPORTED_MODULE_CONSENSUS_VERSION_ENDPOINT, UTXO_CONFIRMED_ENDPOINT, WALLET_SUMMARY_ENDPOINT,
88};
89use fedimint_wallet_common::envs::FM_PORT_ESPLORA_ENV;
90use fedimint_wallet_common::keys::CompressedPublicKey;
91use fedimint_wallet_common::tweakable::Tweakable;
92use fedimint_wallet_common::{
93    MODULE_CONSENSUS_VERSION, Rbf, RecoveryItem, UnknownWalletInputVariantError, WalletInputError,
94    WalletOutputError, WalletOutputV0,
95};
96use futures::future::join_all;
97use futures::{FutureExt, StreamExt};
98use itertools::Itertools;
99use metrics::{
100    WALLET_INOUT_FEES_SATS, WALLET_INOUT_SATS, WALLET_PEGIN_FEES_SATS, WALLET_PEGIN_SATS,
101    WALLET_PEGOUT_FEES_SATS, WALLET_PEGOUT_SATS,
102};
103use miniscript::psbt::PsbtExt;
104use miniscript::{Descriptor, TranslatePk, translate_hash_fail};
105use rand::rngs::OsRng;
106use serde::Serialize;
107use strum::IntoEnumIterator;
108use tokio::sync::{Notify, watch};
109use tracing::{debug, info, instrument, trace, warn};
110
111use crate::db::{
112    BlockCountVoteKey, BlockCountVotePrefix, BlockHashKey, BlockHashKeyPrefix,
113    ClaimedPegInOutpointKey, ClaimedPegInOutpointPrefixKey, ConsensusVersionVoteKey,
114    ConsensusVersionVotePrefix, ConsensusVersionVotingActivationKey,
115    ConsensusVersionVotingActivationPrefix, DbKeyPrefix, FeeRateVoteKey, FeeRateVotePrefix,
116    PegOutBitcoinTransaction, PegOutBitcoinTransactionPrefix, PegOutNonceKey, PegOutTxSignatureCI,
117    PegOutTxSignatureCIPrefix, PendingTransactionKey, PendingTransactionPrefixKey, UTXOKey,
118    UTXOPrefixKey, UnsignedTransactionKey, UnsignedTransactionPrefixKey, UnspentTxOutKey,
119    UnspentTxOutPrefix, migrate_to_v1, migrate_to_v2,
120};
121use crate::metrics::WALLET_BLOCK_COUNT;
122
123mod metrics;
124
/// Stateless initializer type for the wallet server module.
///
/// This file implements [`ModuleInit`] (database dumping) and
/// [`ServerModuleInit`] (config generation, validation, migrations and
/// construction of the [`Wallet`] module) for it.
#[derive(Debug, Clone)]
pub struct WalletInit;
127
128impl ModuleInit for WalletInit {
129    type Common = WalletCommonInit;
130
131    async fn dump_database(
132        &self,
133        dbtx: &mut DatabaseTransaction<'_>,
134        prefix_names: Vec<String>,
135    ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
136        let mut wallet: BTreeMap<String, Box<dyn erased_serde::Serialize + Send>> = BTreeMap::new();
137        let filtered_prefixes = DbKeyPrefix::iter().filter(|f| {
138            prefix_names.is_empty() || prefix_names.contains(&f.to_string().to_lowercase())
139        });
140        for table in filtered_prefixes {
141            match table {
142                DbKeyPrefix::BlockHash => {
143                    push_db_key_items!(dbtx, BlockHashKeyPrefix, BlockHashKey, wallet, "Blocks");
144                }
145                DbKeyPrefix::BlockHashByHeight => {
146                    push_db_key_items!(
147                        dbtx,
148                        BlockHashByHeightKeyPrefix,
149                        BlockHashByHeightKey,
150                        wallet,
151                        "Blocks by height"
152                    );
153                }
154                DbKeyPrefix::PegOutBitcoinOutPoint => {
155                    push_db_pair_items!(
156                        dbtx,
157                        PegOutBitcoinTransactionPrefix,
158                        PegOutBitcoinTransaction,
159                        WalletOutputOutcome,
160                        wallet,
161                        "Peg Out Bitcoin Transaction"
162                    );
163                }
164                DbKeyPrefix::PegOutTxSigCi => {
165                    push_db_pair_items!(
166                        dbtx,
167                        PegOutTxSignatureCIPrefix,
168                        PegOutTxSignatureCI,
169                        Vec<secp256k1::ecdsa::Signature>,
170                        wallet,
171                        "Peg Out Transaction Signatures"
172                    );
173                }
174                DbKeyPrefix::PendingTransaction => {
175                    push_db_pair_items!(
176                        dbtx,
177                        PendingTransactionPrefixKey,
178                        PendingTransactionKey,
179                        PendingTransaction,
180                        wallet,
181                        "Pending Transactions"
182                    );
183                }
184                DbKeyPrefix::PegOutNonce => {
185                    if let Some(nonce) = dbtx.get_value(&PegOutNonceKey).await {
186                        wallet.insert("Peg Out Nonce".to_string(), Box::new(nonce));
187                    }
188                }
189                DbKeyPrefix::UnsignedTransaction => {
190                    push_db_pair_items!(
191                        dbtx,
192                        UnsignedTransactionPrefixKey,
193                        UnsignedTransactionKey,
194                        UnsignedTransaction,
195                        wallet,
196                        "Unsigned Transactions"
197                    );
198                }
199                DbKeyPrefix::Utxo => {
200                    push_db_pair_items!(
201                        dbtx,
202                        UTXOPrefixKey,
203                        UTXOKey,
204                        SpendableUTXO,
205                        wallet,
206                        "UTXOs"
207                    );
208                }
209                DbKeyPrefix::BlockCountVote => {
210                    push_db_pair_items!(
211                        dbtx,
212                        BlockCountVotePrefix,
213                        BlockCountVoteKey,
214                        u32,
215                        wallet,
216                        "Block Count Votes"
217                    );
218                }
219                DbKeyPrefix::FeeRateVote => {
220                    push_db_pair_items!(
221                        dbtx,
222                        FeeRateVotePrefix,
223                        FeeRateVoteKey,
224                        Feerate,
225                        wallet,
226                        "Fee Rate Votes"
227                    );
228                }
229                DbKeyPrefix::ClaimedPegInOutpoint => {
230                    push_db_pair_items!(
231                        dbtx,
232                        ClaimedPegInOutpointPrefixKey,
233                        PeggedInOutpointKey,
234                        (),
235                        wallet,
236                        "Claimed Peg-in Outpoint"
237                    );
238                }
239                DbKeyPrefix::ConsensusVersionVote => {
240                    push_db_pair_items!(
241                        dbtx,
242                        ConsensusVersionVotePrefix,
243                        ConsensusVersionVoteKey,
244                        ModuleConsensusVersion,
245                        wallet,
246                        "Consensus Version Votes"
247                    );
248                }
249                DbKeyPrefix::UnspentTxOut => {
250                    push_db_pair_items!(
251                        dbtx,
252                        UnspentTxOutPrefix,
253                        UnspentTxOutKey,
254                        TxOut,
255                        wallet,
256                        "Consensus Version Votes"
257                    );
258                }
259                DbKeyPrefix::ConsensusVersionVotingActivation => {
260                    push_db_pair_items!(
261                        dbtx,
262                        ConsensusVersionVotingActivationPrefix,
263                        ConsensusVersionVotingActivationKey,
264                        (),
265                        wallet,
266                        "Consensus Version Voting Activation Key"
267                    );
268                }
269                DbKeyPrefix::RecoveryItem => {
270                    push_db_pair_items!(
271                        dbtx,
272                        RecoveryItemKeyPrefix,
273                        RecoveryItemKey,
274                        RecoveryItem,
275                        wallet,
276                        "Recovery Items"
277                    );
278                }
279            }
280        }
281
282        Box::new(wallet.into_iter())
283    }
284}
285
286/// Default finality delay based on network
287fn default_finality_delay(network: Network) -> u32 {
288    match network {
289        Network::Bitcoin | Network::Regtest => 10,
290        Network::Testnet | Network::Signet | Network::Testnet4 => 2,
291    }
292}
293
294/// Default Bitcoin RPC config for clients
295fn default_client_bitcoin_rpc(network: Network) -> BitcoinRpcConfig {
296    let url = match network {
297        Network::Bitcoin => "https://mempool.space/api/".to_string(),
298        Network::Testnet => "https://mempool.space/testnet/api/".to_string(),
299        Network::Testnet4 => "https://mempool.space/testnet4/api/".to_string(),
300        Network::Signet => "https://mutinynet.com/api/".to_string(),
301        Network::Regtest => format!(
302            "http://127.0.0.1:{}/",
303            std::env::var(FM_PORT_ESPLORA_ENV).unwrap_or_else(|_| String::from("50002"))
304        ),
305    };
306
307    BitcoinRpcConfig {
308        kind: "esplora".to_string(),
309        url: fedimint_core::util::SafeUrl::parse(&url).expect("hardcoded URL is valid"),
310    }
311}
312
313#[apply(async_trait_maybe_send!)]
314impl ServerModuleInit for WalletInit {
315    type Module = Wallet;
316
317    fn versions(&self, _core: CoreConsensusVersion) -> &[ModuleConsensusVersion] {
318        &[MODULE_CONSENSUS_VERSION]
319    }
320
321    fn supported_api_versions(&self) -> SupportedModuleApiVersions {
322        SupportedModuleApiVersions::from_raw(
323            (CORE_CONSENSUS_VERSION.major, CORE_CONSENSUS_VERSION.minor),
324            (
325                MODULE_CONSENSUS_VERSION.major,
326                MODULE_CONSENSUS_VERSION.minor,
327            ),
328            &[(0, 2)],
329        )
330    }
331
332    fn is_enabled_by_default(&self) -> bool {
333        is_env_var_set_opt(FM_ENABLE_MODULE_WALLET_ENV).unwrap_or(true)
334    }
335
336    async fn init(&self, args: &ServerModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
337        for direction in ["incoming", "outgoing"] {
338            WALLET_INOUT_FEES_SATS
339                .with_label_values(&[direction])
340                .get_sample_count();
341            WALLET_INOUT_SATS
342                .with_label_values(&[direction])
343                .get_sample_count();
344        }
345        // Eagerly initialize metrics that trigger infrequently
346        WALLET_PEGIN_FEES_SATS.get_sample_count();
347        WALLET_PEGIN_SATS.get_sample_count();
348        WALLET_PEGOUT_SATS.get_sample_count();
349        WALLET_PEGOUT_FEES_SATS.get_sample_count();
350
351        Ok(Wallet::new(
352            args.cfg().to_typed()?,
353            args.db(),
354            args.task_group(),
355            args.our_peer_id(),
356            args.module_api().clone(),
357            args.server_bitcoin_rpc_monitor(),
358        )
359        .await?)
360    }
361
362    fn trusted_dealer_gen(
363        &self,
364        peers: &[PeerId],
365        args: &ConfigGenModuleArgs,
366    ) -> BTreeMap<PeerId, ServerModuleConfig> {
367        let secp = bitcoin::secp256k1::Secp256k1::new();
368        let finality_delay = default_finality_delay(args.network);
369        let client_default_bitcoin_rpc = default_client_bitcoin_rpc(args.network);
370
371        let btc_pegin_keys = peers
372            .iter()
373            .map(|&id| (id, secp.generate_keypair(&mut OsRng)))
374            .collect::<Vec<_>>();
375
376        let wallet_cfg: BTreeMap<PeerId, WalletConfig> = btc_pegin_keys
377            .iter()
378            .map(|(id, (sk, _))| {
379                let cfg = WalletConfig::new(
380                    btc_pegin_keys
381                        .iter()
382                        .map(|(peer_id, (_, pk))| (*peer_id, CompressedPublicKey { key: *pk }))
383                        .collect(),
384                    *sk,
385                    peers.to_num_peers().threshold(),
386                    args.network,
387                    finality_delay,
388                    client_default_bitcoin_rpc.clone(),
389                    FeeConsensus::default(),
390                );
391                (*id, cfg)
392            })
393            .collect();
394
395        wallet_cfg
396            .into_iter()
397            .map(|(k, v)| (k, v.to_erased()))
398            .collect()
399    }
400
401    async fn distributed_gen(
402        &self,
403        peers: &(dyn PeerHandleOps + Send + Sync),
404        args: &ConfigGenModuleArgs,
405    ) -> anyhow::Result<ServerModuleConfig> {
406        let secp = secp256k1::Secp256k1::new();
407        let (sk, pk) = secp.generate_keypair(&mut OsRng);
408        let our_key = CompressedPublicKey { key: pk };
409        let peer_peg_in_keys: BTreeMap<PeerId, CompressedPublicKey> = peers
410            .exchange_encodable(our_key.key)
411            .await?
412            .into_iter()
413            .map(|(k, key)| (k, CompressedPublicKey { key }))
414            .collect();
415
416        let finality_delay = default_finality_delay(args.network);
417        let client_default_bitcoin_rpc = default_client_bitcoin_rpc(args.network);
418
419        let wallet_cfg = WalletConfig::new(
420            peer_peg_in_keys,
421            sk,
422            peers.num_peers().threshold(),
423            args.network,
424            finality_delay,
425            client_default_bitcoin_rpc,
426            FeeConsensus::default(),
427        );
428
429        Ok(wallet_cfg.to_erased())
430    }
431
432    fn validate_config(&self, identity: &PeerId, config: ServerModuleConfig) -> anyhow::Result<()> {
433        let config = config.to_typed::<WalletConfig>()?;
434        let pubkey = secp256k1::PublicKey::from_secret_key_global(&config.private.peg_in_key);
435
436        if config
437            .consensus
438            .peer_peg_in_keys
439            .get(identity)
440            .ok_or_else(|| format_err!("Secret key doesn't match any public key"))?
441            != &CompressedPublicKey::new(pubkey)
442        {
443            bail!(" Bitcoin wallet private key doesn't match multisig pubkey");
444        }
445
446        Ok(())
447    }
448
449    fn get_client_config(
450        &self,
451        config: &ServerModuleConsensusConfig,
452    ) -> anyhow::Result<WalletClientConfig> {
453        let config = WalletConfigConsensus::from_erased(config)?;
454        Ok(WalletClientConfig {
455            peg_in_descriptor: config.peg_in_descriptor,
456            network: config.network,
457            fee_consensus: config.fee_consensus,
458            finality_delay: config.finality_delay,
459            default_bitcoin_rpc: config.client_default_bitcoin_rpc,
460        })
461    }
462
463    /// DB migrations to move from old to newer versions
464    fn get_database_migrations(
465        &self,
466    ) -> BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Wallet>> {
467        let mut migrations: BTreeMap<DatabaseVersion, ServerModuleDbMigrationFn<Wallet>> =
468            BTreeMap::new();
469        migrations.insert(
470            DatabaseVersion(0),
471            Box::new(|ctx| migrate_to_v1(ctx).boxed()),
472        );
473        migrations.insert(
474            DatabaseVersion(1),
475            Box::new(|ctx| migrate_to_v2(ctx).boxed()),
476        );
477        migrations
478    }
479
480    fn used_db_prefixes(&self) -> Option<BTreeSet<u8>> {
481        Some(DbKeyPrefix::iter().map(|p| p as u8).collect())
482    }
483}
484
485#[apply(async_trait_maybe_send!)]
486impl ServerModule for Wallet {
487    type Common = WalletModuleTypes;
488    type Init = WalletInit;
489
490    async fn consensus_proposal<'a>(
491        &'a self,
492        dbtx: &mut DatabaseTransaction<'_>,
493    ) -> Vec<WalletConsensusItem> {
494        let mut items = dbtx
495            .find_by_prefix(&PegOutTxSignatureCIPrefix)
496            .await
497            .map(|(key, val)| {
498                WalletConsensusItem::PegOutSignature(PegOutSignatureItem {
499                    txid: key.0,
500                    signature: val,
501                })
502            })
503            .collect::<Vec<WalletConsensusItem>>()
504            .await;
505
506        // If we are unable to get a block count from the node we skip adding a block
507        // count vote to consensus items.
508        //
509        // The potential impact of not including the latest block count from our peer's
510        // node is delayed processing of change outputs for the federation, which is an
511        // acceptable risk since subsequent rounds of consensus will reattempt to fetch
512        // the latest block count.
513        match self.get_block_count() {
514            Ok(block_count) => {
515                let mut block_count_vote =
516                    block_count.saturating_sub(self.cfg.consensus.finality_delay);
517
518                let current_consensus_block_count = self.consensus_block_count(dbtx).await;
519
520                // This will prevent that more then five blocks are synced in a single database
521                // transaction if the federation was offline for a prolonged period of time.
522                if current_consensus_block_count != 0 {
523                    block_count_vote = min(
524                        block_count_vote,
525                        current_consensus_block_count
526                            + if is_running_in_test_env() {
527                                // We have some tests that mine *a lot* of blocks (empty)
528                                // and need them processed fast, so we raise the max.
529                                100
530                            } else {
531                                5
532                            },
533                    );
534                }
535
536                let current_vote = dbtx
537                    .get_value(&BlockCountVoteKey(self.our_peer_id))
538                    .await
539                    .unwrap_or(0);
540
541                trace!(
542                    target: LOG_MODULE_WALLET,
543                    ?current_vote,
544                    ?block_count_vote,
545                    ?block_count,
546                    ?current_consensus_block_count,
547                    "Proposing block count"
548                );
549
550                WALLET_BLOCK_COUNT.set(i64::from(block_count_vote));
551                items.push(WalletConsensusItem::BlockCount(block_count_vote));
552            }
553            Err(err) => {
554                warn!(target: LOG_MODULE_WALLET, err = %err.fmt_compact_anyhow(), "Can't update block count");
555            }
556        }
557
558        let fee_rate_proposal = self.get_fee_rate_opt();
559
560        items.push(WalletConsensusItem::Feerate(fee_rate_proposal));
561
562        // Consensus upgrade activation voting
563        let manual_vote = dbtx
564            .get_value(&ConsensusVersionVotingActivationKey)
565            .await
566            .map(|()| {
567                // TODO: allow voting on any version between the currently active and max
568                // supported one in case we support a too high one already
569                MODULE_CONSENSUS_VERSION
570            });
571
572        let active_consensus_version = self.consensus_module_consensus_version(dbtx).await;
573        let automatic_vote = self.peer_supported_consensus_version.borrow().and_then(
574            |supported_consensus_version| {
575                // Only automatically vote if the commonly supported version is higher than the
576                // currently active one
577                (active_consensus_version < supported_consensus_version)
578                    .then_some(supported_consensus_version)
579            },
580        );
581
582        // Prioritizing automatic vote for now since the manual vote never resets. Once
583        // that is fixed this should be switched around.
584        if let Some(vote_version) = automatic_vote.or(manual_vote) {
585            items.push(WalletConsensusItem::ModuleConsensusVersion(vote_version));
586        }
587
588        items
589    }
590
    /// Applies a single wallet consensus item submitted by `peer` to the
    /// database transaction.
    ///
    /// Handles block count votes, fee rate votes, peg-out signatures and
    /// module consensus version votes; see the comments on the individual
    /// match arms for the rules each of them enforces.
    ///
    /// # Errors
    ///
    /// Returns an error when the item is redundant (a vote that does not
    /// change anything, a signature for an already finalized transaction) or
    /// invalid (unknown unsigned transaction, invalid signature).
    ///
    /// # Panics
    ///
    /// Panics if the consensus block count would decrease, if the resulting
    /// module consensus version exceeds `MODULE_CONSENSUS_VERSION`, or if the
    /// item is of an unknown (`Default`) variant.
    async fn process_consensus_item<'a, 'b>(
        &'a self,
        dbtx: &mut DatabaseTransaction<'b>,
        consensus_item: WalletConsensusItem,
        peer: PeerId,
    ) -> anyhow::Result<()> {
        trace!(target: LOG_MODULE_WALLET, ?consensus_item, "Processing consensus item proposal");

        match consensus_item {
            WalletConsensusItem::BlockCount(block_count_vote) => {
                // A peer without a stored vote is treated as having voted 0.
                let current_vote = dbtx.get_value(&BlockCountVoteKey(peer)).await.unwrap_or(0);

                if block_count_vote < current_vote {
                    warn!(target: LOG_MODULE_WALLET, ?peer, ?block_count_vote, "Block count vote is outdated");
                }

                // Only strictly increasing votes are accepted; equal or lower
                // votes are rejected as redundant.
                ensure!(
                    block_count_vote > current_vote,
                    "Block count vote is redundant"
                );

                // Sample the consensus block count before and after storing the
                // vote to detect whether this vote moved it.
                let old_consensus_block_count = self.consensus_block_count(dbtx).await;

                dbtx.insert_entry(&BlockCountVoteKey(peer), &block_count_vote)
                    .await;

                let new_consensus_block_count = self.consensus_block_count(dbtx).await;

                debug!(
                    target: LOG_MODULE_WALLET,
                    ?peer,
                    ?current_vote,
                    ?block_count_vote,
                    ?old_consensus_block_count,
                    ?new_consensus_block_count,
                    "Received block count vote"
                );

                assert!(old_consensus_block_count <= new_consensus_block_count);

                if new_consensus_block_count != old_consensus_block_count {
                    // We do not sync blocks that predate the federation itself
                    if old_consensus_block_count != 0 {
                        self.sync_up_to_consensus_count(
                            dbtx,
                            old_consensus_block_count,
                            new_consensus_block_count,
                        )
                        .await;
                    } else {
                        info!(
                            target: LOG_MODULE_WALLET,
                            ?old_consensus_block_count,
                            ?new_consensus_block_count,
                            "Not syncing up to consensus block count because we are at block 0"
                        );
                    }
                }
            }
            WalletConsensusItem::Feerate(feerate) => {
                // The new vote is written first; the value returned by the
                // insert is compared against it, and an identical vote is
                // rejected as redundant.
                if Some(feerate) == dbtx.insert_entry(&FeeRateVoteKey(peer), &feerate).await {
                    bail!("Fee rate vote is redundant");
                }
            }
            WalletConsensusItem::PegOutSignature(peg_out_signature) => {
                let txid = peg_out_signature.txid;

                // A pending entry means the transaction was already finalized,
                // so further signatures are not needed.
                if dbtx.get_value(&PendingTransactionKey(txid)).await.is_some() {
                    bail!("Already received a threshold of valid signatures");
                }

                let mut unsigned = dbtx
                    .get_value(&UnsignedTransactionKey(txid))
                    .await
                    .context("Unsigned transaction does not exist")?;

                self.sign_peg_out_psbt(&mut unsigned.psbt, peer, &peg_out_signature)
                    .context("Peg out signature is invalid")?;

                // Persist the PSBT including the newly added signature.
                dbtx.insert_entry(&UnsignedTransactionKey(txid), &unsigned)
                    .await;

                if let Ok(pending_tx) = self.finalize_peg_out_psbt(unsigned) {
                    // We were able to finalize the transaction, so we will delete the
                    // PSBT and instead keep the extracted tx for periodic transmission
                    // as well as to accept the change into our wallet eventually once
                    // it confirms.
                    dbtx.insert_new_entry(&PendingTransactionKey(txid), &pending_tx)
                        .await;

                    dbtx.remove_entry(&PegOutTxSignatureCI(txid)).await;
                    dbtx.remove_entry(&UnsignedTransactionKey(txid)).await;
                    // Signal `broadcast_pending` only once the database
                    // transaction has been committed.
                    let broadcast_pending = self.broadcast_pending.clone();
                    dbtx.on_commit(move || {
                        broadcast_pending.notify_one();
                    });
                }
            }
            WalletConsensusItem::ModuleConsensusVersion(module_consensus_version) => {
                // A peer without a stored vote is treated as having voted for
                // version 2.0.
                let current_vote = dbtx
                    .get_value(&ConsensusVersionVoteKey(peer))
                    .await
                    .unwrap_or(ModuleConsensusVersion::new(2, 0));

                // Only strictly higher versions are accepted.
                ensure!(
                    module_consensus_version > current_vote,
                    "Module consensus version vote is redundant"
                );

                dbtx.insert_entry(&ConsensusVersionVoteKey(peer), &module_consensus_version)
                    .await;

                // After storing the vote, the resulting consensus version must
                // still be one this build supports.
                assert!(
                    self.consensus_module_consensus_version(dbtx).await <= MODULE_CONSENSUS_VERSION,
                    "Wallet module does not support new consensus version, please upgrade the module"
                );
            }
            WalletConsensusItem::Default { variant, .. } => {
                panic!("Received wallet consensus item with unknown variant {variant}");
            }
        }

        Ok(())
    }
715
716    async fn process_input<'a, 'b, 'c>(
717        &'a self,
718        dbtx: &mut DatabaseTransaction<'c>,
719        input: &'b WalletInput,
720        _in_point: InPoint,
721    ) -> Result<InputMeta, WalletInputError> {
722        let (outpoint, tx_out, pub_key) = match input {
723            WalletInput::V0(input) => {
724                if !self.block_is_known(dbtx, input.proof_block()).await {
725                    return Err(WalletInputError::UnknownPegInProofBlock(
726                        input.proof_block(),
727                    ));
728                }
729
730                input.verify(&self.secp, &self.cfg.consensus.peg_in_descriptor)?;
731
732                debug!(target: LOG_MODULE_WALLET, outpoint = %input.outpoint(), "Claiming peg-in");
733
734                (input.0.outpoint(), input.tx_output(), input.tweak_key())
735            }
736            WalletInput::V1(input) => {
737                let input_tx_out = dbtx
738                    .get_value(&UnspentTxOutKey(input.outpoint))
739                    .await
740                    .ok_or(WalletInputError::UnknownUTXO)?;
741
742                if input_tx_out.script_pubkey
743                    != self
744                        .cfg
745                        .consensus
746                        .peg_in_descriptor
747                        .tweak(&input.tweak_key, secp256k1::SECP256K1)
748                        .script_pubkey()
749                {
750                    return Err(WalletInputError::WrongOutputScript);
751                }
752
753                // Verifying this is not strictly necessary for the server as the tx_out is only
754                // used in backup and recovery.
755                if input.tx_out != input_tx_out {
756                    return Err(WalletInputError::WrongTxOut);
757                }
758
759                (input.outpoint, input_tx_out, input.tweak_key)
760            }
761            WalletInput::Default { variant, .. } => {
762                return Err(WalletInputError::UnknownInputVariant(
763                    UnknownWalletInputVariantError { variant: *variant },
764                ));
765            }
766        };
767
768        if dbtx
769            .insert_entry(&ClaimedPegInOutpointKey(outpoint), &())
770            .await
771            .is_some()
772        {
773            return Err(WalletInputError::PegInAlreadyClaimed);
774        }
775
776        dbtx.insert_new_entry(
777            &UTXOKey(outpoint),
778            &SpendableUTXO {
779                tweak: pub_key.serialize(),
780                amount: tx_out.value,
781            },
782        )
783        .await;
784
785        let next_index = get_recovery_count(dbtx).await;
786        dbtx.insert_new_entry(
787            &RecoveryItemKey(next_index),
788            &RecoveryItem::Input {
789                outpoint,
790                script: tx_out.script_pubkey,
791            },
792        )
793        .await;
794
795        let amount = tx_out.value.into();
796
797        let fee = self.cfg.consensus.fee_consensus.peg_in_abs;
798
799        calculate_pegin_metrics(dbtx, amount, fee);
800
801        Ok(InputMeta {
802            amount: TransactionItemAmounts {
803                amounts: Amounts::new_bitcoin(amount),
804                fees: Amounts::new_bitcoin(fee),
805            },
806            pub_key,
807        })
808    }
809
810    async fn process_output<'a, 'b>(
811        &'a self,
812        dbtx: &mut DatabaseTransaction<'b>,
813        output: &'a WalletOutput,
814        out_point: OutPoint,
815    ) -> Result<TransactionItemAmounts, WalletOutputError> {
816        let output = output.ensure_v0_ref()?;
817
818        // In 0.4.0 we began preventing RBF withdrawals. Once we reach EoL support
819        // for 0.4.0, we can safely remove RBF withdrawal logic.
820        // see: https://github.com/fedimint/fedimint/issues/5453
821        if let WalletOutputV0::Rbf(_) = output {
822            // This exists as an escape hatch for any federations that successfully
823            // processed an RBF withdrawal due to having a single UTXO owned by the
824            // federation. If a peer needs to resync the federation's history, they can
825            // enable this variable until they've successfully synced, then restart with
826            // this disabled.
827            if is_rbf_withdrawal_enabled() {
828                warn!(target: LOG_MODULE_WALLET, "processing rbf withdrawal");
829            } else {
830                return Err(DEPRECATED_RBF_ERROR);
831            }
832        }
833
834        let change_tweak = self.consensus_nonce(dbtx).await;
835
836        let mut tx = self.create_peg_out_tx(dbtx, output, &change_tweak).await?;
837
838        let fee_rate = self.consensus_fee_rate(dbtx).await;
839
840        StatelessWallet::validate_tx(&tx, output, fee_rate, self.cfg.consensus.network.0)?;
841
842        self.offline_wallet().sign_psbt(&mut tx.psbt);
843
844        let txid = tx.psbt.unsigned_tx.compute_txid();
845
846        info!(
847            target: LOG_MODULE_WALLET,
848            %txid,
849            "Signing peg out",
850        );
851
852        let sigs = tx
853            .psbt
854            .inputs
855            .iter_mut()
856            .map(|input| {
857                assert_eq!(
858                    input.partial_sigs.len(),
859                    1,
860                    "There was already more than one (our) or no signatures in input"
861                );
862
863                // TODO: don't put sig into PSBT in the first place
864                // We actually take out our own signature so everyone finalizes the tx in the
865                // same epoch.
866                let sig = std::mem::take(&mut input.partial_sigs)
867                    .into_values()
868                    .next()
869                    .expect("asserted previously");
870
871                // We drop SIGHASH_ALL, because we always use that and it is only present in the
872                // PSBT for compatibility with other tools.
873                secp256k1::ecdsa::Signature::from_der(&sig.to_vec()[..sig.to_vec().len() - 1])
874                    .expect("we serialized it ourselves that way")
875            })
876            .collect::<Vec<_>>();
877
878        // Delete used UTXOs
879        for input in &tx.psbt.unsigned_tx.input {
880            dbtx.remove_entry(&UTXOKey(input.previous_output)).await;
881        }
882
883        dbtx.insert_new_entry(&UnsignedTransactionKey(txid), &tx)
884            .await;
885
886        dbtx.insert_new_entry(&PegOutTxSignatureCI(txid), &sigs)
887            .await;
888
889        dbtx.insert_new_entry(
890            &PegOutBitcoinTransaction(out_point),
891            &WalletOutputOutcome::new_v0(txid),
892        )
893        .await;
894        let amount: fedimint_core::Amount = output.amount().into();
895        let fee = self.cfg.consensus.fee_consensus.peg_out_abs;
896        calculate_pegout_metrics(dbtx, amount, fee);
897        Ok(TransactionItemAmounts {
898            amounts: Amounts::new_bitcoin(amount),
899            fees: Amounts::new_bitcoin(fee),
900        })
901    }
902
903    async fn output_status(
904        &self,
905        dbtx: &mut DatabaseTransaction<'_>,
906        out_point: OutPoint,
907    ) -> Option<WalletOutputOutcome> {
908        dbtx.get_value(&PegOutBitcoinTransaction(out_point)).await
909    }
910
911    async fn audit(
912        &self,
913        dbtx: &mut DatabaseTransaction<'_>,
914        audit: &mut Audit,
915        module_instance_id: ModuleInstanceId,
916    ) {
917        audit
918            .add_items(dbtx, module_instance_id, &UTXOPrefixKey, |_, v| {
919                v.amount.to_sat() as i64 * 1000
920            })
921            .await;
922        audit
923            .add_items(
924                dbtx,
925                module_instance_id,
926                &UnsignedTransactionPrefixKey,
927                |_, v| match v.rbf {
928                    None => v.change.to_sat() as i64 * 1000,
929                    Some(rbf) => rbf.fees.amount().to_sat() as i64 * -1000,
930                },
931            )
932            .await;
933        audit
934            .add_items(
935                dbtx,
936                module_instance_id,
937                &PendingTransactionPrefixKey,
938                |_, v| match v.rbf {
939                    None => v.change.to_sat() as i64 * 1000,
940                    Some(rbf) => rbf.fees.amount().to_sat() as i64 * -1000,
941                },
942            )
943            .await;
944    }
945
946    fn api_endpoints(&self) -> Vec<ApiEndpoint<Self>> {
947        vec![
948            api_endpoint! {
949                BLOCK_COUNT_ENDPOINT,
950                ApiVersion::new(0, 0),
951                async |module: &Wallet, context, _params: ()| -> u32 {
952                    let db = context.db();
953                    let mut dbtx = db.begin_transaction_nc().await;
954                    Ok(module.consensus_block_count(&mut dbtx).await)
955                }
956            },
957            api_endpoint! {
958                BLOCK_COUNT_LOCAL_ENDPOINT,
959                ApiVersion::new(0, 0),
960                async |module: &Wallet, _context, _params: ()| -> Option<u32> {
961                    Ok(module.get_block_count().ok())
962                }
963            },
964            api_endpoint! {
965                PEG_OUT_FEES_ENDPOINT,
966                ApiVersion::new(0, 0),
967                async |module: &Wallet, context, params: (Address<NetworkUnchecked>, u64)| -> Option<PegOutFees> {
968                    let (address, sats) = params;
969                    let db = context.db();
970                    let mut dbtx = db.begin_transaction_nc().await;
971                    let feerate = module.consensus_fee_rate(&mut dbtx).await;
972
973                    // Since we are only calculating the tx size we can use an arbitrary dummy nonce.
974                    let dummy_tweak = [0; 33];
975
976                    let tx = module.offline_wallet().create_tx(
977                        bitcoin::Amount::from_sat(sats),
978                        // Note: While calling `assume_checked()` is generally unwise, it's fine
979                        // here since we're only returning a fee estimate, and we would still
980                        // reject a transaction with the wrong network upon attempted peg-out.
981                        address.assume_checked().script_pubkey(),
982                        vec![],
983                        module.available_utxos(&mut dbtx).await,
984                        feerate,
985                        &dummy_tweak,
986                        None
987                    );
988
989                    match tx {
990                        Err(error) => {
991                            // Usually from not enough spendable UTXOs
992                            warn!(target: LOG_MODULE_WALLET, "Error returning peg-out fees {error}");
993                            Ok(None)
994                        }
995                        Ok(tx) => Ok(Some(tx.fees))
996                    }
997                }
998            },
999            api_endpoint! {
1000                BITCOIN_KIND_ENDPOINT,
1001                ApiVersion::new(0, 1),
1002                async |module: &Wallet, _context, _params: ()| -> String {
1003                    Ok(module.btc_rpc.get_bitcoin_rpc_config().kind)
1004                }
1005            },
1006            api_endpoint! {
1007                BITCOIN_RPC_CONFIG_ENDPOINT,
1008                ApiVersion::new(0, 1),
1009                async |module: &Wallet, context, _params: ()| -> BitcoinRpcConfig {
1010                    check_auth(context)?;
1011                    let config = module.btc_rpc.get_bitcoin_rpc_config();
1012
1013                    // we need to remove auth, otherwise we'll send over the wire
1014                    let without_auth = config.url.clone().without_auth().map_err(|()| {
1015                        ApiError::server_error("Unable to remove auth from bitcoin config URL".to_string())
1016                    })?;
1017
1018                    Ok(BitcoinRpcConfig {
1019                        url: without_auth,
1020                        ..config
1021                    })
1022                }
1023            },
1024            api_endpoint! {
1025                WALLET_SUMMARY_ENDPOINT,
1026                ApiVersion::new(0, 1),
1027                async |module: &Wallet, context, _params: ()| -> WalletSummary {
1028                    let db = context.db();
1029                    let mut dbtx = db.begin_transaction_nc().await;
1030                    Ok(module.get_wallet_summary(&mut dbtx).await)
1031                }
1032            },
1033            api_endpoint! {
1034                MODULE_CONSENSUS_VERSION_ENDPOINT,
1035                ApiVersion::new(0, 2),
1036                async |module: &Wallet, context, _params: ()| -> ModuleConsensusVersion {
1037                    let db = context.db();
1038                    let mut dbtx = db.begin_transaction_nc().await;
1039                    Ok(module.consensus_module_consensus_version(&mut dbtx).await)
1040                }
1041            },
1042            api_endpoint! {
1043                SUPPORTED_MODULE_CONSENSUS_VERSION_ENDPOINT,
1044                ApiVersion::new(0, 2),
1045                async |_module: &Wallet, _context, _params: ()| -> ModuleConsensusVersion {
1046                    Ok(MODULE_CONSENSUS_VERSION)
1047                }
1048            },
1049            api_endpoint! {
1050                ACTIVATE_CONSENSUS_VERSION_VOTING_ENDPOINT,
1051                ApiVersion::new(0, 2),
1052                async |_module: &Wallet, context, _params: ()| -> () {
1053                    check_auth(context)?;
1054
1055                    let db = context.db();
1056                    let mut dbtx = db.begin_transaction().await;
1057                    dbtx.to_ref().insert_entry(&ConsensusVersionVotingActivationKey, &()).await;
1058                    dbtx.commit_tx_result().await?;
1059                    Ok(())
1060                }
1061            },
1062            api_endpoint! {
1063                UTXO_CONFIRMED_ENDPOINT,
1064                ApiVersion::new(0, 2),
1065                async |module: &Wallet, context, outpoint: bitcoin::OutPoint| -> bool {
1066                    let db = context.db();
1067                    let mut dbtx = db.begin_transaction_nc().await;
1068                    Ok(module.is_utxo_confirmed(&mut dbtx, outpoint).await)
1069                }
1070            },
1071            api_endpoint! {
1072                RECOVERY_COUNT_ENDPOINT,
1073                ApiVersion::new(0, 1),
1074                async |_module: &Wallet, context, _params: ()| -> u64 {
1075                    let db = context.db();
1076                    let mut dbtx = db.begin_transaction_nc().await;
1077                    Ok(get_recovery_count(&mut dbtx).await)
1078                }
1079            },
1080            api_endpoint! {
1081                RECOVERY_SLICE_ENDPOINT,
1082                ApiVersion::new(0, 1),
1083                async |_module: &Wallet, context, range: (u64, u64)| -> Vec<RecoveryItem> {
1084                    let db = context.db();
1085                    let mut dbtx = db.begin_transaction_nc().await;
1086                    Ok(get_recovery_slice(&mut dbtx, range).await)
1087                }
1088            },
1089        ]
1090    }
1091}
1092
1093async fn get_recovery_count(dbtx: &mut DatabaseTransaction<'_>) -> u64 {
1094    dbtx.find_by_prefix_sorted_descending(&RecoveryItemKeyPrefix)
1095        .await
1096        .next()
1097        .await
1098        .map_or(0, |entry| entry.0.0 + 1)
1099}
1100
1101async fn get_recovery_slice(
1102    dbtx: &mut DatabaseTransaction<'_>,
1103    range: (u64, u64),
1104) -> Vec<RecoveryItem> {
1105    dbtx.find_by_range(RecoveryItemKey(range.0)..RecoveryItemKey(range.1))
1106        .await
1107        .map(|entry| entry.1)
1108        .collect()
1109        .await
1110}
1111
1112fn calculate_pegin_metrics(
1113    dbtx: &mut DatabaseTransaction<'_>,
1114    amount: fedimint_core::Amount,
1115    fee: fedimint_core::Amount,
1116) {
1117    dbtx.on_commit(move || {
1118        WALLET_INOUT_SATS
1119            .with_label_values(&["incoming"])
1120            .observe(amount.sats_f64());
1121        WALLET_INOUT_FEES_SATS
1122            .with_label_values(&["incoming"])
1123            .observe(fee.sats_f64());
1124        WALLET_PEGIN_SATS.observe(amount.sats_f64());
1125        WALLET_PEGIN_FEES_SATS.observe(fee.sats_f64());
1126    });
1127}
1128
1129fn calculate_pegout_metrics(
1130    dbtx: &mut DatabaseTransaction<'_>,
1131    amount: fedimint_core::Amount,
1132    fee: fedimint_core::Amount,
1133) {
1134    dbtx.on_commit(move || {
1135        WALLET_INOUT_SATS
1136            .with_label_values(&["outgoing"])
1137            .observe(amount.sats_f64());
1138        WALLET_INOUT_FEES_SATS
1139            .with_label_values(&["outgoing"])
1140            .observe(fee.sats_f64());
1141        WALLET_PEGOUT_SATS.observe(amount.sats_f64());
1142        WALLET_PEGOUT_FEES_SATS.observe(fee.sats_f64());
1143    });
1144}
1145
/// Server-side state of the wallet module.
#[derive(Debug)]
pub struct Wallet {
    /// Module configuration. Its consensus part supplies the peg-in
    /// descriptor, the per-peer peg-in keys, the default fee rate, the fee
    /// schedule and the expected bitcoin network.
    cfg: WalletConfig,
    /// Database handle, cloned from the one passed to `Wallet::new`.
    db: Database,
    /// Secp256k1 context used for tweaking keys and verifying signatures.
    secp: Secp256k1<All>,
    /// Bitcoin RPC monitor used to read the backend status and to fetch block
    /// hashes and blocks.
    btc_rpc: ServerBitcoinRpcMonitor,
    /// Our own peer id, as passed to `Wallet::new`.
    our_peer_id: PeerId,
    /// Broadcasting pending txes can be triggered immediately with this
    broadcast_pending: Arc<Notify>,
    /// Task group, cloned from the one passed to `Wallet::new`.
    task_group: TaskGroup,
    /// Maximum consensus version supported by *all* our peers. Used to
    /// automatically activate new consensus versions as soon as everyone
    /// upgrades.
    peer_supported_consensus_version: watch::Receiver<Option<ModuleConsensusVersion>>,
}
1161
1162impl Wallet {
1163    pub async fn new(
1164        cfg: WalletConfig,
1165        db: &Database,
1166        task_group: &TaskGroup,
1167        our_peer_id: PeerId,
1168        module_api: DynModuleApi,
1169        server_bitcoin_rpc_monitor: ServerBitcoinRpcMonitor,
1170    ) -> anyhow::Result<Wallet> {
1171        let broadcast_pending = Arc::new(Notify::new());
1172        Self::spawn_broadcast_pending_task(
1173            task_group,
1174            &server_bitcoin_rpc_monitor,
1175            db,
1176            broadcast_pending.clone(),
1177        );
1178
1179        let peer_supported_consensus_version =
1180            Self::spawn_peer_supported_consensus_version_task(module_api, task_group, our_peer_id);
1181
1182        let status = retry("verify network", backoff_util::aggressive_backoff(), || {
1183            std::future::ready(
1184                server_bitcoin_rpc_monitor
1185                    .status()
1186                    .context("No connection to bitcoin rpc"),
1187            )
1188        })
1189        .await?;
1190
1191        ensure!(status.network == cfg.consensus.network.0, "Wrong Network");
1192
1193        let wallet = Wallet {
1194            cfg,
1195            db: db.clone(),
1196            secp: Default::default(),
1197            btc_rpc: server_bitcoin_rpc_monitor,
1198            our_peer_id,
1199            task_group: task_group.clone(),
1200            peer_supported_consensus_version,
1201            broadcast_pending,
1202        };
1203
1204        Ok(wallet)
1205    }
1206
1207    /// Try to attach signatures to a pending peg-out tx.
1208    fn sign_peg_out_psbt(
1209        &self,
1210        psbt: &mut Psbt,
1211        peer: PeerId,
1212        signature: &PegOutSignatureItem,
1213    ) -> Result<(), ProcessPegOutSigError> {
1214        let peer_key = self
1215            .cfg
1216            .consensus
1217            .peer_peg_in_keys
1218            .get(&peer)
1219            .expect("always called with valid peer id");
1220
1221        if psbt.inputs.len() != signature.signature.len() {
1222            return Err(ProcessPegOutSigError::WrongSignatureCount(
1223                psbt.inputs.len(),
1224                signature.signature.len(),
1225            ));
1226        }
1227
1228        let mut tx_hasher = SighashCache::new(&psbt.unsigned_tx);
1229        for (idx, (input, signature)) in psbt
1230            .inputs
1231            .iter_mut()
1232            .zip(signature.signature.iter())
1233            .enumerate()
1234        {
1235            let tx_hash = tx_hasher
1236                .p2wsh_signature_hash(
1237                    idx,
1238                    input
1239                        .witness_script
1240                        .as_ref()
1241                        .expect("Missing witness script"),
1242                    input.witness_utxo.as_ref().expect("Missing UTXO").value,
1243                    EcdsaSighashType::All,
1244                )
1245                .map_err(|_| ProcessPegOutSigError::SighashError)?;
1246
1247            let tweak = input
1248                .proprietary
1249                .get(&proprietary_tweak_key())
1250                .expect("we saved it with a tweak");
1251
1252            let tweaked_peer_key = peer_key.tweak(tweak, &self.secp);
1253            self.secp
1254                .verify_ecdsa(
1255                    &Message::from_digest_slice(&tx_hash[..]).unwrap(),
1256                    signature,
1257                    &tweaked_peer_key.key,
1258                )
1259                .map_err(|_| ProcessPegOutSigError::InvalidSignature)?;
1260
1261            if input
1262                .partial_sigs
1263                .insert(tweaked_peer_key.into(), EcdsaSig::sighash_all(*signature))
1264                .is_some()
1265            {
1266                // Should never happen since peers only sign a PSBT once
1267                return Err(ProcessPegOutSigError::DuplicateSignature);
1268            }
1269        }
1270        Ok(())
1271    }
1272
1273    fn finalize_peg_out_psbt(
1274        &self,
1275        mut unsigned: UnsignedTransaction,
1276    ) -> Result<PendingTransaction, ProcessPegOutSigError> {
1277        // We need to save the change output's tweak key to be able to access the funds
1278        // later on. The tweak is extracted here because the psbt is moved next
1279        // and not available anymore when the tweak is actually needed in the
1280        // end to be put into the batch on success.
1281        let change_tweak: [u8; 33] = unsigned
1282            .psbt
1283            .outputs
1284            .iter()
1285            .find_map(|output| output.proprietary.get(&proprietary_tweak_key()).cloned())
1286            .ok_or(ProcessPegOutSigError::MissingOrMalformedChangeTweak)?
1287            .try_into()
1288            .map_err(|_| ProcessPegOutSigError::MissingOrMalformedChangeTweak)?;
1289
1290        if let Err(error) = unsigned.psbt.finalize_mut(&self.secp) {
1291            return Err(ProcessPegOutSigError::ErrorFinalizingPsbt(error));
1292        }
1293
1294        let tx = unsigned.psbt.clone().extract_tx_unchecked_fee_rate();
1295
1296        Ok(PendingTransaction {
1297            tx,
1298            tweak: change_tweak,
1299            change: unsigned.change,
1300            destination: unsigned.destination,
1301            fees: unsigned.fees,
1302            selected_utxos: unsigned.selected_utxos,
1303            peg_out_amount: unsigned.peg_out_amount,
1304            rbf: unsigned.rbf,
1305        })
1306    }
1307
1308    fn get_block_count(&self) -> anyhow::Result<u32> {
1309        self.btc_rpc
1310            .status()
1311            .context("No bitcoin rpc connection")
1312            .and_then(|status| {
1313                status
1314                    .block_count
1315                    .try_into()
1316                    .map_err(|_| format_err!("Block count exceeds u32 limits"))
1317            })
1318    }
1319
1320    pub fn get_fee_rate_opt(&self) -> Feerate {
1321        // `get_feerate_multiplier` is clamped and can't be negative
1322        // feerate sources as clamped and can't be negative or too large
1323        #[allow(clippy::cast_precision_loss)]
1324        #[allow(clippy::cast_sign_loss)]
1325        Feerate {
1326            sats_per_kvb: ((self
1327                .btc_rpc
1328                .status()
1329                .map_or(self.cfg.consensus.default_fee, |status| status.fee_rate)
1330                .sats_per_kvb as f64
1331                * get_feerate_multiplier())
1332            .round()) as u64,
1333        }
1334    }
1335
1336    pub async fn consensus_block_count(&self, dbtx: &mut DatabaseTransaction<'_>) -> u32 {
1337        let peer_count = self.cfg.consensus.peer_peg_in_keys.to_num_peers().total();
1338
1339        let mut counts = dbtx
1340            .find_by_prefix(&BlockCountVotePrefix)
1341            .await
1342            .map(|entry| entry.1)
1343            .collect::<Vec<u32>>()
1344            .await;
1345
1346        assert!(counts.len() <= peer_count);
1347
1348        while counts.len() < peer_count {
1349            counts.push(0);
1350        }
1351
1352        counts.sort_unstable();
1353
1354        counts[peer_count / 2]
1355    }
1356
1357    pub async fn consensus_fee_rate(&self, dbtx: &mut DatabaseTransaction<'_>) -> Feerate {
1358        let peer_count = self.cfg.consensus.peer_peg_in_keys.to_num_peers().total();
1359
1360        let mut rates = dbtx
1361            .find_by_prefix(&FeeRateVotePrefix)
1362            .await
1363            .map(|(.., rate)| rate)
1364            .collect::<Vec<_>>()
1365            .await;
1366
1367        assert!(rates.len() <= peer_count);
1368
1369        while rates.len() < peer_count {
1370            rates.push(self.cfg.consensus.default_fee);
1371        }
1372
1373        rates.sort_unstable();
1374
1375        rates[peer_count / 2]
1376    }
1377
1378    async fn consensus_module_consensus_version(
1379        &self,
1380        dbtx: &mut DatabaseTransaction<'_>,
1381    ) -> ModuleConsensusVersion {
1382        let num_peers = self.cfg.consensus.peer_peg_in_keys.to_num_peers();
1383
1384        let mut versions = dbtx
1385            .find_by_prefix(&ConsensusVersionVotePrefix)
1386            .await
1387            .map(|entry| entry.1)
1388            .collect::<Vec<ModuleConsensusVersion>>()
1389            .await;
1390
1391        while versions.len() < num_peers.total() {
1392            versions.push(ModuleConsensusVersion::new(2, 0));
1393        }
1394
1395        assert_eq!(versions.len(), num_peers.total());
1396
1397        versions.sort_unstable();
1398
1399        assert!(versions.first() <= versions.last());
1400
1401        versions[num_peers.max_evil()]
1402    }
1403
1404    pub async fn consensus_nonce(&self, dbtx: &mut DatabaseTransaction<'_>) -> [u8; 33] {
1405        let nonce_idx = dbtx.get_value(&PegOutNonceKey).await.unwrap_or(0);
1406        dbtx.insert_entry(&PegOutNonceKey, &(nonce_idx + 1)).await;
1407
1408        nonce_from_idx(nonce_idx)
1409    }
1410
1411    async fn sync_up_to_consensus_count(
1412        &self,
1413        dbtx: &mut DatabaseTransaction<'_>,
1414        old_count: u32,
1415        new_count: u32,
1416    ) {
1417        info!(
1418            target: LOG_MODULE_WALLET,
1419            old_count,
1420            new_count,
1421            blocks_to_go = new_count - old_count,
1422            "New block count consensus, initiating sync",
1423        );
1424
1425        // Before we can safely call our bitcoin backend to process the new consensus
1426        // count, we need to ensure we observed enough confirmations
1427        self.wait_for_finality_confs_or_shutdown(new_count).await;
1428
1429        for height in old_count..new_count {
1430            info!(
1431                target: LOG_MODULE_WALLET,
1432                height,
1433                "Processing block of height {height}",
1434            );
1435
1436            // TODO: use batching for mainnet syncing
1437            trace!(block = height, "Fetching block hash");
1438            let block_hash = retry("get_block_hash", backoff_util::background_backoff(), || {
1439                self.btc_rpc.get_block_hash(u64::from(height)) // TODO: use u64 for height everywhere
1440            })
1441            .await
1442            .expect("bitcoind rpc to get block hash");
1443
1444            let block = retry("get_block", backoff_util::background_backoff(), || {
1445                self.btc_rpc.get_block(&block_hash)
1446            })
1447            .await
1448            .expect("bitcoind rpc to get block");
1449
1450            if let Some(prev_block_height) = height.checked_sub(1) {
1451                if let Some(hash) = dbtx
1452                    .get_value(&BlockHashByHeightKey(prev_block_height))
1453                    .await
1454                {
1455                    assert_eq!(block.header.prev_blockhash, hash.0);
1456                } else {
1457                    warn!(
1458                        target: LOG_MODULE_WALLET,
1459                        %height,
1460                        %block_hash,
1461                        %prev_block_height,
1462                        prev_blockhash = %block.header.prev_blockhash,
1463                        "Missing previous block hash. This should only happen on the first processed block height."
1464                    );
1465                }
1466            }
1467
1468            if self.consensus_module_consensus_version(dbtx).await
1469                >= ModuleConsensusVersion::new(2, 2)
1470            {
1471                for transaction in &block.txdata {
1472                    // We maintain the subset of unspent P2WSH transaction outputs created
1473                    // since the module was running on the new consensus version, which might be
1474                    // the same time as the genesis session.
1475
1476                    for tx_in in &transaction.input {
1477                        dbtx.remove_entry(&UnspentTxOutKey(tx_in.previous_output))
1478                            .await;
1479                    }
1480
1481                    for (vout, tx_out) in transaction.output.iter().enumerate() {
1482                        let should_track_utxo = if self.cfg.consensus.peer_peg_in_keys.len() > 1 {
1483                            tx_out.script_pubkey.is_p2wsh()
1484                        } else {
1485                            tx_out.script_pubkey.is_p2wpkh()
1486                        };
1487
1488                        if should_track_utxo {
1489                            let outpoint = bitcoin::OutPoint {
1490                                txid: transaction.compute_txid(),
1491                                vout: vout as u32,
1492                            };
1493
1494                            dbtx.insert_new_entry(&UnspentTxOutKey(outpoint), tx_out)
1495                                .await;
1496                        }
1497                    }
1498                }
1499            }
1500
1501            let pending_transactions = dbtx
1502                .find_by_prefix(&PendingTransactionPrefixKey)
1503                .await
1504                .map(|(key, transaction)| (key.0, transaction))
1505                .collect::<HashMap<Txid, PendingTransaction>>()
1506                .await;
1507            let pending_transactions_len = pending_transactions.len();
1508
1509            debug!(
1510                target: LOG_MODULE_WALLET,
1511                ?height,
1512                ?pending_transactions_len,
1513                "Recognizing change UTXOs"
1514            );
1515            for (txid, tx) in &pending_transactions {
1516                let is_tx_in_block = block.txdata.iter().any(|tx| tx.compute_txid() == *txid);
1517
1518                if is_tx_in_block {
1519                    debug!(
1520                        target: LOG_MODULE_WALLET,
1521                        ?txid, ?height, ?block_hash, "Recognizing change UTXO"
1522                    );
1523                    self.recognize_change_utxo(dbtx, tx).await;
1524                } else {
1525                    debug!(
1526                        target: LOG_MODULE_WALLET,
1527                        ?txid,
1528                        ?height,
1529                        ?block_hash,
1530                        "Pending transaction not yet confirmed in this block"
1531                    );
1532                }
1533            }
1534
1535            dbtx.insert_new_entry(&BlockHashKey(block_hash), &()).await;
1536            dbtx.insert_new_entry(
1537                &BlockHashByHeightKey(height),
1538                &BlockHashByHeightValue(block_hash),
1539            )
1540            .await;
1541
1542            info!(
1543                target: LOG_MODULE_WALLET,
1544                height,
1545                ?block_hash,
1546                "Successfully processed block of height {height}",
1547            );
1548        }
1549    }
1550
1551    /// Add a change UTXO to our spendable UTXO database after it was included
1552    /// in a block that we got consensus on.
1553    async fn recognize_change_utxo(
1554        &self,
1555        dbtx: &mut DatabaseTransaction<'_>,
1556        pending_tx: &PendingTransaction,
1557    ) {
1558        self.remove_rbf_transactions(dbtx, pending_tx).await;
1559
1560        let script_pk = self
1561            .cfg
1562            .consensus
1563            .peg_in_descriptor
1564            .tweak(&pending_tx.tweak, &self.secp)
1565            .script_pubkey();
1566        for (idx, output) in pending_tx.tx.output.iter().enumerate() {
1567            if output.script_pubkey == script_pk {
1568                dbtx.insert_entry(
1569                    &UTXOKey(bitcoin::OutPoint {
1570                        txid: pending_tx.tx.compute_txid(),
1571                        vout: idx as u32,
1572                    }),
1573                    &SpendableUTXO {
1574                        tweak: pending_tx.tweak,
1575                        amount: output.value,
1576                    },
1577                )
1578                .await;
1579            }
1580        }
1581    }
1582
1583    /// Removes the `PendingTransaction` and any transactions tied to it via RBF
1584    async fn remove_rbf_transactions(
1585        &self,
1586        dbtx: &mut DatabaseTransaction<'_>,
1587        pending_tx: &PendingTransaction,
1588    ) {
1589        let mut all_transactions: BTreeMap<Txid, PendingTransaction> = dbtx
1590            .find_by_prefix(&PendingTransactionPrefixKey)
1591            .await
1592            .map(|(key, val)| (key.0, val))
1593            .collect::<BTreeMap<Txid, PendingTransaction>>()
1594            .await;
1595
1596        // We need to search and remove all `PendingTransactions` invalidated by RBF
1597        let mut pending_to_remove = vec![pending_tx.clone()];
1598        while let Some(removed) = pending_to_remove.pop() {
1599            all_transactions.remove(&removed.tx.compute_txid());
1600            dbtx.remove_entry(&PendingTransactionKey(removed.tx.compute_txid()))
1601                .await;
1602
1603            // Search for tx that this `removed` has as RBF
1604            if let Some(rbf) = &removed.rbf
1605                && let Some(tx) = all_transactions.get(&rbf.txid)
1606            {
1607                pending_to_remove.push(tx.clone());
1608            }
1609
1610            // Search for tx that wanted to RBF the `removed` one
1611            for tx in all_transactions.values() {
1612                if let Some(rbf) = &tx.rbf
1613                    && rbf.txid == removed.tx.compute_txid()
1614                {
1615                    pending_to_remove.push(tx.clone());
1616                }
1617            }
1618        }
1619    }
1620
1621    async fn block_is_known(
1622        &self,
1623        dbtx: &mut DatabaseTransaction<'_>,
1624        block_hash: BlockHash,
1625    ) -> bool {
1626        dbtx.get_value(&BlockHashKey(block_hash)).await.is_some()
1627    }
1628
1629    async fn create_peg_out_tx(
1630        &self,
1631        dbtx: &mut DatabaseTransaction<'_>,
1632        output: &WalletOutputV0,
1633        change_tweak: &[u8; 33],
1634    ) -> Result<UnsignedTransaction, WalletOutputError> {
1635        match output {
1636            WalletOutputV0::PegOut(peg_out) => self.offline_wallet().create_tx(
1637                peg_out.amount,
1638                // Note: While calling `assume_checked()` is generally unwise, checking the
1639                // network here could be a consensus-breaking change. Ignoring the network
1640                // is fine here since we validate it in `process_output()`.
1641                peg_out.recipient.clone().assume_checked().script_pubkey(),
1642                vec![],
1643                self.available_utxos(dbtx).await,
1644                peg_out.fees.fee_rate,
1645                change_tweak,
1646                None,
1647            ),
1648            WalletOutputV0::Rbf(rbf) => {
1649                let tx = dbtx
1650                    .get_value(&PendingTransactionKey(rbf.txid))
1651                    .await
1652                    .ok_or(WalletOutputError::RbfTransactionIdNotFound)?;
1653
1654                self.offline_wallet().create_tx(
1655                    tx.peg_out_amount,
1656                    tx.destination,
1657                    tx.selected_utxos,
1658                    self.available_utxos(dbtx).await,
1659                    tx.fees.fee_rate,
1660                    change_tweak,
1661                    Some(rbf.clone()),
1662                )
1663            }
1664        }
1665    }
1666
1667    async fn available_utxos(
1668        &self,
1669        dbtx: &mut DatabaseTransaction<'_>,
1670    ) -> Vec<(UTXOKey, SpendableUTXO)> {
1671        dbtx.find_by_prefix(&UTXOPrefixKey)
1672            .await
1673            .collect::<Vec<(UTXOKey, SpendableUTXO)>>()
1674            .await
1675    }
1676
1677    pub async fn get_wallet_value(&self, dbtx: &mut DatabaseTransaction<'_>) -> bitcoin::Amount {
1678        let sat_sum = self
1679            .available_utxos(dbtx)
1680            .await
1681            .into_iter()
1682            .map(|(_, utxo)| utxo.amount.to_sat())
1683            .sum();
1684        bitcoin::Amount::from_sat(sat_sum)
1685    }
1686
1687    async fn get_wallet_summary(&self, dbtx: &mut DatabaseTransaction<'_>) -> WalletSummary {
1688        fn partition_peg_out_and_change(
1689            transactions: Vec<Transaction>,
1690        ) -> (Vec<TxOutputSummary>, Vec<TxOutputSummary>) {
1691            let mut peg_out_txos: Vec<TxOutputSummary> = Vec::new();
1692            let mut change_utxos: Vec<TxOutputSummary> = Vec::new();
1693
1694            for tx in transactions {
1695                let txid = tx.compute_txid();
1696
1697                // to identify outputs for the peg_out (idx = 0) and change (idx = 1), we lean
1698                // on how the wallet constructs the transaction
1699                let peg_out_output = tx
1700                    .output
1701                    .first()
1702                    .expect("tx must contain withdrawal output");
1703
1704                let change_output = tx.output.last().expect("tx must contain change output");
1705
1706                peg_out_txos.push(TxOutputSummary {
1707                    outpoint: bitcoin::OutPoint { txid, vout: 0 },
1708                    amount: peg_out_output.value,
1709                });
1710
1711                change_utxos.push(TxOutputSummary {
1712                    outpoint: bitcoin::OutPoint { txid, vout: 1 },
1713                    amount: change_output.value,
1714                });
1715            }
1716
1717            (peg_out_txos, change_utxos)
1718        }
1719
1720        let spendable_utxos = self
1721            .available_utxos(dbtx)
1722            .await
1723            .iter()
1724            .map(|(utxo_key, spendable_utxo)| TxOutputSummary {
1725                outpoint: utxo_key.0,
1726                amount: spendable_utxo.amount,
1727            })
1728            .collect::<Vec<_>>();
1729
1730        // constructed peg-outs without threshold signatures
1731        let unsigned_transactions = dbtx
1732            .find_by_prefix(&UnsignedTransactionPrefixKey)
1733            .await
1734            .map(|(_tx_key, tx)| tx.psbt.unsigned_tx)
1735            .collect::<Vec<_>>()
1736            .await;
1737
1738        // peg-outs with threshold signatures, awaiting finality delay confirmations
1739        let unconfirmed_transactions = dbtx
1740            .find_by_prefix(&PendingTransactionPrefixKey)
1741            .await
1742            .map(|(_tx_key, tx)| tx.tx)
1743            .collect::<Vec<_>>()
1744            .await;
1745
1746        let (unsigned_peg_out_txos, unsigned_change_utxos) =
1747            partition_peg_out_and_change(unsigned_transactions);
1748
1749        let (unconfirmed_peg_out_txos, unconfirmed_change_utxos) =
1750            partition_peg_out_and_change(unconfirmed_transactions);
1751
1752        WalletSummary {
1753            spendable_utxos,
1754            unsigned_peg_out_txos,
1755            unsigned_change_utxos,
1756            unconfirmed_peg_out_txos,
1757            unconfirmed_change_utxos,
1758        }
1759    }
1760
1761    async fn is_utxo_confirmed(
1762        &self,
1763        dbtx: &mut DatabaseTransaction<'_>,
1764        outpoint: bitcoin::OutPoint,
1765    ) -> bool {
1766        dbtx.get_value(&UnspentTxOutKey(outpoint)).await.is_some()
1767    }
1768
1769    fn offline_wallet(&'_ self) -> StatelessWallet<'_> {
1770        StatelessWallet {
1771            descriptor: &self.cfg.consensus.peg_in_descriptor,
1772            secret_key: &self.cfg.private.peg_in_key,
1773            secp: &self.secp,
1774        }
1775    }
1776
1777    fn spawn_broadcast_pending_task(
1778        task_group: &TaskGroup,
1779        server_bitcoin_rpc_monitor: &ServerBitcoinRpcMonitor,
1780        db: &Database,
1781        broadcast_pending_notify: Arc<Notify>,
1782    ) {
1783        task_group.spawn_cancellable("broadcast pending", {
1784            let btc_rpc = server_bitcoin_rpc_monitor.clone();
1785            let db = db.clone();
1786            run_broadcast_pending_tx(db, btc_rpc, broadcast_pending_notify)
1787        });
1788    }
1789
1790    /// Get the bitcoin network for UI display
1791    pub fn network_ui(&self) -> Network {
1792        self.cfg.consensus.network.0
1793    }
1794
1795    /// Get the current consensus block count for UI display
1796    pub async fn consensus_block_count_ui(&self) -> u32 {
1797        self.consensus_block_count(&mut self.db.begin_transaction_nc().await)
1798            .await
1799    }
1800
1801    /// Get the current consensus fee rate for UI display
1802    pub async fn consensus_feerate_ui(&self) -> Feerate {
1803        self.consensus_fee_rate(&mut self.db.begin_transaction_nc().await)
1804            .await
1805    }
1806
1807    /// Get the current wallet summary for UI display
1808    pub async fn get_wallet_summary_ui(&self) -> WalletSummary {
1809        self.get_wallet_summary(&mut self.db.begin_transaction_nc().await)
1810            .await
1811    }
1812
1813    /// Shutdown the task group shared throughout fedimintd, giving 60 seconds
1814    /// for other services to gracefully shutdown.
1815    async fn graceful_shutdown(&self) {
1816        if let Err(e) = self
1817            .task_group
1818            .clone()
1819            .shutdown_join_all(Some(Duration::from_mins(1)))
1820            .await
1821        {
1822            panic!("Error while shutting down fedimintd task group: {e}");
1823        }
1824    }
1825
1826    /// Returns once our bitcoin backend observes finality delay confirmations
1827    /// of the consensus block count. If we don't observe enough confirmations
1828    /// after one hour, we gracefully shutdown fedimintd. This is necessary
1829    /// since we can no longer participate in consensus if our bitcoin backend
1830    /// is unable to observe the same chain tip as our peers.
1831    async fn wait_for_finality_confs_or_shutdown(&self, consensus_block_count: u32) {
1832        let backoff = if is_running_in_test_env() {
1833            // every 100ms for 60s
1834            backoff_util::custom_backoff(
1835                Duration::from_millis(100),
1836                Duration::from_millis(100),
1837                Some(10 * 60),
1838            )
1839        } else {
1840            // every max 10s for 1 hour
1841            backoff_util::fibonacci_max_one_hour()
1842        };
1843
1844        let wait_for_finality_confs = || async {
1845            let our_chain_tip_block_count = self.get_block_count()?;
1846            let consensus_chain_tip_block_count =
1847                consensus_block_count + self.cfg.consensus.finality_delay;
1848
1849            if consensus_chain_tip_block_count <= our_chain_tip_block_count {
1850                Ok(())
1851            } else {
1852                Err(anyhow::anyhow!("not enough confirmations"))
1853            }
1854        };
1855
1856        if retry("wait_for_finality_confs", backoff, wait_for_finality_confs)
1857            .await
1858            .is_err()
1859        {
1860            self.graceful_shutdown().await;
1861        }
1862    }
1863
1864    fn spawn_peer_supported_consensus_version_task(
1865        api_client: DynModuleApi,
1866        task_group: &TaskGroup,
1867        our_peer_id: PeerId,
1868    ) -> watch::Receiver<Option<ModuleConsensusVersion>> {
1869        let (sender, receiver) = watch::channel(None);
1870        task_group.spawn_cancellable("fetch-peer-consensus-versions", async move {
1871            loop {
1872                let request_futures = api_client.all_peers().iter().filter_map(|&peer| {
1873                    if peer == our_peer_id {
1874                        return None;
1875                    }
1876
1877                    let api_client_inner = api_client.clone();
1878                    Some(async move {
1879                        api_client_inner
1880                            .request_single_peer::<ModuleConsensusVersion>(
1881                                SUPPORTED_MODULE_CONSENSUS_VERSION_ENDPOINT.to_owned(),
1882                                ApiRequestErased::default(),
1883                                peer,
1884                            )
1885                            .await
1886                            .inspect(|res| debug!(
1887                                target: LOG_MODULE_WALLET,
1888                                %peer,
1889                                %our_peer_id,
1890                                ?res,
1891                                "Fetched supported module consensus version from peer"
1892                            ))
1893                            .inspect_err(|err| warn!(
1894                                target: LOG_MODULE_WALLET,
1895                                 %peer,
1896                                 err=%err.fmt_compact(),
1897                                "Failed to fetch consensus version from peer"
1898                            ))
1899                            .ok()
1900                    })
1901                });
1902
1903                let peer_consensus_versions = join_all(request_futures)
1904                    .await
1905                    .into_iter()
1906                    .flatten()
1907                    .collect::<Vec<_>>();
1908
1909                let sorted_consensus_versions = peer_consensus_versions
1910                    .into_iter()
1911                    .chain(std::iter::once(MODULE_CONSENSUS_VERSION))
1912                    .sorted()
1913                    .collect::<Vec<_>>();
1914                let all_peers_supported_version =
1915                    if sorted_consensus_versions.len() == api_client.all_peers().len() {
1916                        let min_supported_version = *sorted_consensus_versions
1917                            .first()
1918                            .expect("at least one element");
1919
1920                        debug!(
1921                            target: LOG_MODULE_WALLET,
1922                            ?sorted_consensus_versions,
1923                            "Fetched supported consensus versions from peers"
1924                        );
1925
1926                        Some(min_supported_version)
1927                    } else {
1928                        assert!(
1929                            sorted_consensus_versions.len() <= api_client.all_peers().len(),
1930                            "Too many peer responses",
1931                        );
1932                        trace!(
1933                            target: LOG_MODULE_WALLET,
1934                            ?sorted_consensus_versions,
1935                            "Not all peers have reported their consensus version yet"
1936                        );
1937                        None
1938                    };
1939
1940                #[allow(clippy::disallowed_methods)]
1941                if sender.send(all_peers_supported_version).is_err() {
1942                    warn!(target: LOG_MODULE_WALLET, "Failed to send consensus version to watch channel, stopping task");
1943                    break;
1944                }
1945
1946                if is_running_in_test_env() {
1947                    // Even in tests we don't want to spam the federation with requests about it
1948                    sleep(Duration::from_secs(5)).await;
1949                } else {
1950                    sleep(Duration::from_mins(10)).await;
1951                }
1952            }
1953        });
1954        receiver
1955    }
1956}
1957
1958#[instrument(target = LOG_MODULE_WALLET, level = "debug", skip_all)]
1959pub async fn run_broadcast_pending_tx(
1960    db: Database,
1961    rpc: ServerBitcoinRpcMonitor,
1962    broadcast: Arc<Notify>,
1963) {
1964    loop {
1965        // Unless something new happened, we broadcast once a minute
1966        let _ = tokio::time::timeout(Duration::from_mins(1), broadcast.notified()).await;
1967        broadcast_pending_tx(db.begin_transaction_nc().await, &rpc).await;
1968    }
1969}
1970
1971pub async fn broadcast_pending_tx(
1972    mut dbtx: DatabaseTransaction<'_>,
1973    rpc: &ServerBitcoinRpcMonitor,
1974) {
1975    let pending_tx: Vec<PendingTransaction> = dbtx
1976        .find_by_prefix(&PendingTransactionPrefixKey)
1977        .await
1978        .map(|(_, val)| val)
1979        .collect::<Vec<_>>()
1980        .await;
1981    let rbf_txids: BTreeSet<Txid> = pending_tx
1982        .iter()
1983        .filter_map(|tx| tx.rbf.clone().map(|rbf| rbf.txid))
1984        .collect();
1985    if !pending_tx.is_empty() {
1986        debug!(
1987            target: LOG_MODULE_WALLET,
1988            "Broadcasting pending transactions (total={}, rbf={})",
1989            pending_tx.len(),
1990            rbf_txids.len()
1991        );
1992    }
1993
1994    for PendingTransaction { tx, .. } in pending_tx {
1995        if !rbf_txids.contains(&tx.compute_txid()) {
1996            debug!(
1997                target: LOG_MODULE_WALLET,
1998                tx = %tx.compute_txid(),
1999                weight = tx.weight().to_wu(),
2000                output = ?tx.output,
2001                "Broadcasting peg-out",
2002            );
2003            trace!(transaction = ?tx);
2004            rpc.submit_transaction(tx).await;
2005        }
2006    }
2007}
2008
2009struct StatelessWallet<'a> {
2010    descriptor: &'a Descriptor<CompressedPublicKey>,
2011    secret_key: &'a secp256k1::SecretKey,
2012    secp: &'a secp256k1::Secp256k1<secp256k1::All>,
2013}
2014
2015impl StatelessWallet<'_> {
2016    /// Given a tx created from an `WalletOutput`, validate there will be no
2017    /// issues submitting the transaction to the Bitcoin network
2018    fn validate_tx(
2019        tx: &UnsignedTransaction,
2020        output: &WalletOutputV0,
2021        consensus_fee_rate: Feerate,
2022        network: Network,
2023    ) -> Result<(), WalletOutputError> {
2024        if let WalletOutputV0::PegOut(peg_out) = output
2025            && !peg_out.recipient.is_valid_for_network(network)
2026        {
2027            return Err(WalletOutputError::WrongNetwork(
2028                NetworkLegacyEncodingWrapper(network),
2029                NetworkLegacyEncodingWrapper(get_network_for_address(&peg_out.recipient)),
2030            ));
2031        }
2032
2033        // Validate the tx amount is over the dust limit
2034        if tx.peg_out_amount < tx.destination.minimal_non_dust() {
2035            return Err(WalletOutputError::PegOutUnderDustLimit);
2036        }
2037
2038        // Validate tx fee rate is above the consensus fee rate
2039        if tx.fees.fee_rate < consensus_fee_rate {
2040            return Err(WalletOutputError::PegOutFeeBelowConsensus(
2041                tx.fees.fee_rate,
2042                consensus_fee_rate,
2043            ));
2044        }
2045
2046        // Validate added fees are above the min relay tx fee
2047        // BIP-0125 requires 1 sat/vb for RBF by default (same as normal txs)
2048        let fees = match output {
2049            WalletOutputV0::PegOut(pegout) => pegout.fees,
2050            WalletOutputV0::Rbf(rbf) => rbf.fees,
2051        };
2052        if fees.fee_rate.sats_per_kvb < u64::from(DEFAULT_MIN_RELAY_TX_FEE) {
2053            return Err(WalletOutputError::BelowMinRelayFee);
2054        }
2055
2056        // Validate fees weight matches the actual weight
2057        if fees.total_weight != tx.fees.total_weight {
2058            return Err(WalletOutputError::TxWeightIncorrect(
2059                fees.total_weight,
2060                tx.fees.total_weight,
2061            ));
2062        }
2063
2064        Ok(())
2065    }
2066
2067    /// Attempts to create a tx ready to be signed from available UTXOs.
2068    //
2069    // * `peg_out_amount`: How much the peg-out should be
2070    // * `destination`: The address the user is pegging-out to
2071    // * `included_utxos`: UXTOs that must be included (for RBF)
2072    // * `remaining_utxos`: All other spendable UXTOs
2073    // * `fee_rate`: How much needs to be spent on fees
2074    // * `change_tweak`: How the federation can recognize it's change UTXO
2075    // * `rbf`: If this is an RBF transaction
2076    #[allow(clippy::too_many_arguments)]
2077    fn create_tx(
2078        &self,
2079        peg_out_amount: bitcoin::Amount,
2080        destination: ScriptBuf,
2081        mut included_utxos: Vec<(UTXOKey, SpendableUTXO)>,
2082        mut remaining_utxos: Vec<(UTXOKey, SpendableUTXO)>,
2083        mut fee_rate: Feerate,
2084        change_tweak: &[u8; 33],
2085        rbf: Option<Rbf>,
2086    ) -> Result<UnsignedTransaction, WalletOutputError> {
2087        // Add the rbf fees to the existing tx fees
2088        if let Some(rbf) = &rbf {
2089            fee_rate.sats_per_kvb += rbf.fees.fee_rate.sats_per_kvb;
2090        }
2091
2092        // When building a transaction we need to take care of two things:
2093        //  * We need enough input amount to fund all outputs
2094        //  * We need to keep an eye on the tx weight so we can factor the fees into out
2095        //    calculation
2096        // We then go on to calculate the base size of the transaction `total_weight`
2097        // and the maximum weight per added input which we will add every time
2098        // we select an input.
2099        let change_script = self.derive_script(change_tweak);
2100        let out_weight = (destination.len() * 4 + 1 + 32
2101            // Add change script weight, it's very likely to be needed if not we just overpay in fees
2102            + 1 // script len varint, 1 byte for all addresses we accept
2103            + change_script.len() * 4 // script len
2104            + 32) as u64; // value
2105        let mut total_weight = 16 + // version
2106            12 + // up to 2**16-1 inputs
2107            12 + // up to 2**16-1 outputs
2108            out_weight + // weight of all outputs
2109            16; // lock time
2110        // https://github.com/fedimint/fedimint/issues/4590
2111        #[allow(deprecated)]
2112        let max_input_weight = (self
2113            .descriptor
2114            .max_satisfaction_weight()
2115            .expect("is satisfyable") +
2116            128 + // TxOutHash
2117            16 + // TxOutIndex
2118            16) as u64; // sequence
2119
2120        // Ensure deterministic ordering of UTXOs for all peers
2121        included_utxos.sort_by_key(|(_, utxo)| utxo.amount);
2122        remaining_utxos.sort_by_key(|(_, utxo)| utxo.amount);
2123        included_utxos.extend(remaining_utxos);
2124
2125        // Finally we initialize our accumulator for selected input amounts
2126        let mut total_selected_value = bitcoin::Amount::from_sat(0);
2127        let mut selected_utxos: Vec<(UTXOKey, SpendableUTXO)> = vec![];
2128        let mut fees = fee_rate.calculate_fee(total_weight);
2129
2130        while total_selected_value < peg_out_amount + change_script.minimal_non_dust() + fees {
2131            match included_utxos.pop() {
2132                Some((utxo_key, utxo)) => {
2133                    total_selected_value += utxo.amount;
2134                    total_weight += max_input_weight;
2135                    fees = fee_rate.calculate_fee(total_weight);
2136                    selected_utxos.push((utxo_key, utxo));
2137                }
2138                _ => return Err(WalletOutputError::NotEnoughSpendableUTXO), // Not enough UTXOs
2139            }
2140        }
2141
2142        // We always pay ourselves change back to ensure that we don't lose anything due
2143        // to dust
2144        let change = total_selected_value - fees - peg_out_amount;
2145        let output: Vec<TxOut> = vec![
2146            TxOut {
2147                value: peg_out_amount,
2148                script_pubkey: destination.clone(),
2149            },
2150            TxOut {
2151                value: change,
2152                script_pubkey: change_script,
2153            },
2154        ];
2155        let mut change_out = bitcoin::psbt::Output::default();
2156        change_out
2157            .proprietary
2158            .insert(proprietary_tweak_key(), change_tweak.to_vec());
2159
2160        info!(
2161            target: LOG_MODULE_WALLET,
2162            inputs = selected_utxos.len(),
2163            input_sats = total_selected_value.to_sat(),
2164            peg_out_sats = peg_out_amount.to_sat(),
2165            ?total_weight,
2166            fees_sats = fees.to_sat(),
2167            fee_rate = fee_rate.sats_per_kvb,
2168            change_sats = change.to_sat(),
2169            "Creating peg-out tx",
2170        );
2171
2172        let transaction = Transaction {
2173            version: bitcoin::transaction::Version(2),
2174            lock_time: LockTime::ZERO,
2175            input: selected_utxos
2176                .iter()
2177                .map(|(utxo_key, _utxo)| TxIn {
2178                    previous_output: utxo_key.0,
2179                    script_sig: Default::default(),
2180                    sequence: Sequence::ENABLE_RBF_NO_LOCKTIME,
2181                    witness: bitcoin::Witness::new(),
2182                })
2183                .collect(),
2184            output,
2185        };
2186        info!(
2187            target: LOG_MODULE_WALLET,
2188            txid = %transaction.compute_txid(), "Creating peg-out tx"
2189        );
2190
2191        // FIXME: use custom data structure that guarantees more invariants and only
2192        // convert to PSBT for finalization
2193        let psbt = Psbt {
2194            unsigned_tx: transaction,
2195            version: 0,
2196            xpub: Default::default(),
2197            proprietary: Default::default(),
2198            unknown: Default::default(),
2199            inputs: selected_utxos
2200                .iter()
2201                .map(|(_utxo_key, utxo)| {
2202                    let script_pubkey = self
2203                        .descriptor
2204                        .tweak(&utxo.tweak, self.secp)
2205                        .script_pubkey();
2206                    Input {
2207                        non_witness_utxo: None,
2208                        witness_utxo: Some(TxOut {
2209                            value: utxo.amount,
2210                            script_pubkey,
2211                        }),
2212                        partial_sigs: Default::default(),
2213                        sighash_type: None,
2214                        redeem_script: None,
2215                        witness_script: Some(
2216                            self.descriptor
2217                                .tweak(&utxo.tweak, self.secp)
2218                                .script_code()
2219                                .expect("Failed to tweak descriptor"),
2220                        ),
2221                        bip32_derivation: Default::default(),
2222                        final_script_sig: None,
2223                        final_script_witness: None,
2224                        ripemd160_preimages: Default::default(),
2225                        sha256_preimages: Default::default(),
2226                        hash160_preimages: Default::default(),
2227                        hash256_preimages: Default::default(),
2228                        proprietary: vec![(proprietary_tweak_key(), utxo.tweak.to_vec())]
2229                            .into_iter()
2230                            .collect(),
2231                        tap_key_sig: Default::default(),
2232                        tap_script_sigs: Default::default(),
2233                        tap_scripts: Default::default(),
2234                        tap_key_origins: Default::default(),
2235                        tap_internal_key: Default::default(),
2236                        tap_merkle_root: Default::default(),
2237                        unknown: Default::default(),
2238                    }
2239                })
2240                .collect(),
2241            outputs: vec![Default::default(), change_out],
2242        };
2243
2244        Ok(UnsignedTransaction {
2245            psbt,
2246            signatures: vec![],
2247            change,
2248            fees: PegOutFees {
2249                fee_rate,
2250                total_weight,
2251            },
2252            destination,
2253            selected_utxos,
2254            peg_out_amount,
2255            rbf,
2256        })
2257    }
2258
2259    fn sign_psbt(&self, psbt: &mut Psbt) {
2260        let mut tx_hasher = SighashCache::new(&psbt.unsigned_tx);
2261
2262        for (idx, (psbt_input, _tx_input)) in psbt
2263            .inputs
2264            .iter_mut()
2265            .zip(psbt.unsigned_tx.input.iter())
2266            .enumerate()
2267        {
2268            let tweaked_secret = {
2269                let tweak = psbt_input
2270                    .proprietary
2271                    .get(&proprietary_tweak_key())
2272                    .expect("Malformed PSBT: expected tweak");
2273
2274                self.secret_key.tweak(tweak, self.secp)
2275            };
2276
2277            let tx_hash = tx_hasher
2278                .p2wsh_signature_hash(
2279                    idx,
2280                    psbt_input
2281                        .witness_script
2282                        .as_ref()
2283                        .expect("Missing witness script"),
2284                    psbt_input
2285                        .witness_utxo
2286                        .as_ref()
2287                        .expect("Missing UTXO")
2288                        .value,
2289                    EcdsaSighashType::All,
2290                )
2291                .expect("Failed to create segwit sighash");
2292
2293            let signature = self.secp.sign_ecdsa(
2294                &Message::from_digest_slice(&tx_hash[..]).unwrap(),
2295                &tweaked_secret,
2296            );
2297
2298            psbt_input.partial_sigs.insert(
2299                bitcoin::PublicKey {
2300                    compressed: true,
2301                    inner: secp256k1::PublicKey::from_secret_key(self.secp, &tweaked_secret),
2302                },
2303                EcdsaSig::sighash_all(signature),
2304            );
2305        }
2306    }
2307
2308    fn derive_script(&self, tweak: &[u8]) -> ScriptBuf {
2309        struct CompressedPublicKeyTranslator<'t, 's, Ctx: Verification> {
2310            tweak: &'t [u8],
2311            secp: &'s Secp256k1<Ctx>,
2312        }
2313
2314        impl<Ctx: Verification>
2315            miniscript::Translator<CompressedPublicKey, CompressedPublicKey, Infallible>
2316            for CompressedPublicKeyTranslator<'_, '_, Ctx>
2317        {
2318            fn pk(&mut self, pk: &CompressedPublicKey) -> Result<CompressedPublicKey, Infallible> {
2319                let hashed_tweak = {
2320                    let mut hasher = HmacEngine::<sha256::Hash>::new(&pk.key.serialize()[..]);
2321                    hasher.input(self.tweak);
2322                    Hmac::from_engine(hasher).to_byte_array()
2323                };
2324
2325                Ok(CompressedPublicKey {
2326                    key: pk
2327                        .key
2328                        .add_exp_tweak(
2329                            self.secp,
2330                            &Scalar::from_be_bytes(hashed_tweak).expect("can't fail"),
2331                        )
2332                        .expect("tweaking failed"),
2333                })
2334            }
2335            translate_hash_fail!(CompressedPublicKey, CompressedPublicKey, Infallible);
2336        }
2337
2338        let descriptor = self
2339            .descriptor
2340            .translate_pk(&mut CompressedPublicKeyTranslator {
2341                tweak,
2342                secp: self.secp,
2343            })
2344            .expect("can't fail");
2345
2346        descriptor.script_pubkey()
2347    }
2348}
2349
2350pub fn nonce_from_idx(nonce_idx: u64) -> [u8; 33] {
2351    let mut nonce: [u8; 33] = [0; 33];
2352    // Make it look like a compressed pubkey, has to be either 0x02 or 0x03
2353    nonce[0] = 0x02;
2354    nonce[1..].copy_from_slice(&nonce_idx.consensus_hash::<bitcoin::hashes::sha256::Hash>()[..]);
2355
2356    nonce
2357}
2358
2359/// A peg-out tx that is ready to be broadcast with a tweak for the change UTXO
2360#[derive(Clone, Debug, Encodable, Decodable)]
2361pub struct PendingTransaction {
2362    pub tx: bitcoin::Transaction,
2363    pub tweak: [u8; 33],
2364    pub change: bitcoin::Amount,
2365    pub destination: ScriptBuf,
2366    pub fees: PegOutFees,
2367    pub selected_utxos: Vec<(UTXOKey, SpendableUTXO)>,
2368    pub peg_out_amount: bitcoin::Amount,
2369    pub rbf: Option<Rbf>,
2370}
2371
2372impl Serialize for PendingTransaction {
2373    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2374    where
2375        S: serde::Serializer,
2376    {
2377        if serializer.is_human_readable() {
2378            serializer.serialize_str(&self.consensus_encode_to_hex())
2379        } else {
2380            serializer.serialize_bytes(&self.consensus_encode_to_vec())
2381        }
2382    }
2383}
2384
2385/// A PSBT that is awaiting enough signatures from the federation to becoming a
2386/// `PendingTransaction`
2387#[derive(Clone, Debug, Eq, PartialEq, Encodable, Decodable)]
2388pub struct UnsignedTransaction {
2389    pub psbt: Psbt,
2390    pub signatures: Vec<(PeerId, PegOutSignatureItem)>,
2391    pub change: bitcoin::Amount,
2392    pub fees: PegOutFees,
2393    pub destination: ScriptBuf,
2394    pub selected_utxos: Vec<(UTXOKey, SpendableUTXO)>,
2395    pub peg_out_amount: bitcoin::Amount,
2396    pub rbf: Option<Rbf>,
2397}
2398
2399impl Serialize for UnsignedTransaction {
2400    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
2401    where
2402        S: serde::Serializer,
2403    {
2404        if serializer.is_human_readable() {
2405            serializer.serialize_str(&self.consensus_encode_to_hex())
2406        } else {
2407            serializer.serialize_bytes(&self.consensus_encode_to_vec())
2408        }
2409    }
2410}
2411
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use bitcoin::Network::{Bitcoin, Testnet};
    use bitcoin::hashes::Hash;
    use bitcoin::{Address, Amount, OutPoint, Txid, secp256k1};
    use fedimint_core::Feerate;
    use fedimint_core::encoding::btc::NetworkLegacyEncodingWrapper;
    use fedimint_wallet_common::{PegOut, PegOutFees, Rbf, WalletOutputV0};
    use miniscript::descriptor::Wsh;

    use crate::common::PegInDescriptor;
    use crate::{
        CompressedPublicKey, OsRng, SpendableUTXO, StatelessWallet, UTXOKey, WalletOutputError,
    };

    /// Exercises `create_tx` with too little and with enough funds, then runs
    /// `validate_tx` against a series of bad and good fee, amount and network
    /// combinations.
    #[test]
    fn create_tx_should_validate_amounts() {
        let secp = secp256k1::Secp256k1::new();

        // 3-of-4 sorted multisig over freshly generated keys.
        let federation_keys = (0..4)
            .map(|_| {
                let (_, key) = secp.generate_keypair(&mut OsRng);
                CompressedPublicKey { key }
            })
            .collect();
        let descriptor = PegInDescriptor::Wsh(Wsh::new_sortedmulti(3, federation_keys).unwrap());

        let (secret_key, _) = secp.generate_keypair(&mut OsRng);

        let wallet = StatelessWallet {
            descriptor: &descriptor,
            secret_key: &secret_key,
            secp: &secp,
        };

        // A single 3000 sat UTXO is all the wallet can spend.
        let selected_utxos = vec![(
            UTXOKey(OutPoint::null()),
            SpendableUTXO {
                tweak: [0; 33],
                amount: Amount::from_sat(3000),
            },
        )];

        let recipient = Address::from_str("32iVBEu4dxkUQk9dJbZUiBiQdmypcEyJRf").unwrap();
        let destination = recipient.clone().assume_checked().script_pubkey();

        let fee = Feerate { sats_per_kvb: 1000 };
        let weight = 875;

        // not enough SpendableUTXO
        // tx fee = ceil(875 / 4) * 1 sat/vb = 219
        // change script dust = 330
        // spendable sats = 3000 - 219 - 330 = 2451
        let insufficient = wallet.create_tx(
            Amount::from_sat(2452),
            destination.clone(),
            vec![],
            selected_utxos.clone(),
            fee,
            &[0; 33],
            None,
        );
        assert_eq!(
            insufficient,
            Err(WalletOutputError::NotEnoughSpendableUTXO)
        );

        // successful tx creation
        let mut tx = wallet
            .create_tx(
                Amount::from_sat(1000),
                destination,
                vec![],
                selected_utxos,
                fee,
                &[0; 33],
                None,
            )
            .expect("is ok");

        // RBF output whose fee rate and weight match the created transaction.
        let matching_rbf = rbf(fee.sats_per_kvb, weight);

        // peg out weight is incorrectly set to 0
        assert_eq!(
            StatelessWallet::validate_tx(&tx, &rbf(fee.sats_per_kvb, 0), fee, Bitcoin),
            Err(WalletOutputError::TxWeightIncorrect(0, weight))
        );

        // fee rate set below min relay fee to 0
        assert_eq!(
            StatelessWallet::validate_tx(&tx, &rbf(0, weight), fee, Bitcoin),
            Err(WalletOutputError::BelowMinRelayFee)
        );

        // fees are okay
        assert_eq!(
            StatelessWallet::validate_tx(&tx, &matching_rbf, fee, Bitcoin),
            Ok(())
        );

        // tx has fee below consensus
        tx.fees = PegOutFees::new(0, weight);
        assert_eq!(
            StatelessWallet::validate_tx(&tx, &matching_rbf, fee, Bitcoin),
            Err(WalletOutputError::PegOutFeeBelowConsensus(
                Feerate { sats_per_kvb: 0 },
                fee
            ))
        );

        // tx has peg-out amount under dust limit
        tx.peg_out_amount = Amount::ZERO;
        assert_eq!(
            StatelessWallet::validate_tx(&tx, &matching_rbf, fee, Bitcoin),
            Err(WalletOutputError::PegOutUnderDustLimit)
        );

        // tx is invalid for network
        let peg_out = WalletOutputV0::PegOut(PegOut {
            recipient,
            amount: Amount::from_sat(1000),
            fees: PegOutFees::new(100, weight),
        });
        assert_eq!(
            StatelessWallet::validate_tx(&tx, &peg_out, fee, Testnet),
            Err(WalletOutputError::WrongNetwork(
                NetworkLegacyEncodingWrapper(Testnet),
                NetworkLegacyEncodingWrapper(Bitcoin)
            ))
        );
    }

    /// Builds an RBF wallet output with the given fee rate and total weight,
    /// pointing at the all-zero txid.
    fn rbf(sats_per_kvb: u64, total_weight: u64) -> WalletOutputV0 {
        let fees = PegOutFees::new(sats_per_kvb, total_weight);
        let txid = Txid::all_zeros();

        WalletOutputV0::Rbf(Rbf { fees, txid })
    }
}