#![deny(clippy::pedantic)]
#![allow(clippy::cast_possible_truncation)]
#![allow(clippy::cast_sign_loss)]
#![allow(clippy::missing_errors_doc)]
#![allow(clippy::missing_panics_doc)]
#![allow(clippy::module_name_repetitions)]
#![allow(clippy::similar_names)]
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, LazyLock, Mutex};
use std::time::Duration;
use std::{env, iter};
use anyhow::{Context, Result};
use bitcoin::{Block, BlockHash, Network, ScriptBuf, Transaction, Txid};
use fedimint_core::envs::{
is_running_in_test_env, BitcoinRpcConfig, FM_FORCE_BITCOIN_RPC_KIND_ENV,
FM_FORCE_BITCOIN_RPC_URL_ENV, FM_WALLET_FEERATE_SOURCES_ENV,
};
use fedimint_core::task::TaskGroup;
use fedimint_core::time::now;
use fedimint_core::txoproof::TxOutProof;
use fedimint_core::util::{get_median, FmtCompactAnyhow, SafeUrl};
use fedimint_core::{apply, async_trait_maybe_send, dyn_newtype_define, Feerate};
use fedimint_logging::{LOG_BITCOIND, LOG_CORE};
use feerate_source::{FeeRateSource, FetchJson};
use tokio::time::Interval;
use tracing::{debug, trace, warn};
#[cfg(feature = "bitcoincore-rpc")]
pub mod bitcoincore;
#[cfg(feature = "esplora-client")]
mod esplora;
mod feerate_source;
const MAINNET_GENESIS_BLOCK_HASH: &str =
"000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f";
const TESTNET_GENESIS_BLOCK_HASH: &str =
"000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943";
const SIGNET_GENESIS_BLOCK_HASH: &str =
"00000008819873e925422c1ff0f99f7cc9bbb232af63a077a480a3633bee1ef6";
const REGTEST_GENESIS_BLOCK_HASH: &str =
"0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206";
static BITCOIN_RPC_REGISTRY: LazyLock<Mutex<BTreeMap<String, DynBitcoindRpcFactory>>> =
LazyLock::new(|| {
Mutex::new(BTreeMap::from([
#[cfg(feature = "esplora-client")]
("esplora".to_string(), esplora::EsploraFactory.into()),
#[cfg(feature = "bitcoincore-rpc")]
("bitcoind".to_string(), bitcoincore::BitcoindFactory.into()),
]))
});
pub fn create_bitcoind(config: &BitcoinRpcConfig) -> Result<DynBitcoindRpc> {
let registry = BITCOIN_RPC_REGISTRY.lock().expect("lock poisoned");
let kind = env::var(FM_FORCE_BITCOIN_RPC_KIND_ENV)
.ok()
.unwrap_or_else(|| config.kind.clone());
let url = env::var(FM_FORCE_BITCOIN_RPC_URL_ENV)
.ok()
.map(|s| SafeUrl::parse(&s))
.transpose()?
.unwrap_or_else(|| config.url.clone());
debug!(target: LOG_CORE, %kind, %url, "Starting bitcoin rpc");
let maybe_factory = registry.get(&kind);
let factory = maybe_factory.with_context(|| {
anyhow::anyhow!(
"{} rpc not registered, available options: {:?}",
config.kind,
registry.keys()
)
})?;
factory.create_connection(&url)
}
pub fn register_bitcoind(kind: String, factory: DynBitcoindRpcFactory) {
let mut registry = BITCOIN_RPC_REGISTRY.lock().expect("lock poisoned");
registry.insert(kind, factory);
}
pub trait IBitcoindRpcFactory: Debug + Send + Sync {
fn create_connection(&self, url: &SafeUrl) -> Result<DynBitcoindRpc>;
}
dyn_newtype_define! {
#[derive(Clone)]
pub DynBitcoindRpcFactory(Arc<IBitcoindRpcFactory>)
}
#[apply(async_trait_maybe_send!)]
pub trait IBitcoindRpc: Debug {
async fn get_network(&self) -> Result<bitcoin::Network>;
async fn get_block_count(&self) -> Result<u64>;
async fn get_block_hash(&self, height: u64) -> Result<BlockHash>;
async fn get_block(&self, block_hash: &BlockHash) -> Result<Block>;
async fn get_fee_rate(&self, confirmation_target: u16) -> Result<Option<Feerate>>;
async fn submit_transaction(&self, transaction: Transaction);
async fn get_tx_block_height(&self, txid: &Txid) -> Result<Option<u64>>;
async fn is_tx_in_block(
&self,
txid: &Txid,
block_hash: &BlockHash,
block_height: u64,
) -> Result<bool>;
async fn watch_script_history(&self, script: &ScriptBuf) -> Result<()>;
async fn get_script_history(&self, script: &ScriptBuf) -> Result<Vec<Transaction>>;
async fn get_txout_proof(&self, txid: Txid) -> Result<TxOutProof>;
async fn get_sync_percentage(&self) -> Result<Option<f64>>;
fn get_bitcoin_rpc_config(&self) -> BitcoinRpcConfig;
}
dyn_newtype_define! {
#[derive(Clone)]
pub DynBitcoindRpc(Arc<IBitcoindRpc>)
}
impl DynBitcoindRpc {
pub fn spawn_block_count_update_task(
self,
task_group: &TaskGroup,
on_update: impl Fn(u64) + Send + Sync + 'static,
) -> anyhow::Result<()> {
let mut desired_interval = get_bitcoin_polling_interval();
let last_block_count = AtomicU64::new(0);
task_group.spawn_cancellable("block count background task", {
async move {
trace!(target: LOG_BITCOIND, "Fetching block count from bitcoind");
let update_block_count = || async {
let res = self
.get_block_count()
.await;
match res {
Ok(block_count) => {
if last_block_count.load(Ordering::SeqCst) != block_count {
on_update(block_count);
last_block_count.store(block_count, Ordering::SeqCst);
}
},
Err(err) => {
warn!(target: LOG_BITCOIND, err = %err.fmt_compact_anyhow(), "Unable to get block count from the node");
}
}
};
loop {
let start = now();
update_block_count().await;
let duration = now().duration_since(start).unwrap_or_default();
if Duration::from_secs(10) < duration {
warn!(target: LOG_BITCOIND, duration_secs=duration.as_secs(), "Updating block count from bitcoind slow");
}
desired_interval.tick().await;
}
}
});
Ok(())
}
pub fn spawn_fee_rate_update_task(
self,
task_group: &TaskGroup,
network: Network,
confirmation_target: u16,
on_update: impl Fn(Feerate) + Send + Sync + 'static,
) -> anyhow::Result<()> {
let sources = std::env::var(FM_WALLET_FEERATE_SOURCES_ENV)
.unwrap_or_else(|_| match network {
Network::Bitcoin => "https://mempool.space/api/v1/fees/recommended#.;https://blockstream.info/api/fee-estimates#.\"1\"".to_owned(),
_ => String::new(),
})
.split(';')
.filter(|s| !s.is_empty())
.map(|s| Ok(Box::new(FetchJson::from_str(s)?) as Box<dyn FeeRateSource>))
.chain(iter::once(Ok(
Box::new(self.clone()) as Box<dyn FeeRateSource>
)))
.collect::<anyhow::Result<Vec<Box<dyn FeeRateSource>>>>()?;
let feerates = Arc::new(std::sync::Mutex::new(vec![None; sources.len()]));
let mut desired_interval = get_bitcoin_polling_interval();
task_group.spawn_cancellable("feerate background task", async move {
trace!(target: LOG_BITCOIND, "Fetching feerate from sources");
let last_feerate = AtomicU64::new(0);
let update_fee_rate = || async {
trace!(target: LOG_BITCOIND, "Updating bitcoin fee rate");
let feerates_new = futures::future::join_all(sources.iter().map(|s| async { (s.name(), s.fetch(confirmation_target).await) } )).await;
let mut feerates = feerates.lock().expect("lock poisoned");
for (i, (name, res)) in feerates_new.into_iter().enumerate() {
match res {
Ok(ok) => feerates[i] = Some(ok),
Err(err) => {
if !is_running_in_test_env() {
warn!(target: LOG_BITCOIND, err = %err.fmt_compact_anyhow(), %name, "Error getting feerate from source");
}
},
}
}
let mut available_feerates : Vec<_> = feerates.iter().filter_map(Clone::clone).map(|r| r.sats_per_kvb).collect();
available_feerates.sort_unstable();
if let Some(feerate) = get_median(&available_feerates) {
if feerate != last_feerate.load(Ordering::SeqCst) {
on_update(Feerate { sats_per_kvb: feerate });
last_feerate.store(feerate, Ordering::SeqCst);
}
} else {
if !is_running_in_test_env() {
warn!(target: LOG_BITCOIND, "Unable to calculate any fee rate");
}
}
};
loop {
let start = now();
update_fee_rate().await;
let duration = now().duration_since(start).unwrap_or_default();
if Duration::from_secs(10) < duration {
warn!(target: LOG_BITCOIND, duration_secs=duration.as_secs(), "Updating feerate from bitcoind slow");
}
desired_interval.tick().await;
}
});
Ok(())
}
}
fn get_bitcoin_polling_interval() -> Interval {
tokio::time::interval(if is_running_in_test_env() {
debug!(target: LOG_BITCOIND, "Running in devimint, using fast node polling");
Duration::from_millis(100)
} else {
Duration::from_secs(30)
})
}