use std::collections::{BTreeMap, HashSet};
use std::fmt::{self, Formatter};
use std::future::{pending, Future};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::{anyhow, bail, format_err, Context as _};
use async_stream::try_stream;
use bitcoin::key::rand::thread_rng;
use bitcoin::key::Secp256k1;
use bitcoin::secp256k1::{self, PublicKey};
use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
use fedimint_api_client::api::net::Connector;
use fedimint_api_client::api::{
ApiVersionSet, DynGlobalApi, FederationApiExt as _, IGlobalFederationApi,
};
use fedimint_client_module::module::recovery::RecoveryProgress;
use fedimint_client_module::module::{
ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
IClientModule, IdxRange, OutPointRange,
};
use fedimint_client_module::oplog::IOperationLog;
use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
use fedimint_client_module::transaction::{
TransactionBuilder, TxSubmissionStates, TxSubmissionStatesSM,
TRANSACTION_SUBMISSION_MODULE_INSTANCE,
};
use fedimint_client_module::{
AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
};
use fedimint_core::config::{
ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
};
use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
use fedimint_core::db::{
AutocommitError, Database, DatabaseKey, DatabaseRecord, DatabaseTransaction,
IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
use fedimint_core::invite_code::InviteCode;
use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
use fedimint_core::module::{
ApiRequestErased, ApiVersion, MultiApiVersion, SupportedApiVersionsSummary,
SupportedCoreApiVersions, SupportedModuleApiVersions,
};
use fedimint_core::net::api_announcement::SignedApiAnnouncement;
use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
use fedimint_core::transaction::Transaction;
use fedimint_core::util::{
backoff_util, retry, BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl,
};
use fedimint_core::{
apply, async_trait_maybe_send, fedimint_build_code_version_env, maybe_add_send,
maybe_add_send_sync, runtime, Amount, NumPeers, OutPoint, PeerId,
};
use fedimint_derive_secret::DerivableSecret;
use fedimint_eventlog::{
DBTransactionEventLogExt as _, Event, EventKind, EventLogEntry, EventLogId, PersistedLogEntry,
};
use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt as _};
use global_ctx::ModuleGlobalClientContext;
use tokio::sync::{broadcast, watch};
use tokio_stream::wrappers::WatchStream;
use tracing::{debug, info, warn};
use crate::api_announcements::{get_api_urls, ApiAnnouncementPrefix};
use crate::backup::Metadata;
use crate::db::{
apply_migrations_core_client, get_core_client_database_migrations, get_decoded_client_secret,
ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ClientConfigKey, ClientMetadataKey,
ClientModuleRecovery, ClientModuleRecoveryState, EncodedClientSecretKey, OperationLogKey,
PeerLastApiVersionsSummary, PeerLastApiVersionsSummaryKey,
};
use crate::meta::MetaService;
use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
use crate::oplog::OperationLog;
use crate::sm::executor::{
ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
};
use crate::ClientBuilder;
pub(crate) mod builder;
pub(crate) mod global_ctx;
pub(crate) mod handle;
const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
&[ApiVersion { major: 0, minor: 0 }];
/// State of a running federation client: configuration, database handle,
/// module registry, state machine executor and the channels used to report
/// recovery progress and logged events.
pub struct Client {
/// Handle cloned into module notifiers (see `transaction_update_stream`).
final_client: FinalClientIface,
/// Current client config. Kept behind an async lock because
/// `get_guardian_public_keys_blocking` may replace it at runtime.
config: tokio::sync::RwLock<ClientConfig>,
/// Optional API secret; copied into invite codes created by `invite_code`.
api_secret: Option<String>,
/// Module decoder registry exposed through `decoders()`.
decoders: ModuleDecoderRegistry,
/// Client database.
db: Database,
/// Id of the federation this client belongs to.
federation_id: FederationId,
/// String key/value metadata served by `get_config_meta`.
federation_config_meta: BTreeMap<String, String>,
/// Instance id of the module returned by `primary_module()`, which is used
/// to balance transactions and to report the balance.
primary_module_instance: ModuleInstanceId,
/// Registry of the client module instances.
pub(crate) modules: ClientModuleRegistry,
/// Module initializers; used to compute the API versions the client supports.
module_inits: ClientModuleInitRegistry,
/// State machine executor.
executor: Executor,
/// Federation API handle.
pub(crate) api: DynGlobalApi,
/// Root secret of the client, handed out (cloned) by `root_secret()`.
root_secret: DerivableSecret,
/// Operation log accessor.
operation_log: OperationLog,
/// Secp256k1 context used when building transactions and deriving the fake
/// LN public key in `get_internal_payment_markers`.
secp_ctx: Secp256k1<secp256k1::All>,
/// Shared meta service, queried e.g. for the federation expiry timestamp.
meta_service: Arc<MetaService>,
// NOTE(review): not read anywhere in the visible part of this file;
// presumably the connector the API handle was created with — confirm.
connector: Connector,
/// Task group used to spawn background tasks.
task_group: TaskGroup,
/// Latest recovery progress for each module instance.
client_recovery_progress_receiver:
watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
/// Sender passed to the event log whenever an event is written.
log_ordering_wakeup_tx: watch::Sender<()>,
// NOTE(review): the two event-log channels below are not used in the
// visible part of this file; their exact semantics should be confirmed
// against the event log code.
log_event_added_rx: watch::Receiver<()>,
log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
// NOTE(review): not used in the visible part of this file.
request_hook: ApiRequestHook,
}
impl Client {
/// Applies the core client database migrations to `db` and then wraps the
/// database in a fresh [`ClientBuilder`].
///
/// # Errors
/// Returns the error produced by the migration step, if any.
pub async fn builder(db: Database) -> anyhow::Result<ClientBuilder> {
    let migrations = get_core_client_database_migrations();
    apply_migrations_core_client(&db, "fedimint-client".to_string(), migrations).await?;

    Ok(ClientBuilder::new(db))
}
/// Borrows the federation API as a trait object.
pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
self.api.as_ref()
}
/// Returns an owned clone of the federation API handle.
pub fn api_clone(&self) -> DynGlobalApi {
self.api.clone()
}
/// Borrows the task group this client spawns its background tasks on.
pub fn task_group(&self) -> &TaskGroup {
&self.task_group
}
/// Borrows the state machine executor (hidden from the public docs).
#[doc(hidden)]
pub fn executor(&self) -> &Executor {
&self.executor
}
/// Reads the client config stored under `ClientConfigKey`, or `None` if the
/// database does not contain one.
pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
    db.begin_transaction_nc()
        .await
        .get_value(&ClientConfigKey)
        .await
}
/// Reads the API secret stored under `ApiSecretKey`, or `None` if the
/// database does not contain one.
pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
    db.begin_transaction_nc()
        .await
        .get_value(&ApiSecretKey)
        .await
}
/// Consensus-encodes `secret` and stores it under `EncodedClientSecretKey`.
///
/// # Errors
/// Fails without writing anything if an encoded secret is already present;
/// an existing secret is never overwritten.
pub async fn store_encodable_client_secret<T: Encodable>(
    db: &Database,
    secret: T,
) -> anyhow::Result<()> {
    let mut dbtx = db.begin_transaction().await;

    let already_stored = dbtx.get_value(&EncodedClientSecretKey).await.is_some();
    if already_stored {
        bail!("Encoded client secret already exists, cannot overwrite");
    }

    let encoded_secret = secret.consensus_encode_to_vec();
    dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
        .await;
    dbtx.commit_tx().await;

    Ok(())
}
pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
bail!("Encoded client secret not present in DB")
};
Ok(secret)
}
/// Loads the stored client secret, if any, and decodes it as `T`.
///
/// Returns `Ok(None)` when no secret is stored.
///
/// # Errors
/// Fails if a secret is stored but cannot be decoded as `T`.
pub async fn load_decodable_client_secret_opt<T: Decodable>(
    db: &Database,
) -> anyhow::Result<Option<T>> {
    let mut dbtx = db.begin_transaction_nc().await;

    let decoded = dbtx
        .get_value(&EncodedClientSecretKey)
        .await
        .map(|encoded| T::consensus_decode_whole(&encoded, &ModuleRegistry::default()))
        .transpose()
        .map_err(|e| anyhow!("Decoding failed: {e}"))?;

    Ok(decoded)
}
/// Returns the stored 64-byte client secret, generating and persisting a
/// random one first if the database does not contain a secret yet.
///
/// # Errors
/// * A secret is stored but cannot be decoded. Previously this case was
///   treated like "no secret stored", which then tried to overwrite the
///   existing entry and panicked; it is now reported to the caller.
/// * Persisting the freshly generated secret fails (e.g. because another
///   writer stored one in the meantime).
pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
    // Only a *missing* secret may trigger generation; decode errors propagate.
    if let Some(secret) = Self::load_decodable_client_secret_opt::<[u8; 64]>(db).await? {
        return Ok(secret);
    }

    let secret = PlainRootSecretStrategy::random(&mut thread_rng());
    Self::store_encodable_client_secret(db, secret)
        .await
        .context("Storing freshly generated client secret failed")?;

    Ok(secret)
}
/// Reports whether `db` already contains a client config.
pub async fn is_initialized(db: &Database) -> bool {
    let stored_config = Self::get_config_from_db(db).await;
    stored_config.is_some()
}
/// Logs the build version and starts the state machine executor with a
/// context generator bound to this client.
pub fn start_executor(self: &Arc<Self>) {
    debug!(
        target: LOG_CLIENT,
        "Starting fedimint client executor (version: {})",
        fedimint_build_code_version_env!()
    );

    let context_gen = self.context_gen();
    self.executor.start_executor(context_gen);
}
/// Returns the id of the federation this client is connected to.
pub fn federation_id(&self) -> FederationId {
self.federation_id
}
/// Builds the generator that creates a module-scoped global context for a
/// given module instance and operation.
///
/// The generator only holds a weak reference to the client, so it does not
/// keep the client alive on its own.
///
/// # Panics
/// The returned generator panics if it is called after the client has been
/// dropped.
fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
    let weak_client = Arc::downgrade(self);
    Arc::new(move |module_instance, operation| {
        // `Weak::upgrade` takes `&self`; cloning the weak handle first is not needed.
        let client = weak_client
            .upgrade()
            .expect("ModuleGlobalContextGen called after client was dropped");

        ModuleGlobalClientContext {
            client,
            module_instance_id: module_instance,
            operation,
        }
        .into()
    })
}
pub async fn config(&self) -> ClientConfig {
self.config.read().await.clone()
}
/// Borrows the optional API secret of this client.
pub fn api_secret(&self) -> &Option<String> {
&self.api_secret
}
/// Borrows the module decoder registry.
pub fn decoders(&self) -> &ModuleDecoderRegistry {
&self.decoders
}
/// Returns the module with the given instance id.
///
/// # Panics
/// Panics if no module with that instance id is registered.
fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
self.try_get_module(instance)
.expect("Module instance not found")
}
/// Returns the module with the given instance id, or `None` if it is not
/// registered.
fn try_get_module(
&self,
instance: ModuleInstanceId,
) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
Some(self.modules.get(instance)?.as_ref())
}
/// Reports whether a module with the given instance id is registered.
pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
    self.try_get_module(instance).is_some()
}
/// Sums up what `builder` currently provides and requires.
///
/// Returns `(inputs, outputs + fees)`, where the fees are the per-item fees
/// reported by the module owning each input and output.
///
/// # Panics
/// Panics if an item belongs to an unknown module instance or if the owning
/// module cannot compute a fee for it.
fn transaction_builder_balance(&self, builder: &TransactionBuilder) -> (Amount, Amount) {
    let mut total_in = Amount::ZERO;
    let mut total_out = Amount::ZERO;
    let mut total_fees = Amount::ZERO;

    for item in builder.inputs() {
        let fee = self
            .get_module(item.input.module_instance_id())
            .input_fee(item.amount, &item.input)
            .expect(
                "We only build transactions with input versions that are supported by the module",
            );
        total_in += item.amount;
        total_fees += fee;
    }

    for item in builder.outputs() {
        let fee = self
            .get_module(item.output.module_instance_id())
            .output_fee(item.amount, &item.output)
            .expect(
                "We only build transactions with output versions that are supported by the module",
            );
        total_out += item.amount;
        total_fees += fee;
    }

    (total_in, total_out + total_fees)
}
/// Returns the fake LN public key derived from the federation id together
/// with the constant `0`.
///
/// # Errors
/// Fails if the public key cannot be derived from the federation id.
pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
    let fake_pub_key = self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?;
    Ok((fake_pub_key, 0))
}
/// Looks up `key` in the federation config metadata and returns an owned
/// copy of the value, if present.
pub fn get_config_meta(&self, key: &str) -> Option<String> {
    self.federation_config_meta
        .get(key)
        .map(|value| value.to_owned())
}
/// Returns a clone of the client's root secret.
pub(crate) fn root_secret(&self) -> DerivableSecret {
self.root_secret.clone()
}
/// Hands `states` to the executor as part of the database transaction
/// `dbtx` and returns the executor's result.
pub async fn add_state_machines(
&self,
dbtx: &mut DatabaseTransaction<'_>,
states: Vec<DynState>,
) -> AddStateMachinesResult {
self.executor.add_state_machines_dbtx(dbtx, states).await
}
/// Collects the ids of all operations that have at least one active state
/// machine *and* an entry in the operation log.
pub async fn get_active_operations(&self) -> HashSet<OperationId> {
    let states = self.executor.get_active_states().await;
    let mut logged_operations = HashSet::with_capacity(states.len());
    let mut dbtx = self.db().begin_transaction_nc().await;

    for (state, _meta) in states {
        let operation_id = state.operation_id();
        let has_log_entry = dbtx
            .get_value(&OperationLogKey { operation_id })
            .await
            .is_some();
        if has_log_entry {
            logged_operations.insert(operation_id);
        }
    }

    logged_operations
}
/// Borrows the operation log.
pub fn operation_log(&self) -> &OperationLog {
&self.operation_log
}
/// Borrows the shared meta service.
pub fn meta_service(&self) -> &Arc<MetaService> {
&self.meta_service
}
/// Reads the `federation_expiry_timestamp` meta field and converts it from
/// seconds since the Unix epoch into a [`SystemTime`].
///
/// Returns `None` if the field is unavailable or carries no value.
pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
    let field = self
        .meta_service()
        .get_field::<u64>(self.db(), "federation_expiry_timestamp")
        .await?;
    let expiry_secs = field.value?;

    Some(UNIX_EPOCH + Duration::from_secs(expiry_secs))
}
/// Balances `partial_transaction` with inputs and change outputs supplied by
/// the primary module and builds the final transaction.
///
/// Returns the transaction, the state machines produced while building it,
/// and the index range of the change outputs that were appended. The range
/// is empty if the primary module added no outputs.
///
/// # Panics
/// Panics if the transaction is still underfunded after the primary module
/// contributed its inputs and outputs.
async fn finalize_transaction(
    &self,
    dbtx: &mut DatabaseTransaction<'_>,
    operation_id: OperationId,
    partial_transaction: TransactionBuilder,
) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
    let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);

    let (added_input_bundle, change_outputs) = self
        .primary_module()
        .create_final_inputs_and_outputs(
            self.primary_module_instance,
            dbtx,
            operation_id,
            input_amount,
            output_amount,
        )
        .await?;

    // Change outputs are appended after the outputs already in the builder.
    let existing_outputs = partial_transaction.outputs().count();
    let change_range =
        (existing_outputs as u64)..((existing_outputs + change_outputs.outputs().len()) as u64);

    let balanced_transaction = partial_transaction
        .with_inputs(added_input_bundle)
        .with_outputs(change_outputs);

    let (input_amount, output_amount) =
        self.transaction_builder_balance(&balanced_transaction);
    assert!(input_amount >= output_amount, "Transaction is underfunded");

    let (tx, states) = balanced_transaction.build(&self.secp_ctx, thread_rng());
    Ok((tx, states, change_range))
}
/// Finalizes `tx_builder`, queues the transaction for submission and records
/// the operation in the operation log, all in one auto-committed database
/// transaction (up to 100 commit attempts).
///
/// `operation_meta_gen` receives the range of change outputs and produces
/// the metadata stored with the operation log entry.
///
/// # Errors
/// Fails if an operation with `operation_id` already exists or if
/// finalizing the transaction fails.
///
/// # Panics
/// Panics if the database transaction cannot be committed within the
/// allowed number of attempts.
pub async fn finalize_and_submit_transaction<F, M>(
    &self,
    operation_id: OperationId,
    operation_type: &str,
    operation_meta_gen: F,
    tx_builder: TransactionBuilder,
) -> anyhow::Result<OutPointRange>
where
    F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
    M: serde::Serialize + MaybeSend,
{
    let operation_type = operation_type.to_owned();

    let committed = self
        .db
        .autocommit(
            |dbtx, _| {
                // The closure may run several times, so each attempt gets its own copies.
                let operation_type = operation_type.clone();
                let tx_builder = tx_builder.clone();
                let operation_meta_gen = operation_meta_gen.clone();

                Box::pin(async move {
                    if Client::operation_exists_dbtx(dbtx, operation_id).await {
                        bail!("There already exists an operation with id {operation_id:?}")
                    }

                    let out_point_range = self
                        .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
                        .await?;

                    self.operation_log()
                        .add_operation_log_entry_dbtx(
                            dbtx,
                            operation_id,
                            &operation_type,
                            operation_meta_gen(out_point_range),
                        )
                        .await;

                    Ok(out_point_range)
                })
            },
            Some(100),
        )
        .await;

    match committed {
        Ok(out_point_range) => Ok(out_point_range),
        Err(AutocommitError::ClosureError { error, .. }) => Err(error),
        Err(AutocommitError::CommitFailed {
            attempts,
            last_error,
        }) => panic!(
            "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
        ),
    }
}
/// Finalizes `tx_builder` and registers the resulting state machines,
/// including the transaction submission state machine, with the executor
/// inside `dbtx`. Also logs a `TxCreatedEvent`.
///
/// Returns the transaction id together with the index range of the change
/// outputs.
///
/// # Errors
/// Fails if finalizing fails, if the encoded transaction exceeds
/// `Transaction::MAX_TX_SIZE`, or if the executor rejects the state
/// machines.
async fn finalize_and_submit_transaction_inner(
    &self,
    dbtx: &mut DatabaseTransaction<'_>,
    operation_id: OperationId,
    tx_builder: TransactionBuilder,
) -> anyhow::Result<OutPointRange> {
    let (transaction, mut states, change_range) = self
        .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
        .await?;

    // Encode once and reuse the size for both the limit check and the log line.
    let encoded_size = transaction.consensus_encode_to_vec().len();
    if encoded_size > Transaction::MAX_TX_SIZE {
        let inputs = transaction
            .inputs
            .iter()
            .map(DynInput::module_instance_id)
            .collect::<Vec<_>>();
        let outputs = transaction
            .outputs
            .iter()
            .map(DynOutput::module_instance_id)
            .collect::<Vec<_>>();
        warn!(
            target: LOG_CLIENT_NET_API,
            size=%encoded_size,
            ?inputs,
            ?outputs,
            "Transaction too large",
        );
        debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
        bail!(
            "The generated transaction would be rejected by the federation for being too large."
        );
    }

    let txid = transaction.tx_hash();
    debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction, "Finalized and submitting transaction");

    let tx_submission_sm = DynState::from_typed(
        TRANSACTION_SUBMISSION_MODULE_INSTANCE,
        TxSubmissionStatesSM {
            operation_id,
            state: TxSubmissionStates::Created(transaction),
        },
    );
    states.push(tx_submission_sm);

    self.executor.add_state_machines_dbtx(dbtx, states).await?;
    self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
        .await;

    Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
}
/// Subscribes to the transaction submission state machine updates of
/// `operation_id`.
async fn transaction_update_stream(
    &self,
    operation_id: OperationId,
) -> BoxStream<'static, TxSubmissionStatesSM> {
    let tx_notifier = self
        .executor
        .notifier()
        .module_notifier::<TxSubmissionStatesSM>(
            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
            self.final_client.clone(),
        );

    let updates = tx_notifier.subscribe(operation_id).await;
    updates
}
pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
let mut dbtx = self.db().begin_transaction_nc().await;
Client::operation_exists_dbtx(&mut dbtx, operation_id).await
}
/// Reports whether `dbtx` contains at least one active or inactive state
/// machine entry for `operation_id`.
pub async fn operation_exists_dbtx(
    dbtx: &mut DatabaseTransaction<'_>,
    operation_id: OperationId,
) -> bool {
    let has_active = dbtx
        .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
        .await
        .next()
        .await
        .is_some();

    let has_inactive = dbtx
        .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
        .await
        .next()
        .await
        .is_some();

    has_active || has_inactive
}
/// Reports whether `operation_id` has at least one active state machine.
pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
    let mut dbtx = self.db.begin_transaction_nc().await;
    let first_active = dbtx
        .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
        .await
        .next()
        .await;

    first_active.is_some()
}
/// Delegates to the primary module's `await_primary_module_output` for
/// `out_point` of `operation_id` and returns its result.
pub async fn await_primary_module_output(
&self,
operation_id: OperationId,
out_point: OutPoint,
) -> anyhow::Result<()> {
self.primary_module()
.await_primary_module_output(operation_id, out_point)
.await
}
/// Returns a typed handle to the first module of kind `M::kind()`,
/// preferring the primary module if it has that kind.
///
/// The handle bundles the instance id, a module-prefixed database, a
/// module-scoped API and the downcast module reference.
///
/// # Errors
/// Fails if no module of that kind is registered, if the instance cannot be
/// looked up, or if the module is not actually of type `M`.
pub fn get_first_module<M: ClientModule>(&self) -> anyhow::Result<ClientModuleInstance<M>> {
    let module_kind = M::kind();

    let Some(id) = self.get_first_instance(&module_kind) else {
        return Err(format_err!("No modules found of kind {module_kind}"));
    };

    let dyn_module = self
        .try_get_module(id)
        .ok_or_else(|| format_err!("Unknown module instance {id}"))?;

    let module: &M = dyn_module
        .as_any()
        .downcast_ref::<M>()
        .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;

    let (db, _) = self.db().with_prefix_module_id(id);
    let api = self.api().with_module(id);

    Ok(ClientModuleInstance {
        id,
        db,
        api,
        module,
    })
}
/// Returns the module registered under `instance_id` as a trait object.
///
/// # Errors
/// Fails if no module with that instance id is registered.
pub fn get_module_client_dyn(
    &self,
    instance_id: ModuleInstanceId,
) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
    // Build (and format) the error lazily, only when the lookup fails.
    self.try_get_module(instance_id)
        .ok_or_else(|| anyhow!("Unknown module instance {}", instance_id))
}
/// Borrows the client database.
pub fn db(&self) -> &Database {
&self.db
}
/// Wraps the transaction submission update stream of `operation_id` in a
/// [`TransactionUpdates`].
pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
    let update_stream = self.transaction_update_stream(operation_id).await;
    TransactionUpdates { update_stream }
}
/// Finds an instance id of a module of kind `module_kind`.
///
/// The primary module wins if it has the requested kind; otherwise the
/// first matching module in registry iteration order is returned.
pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
    let primary = self.primary_module_instance;
    let primary_matches = matches!(
        self.modules.get_with_kind(primary),
        Some((kind, _)) if kind == module_kind
    );
    if primary_matches {
        return Some(primary);
    }

    for (instance_id, kind, _module) in self.modules.iter_modules() {
        if kind == module_kind {
            return Some(instance_id);
        }
    }
    None
}
/// Decodes the client secret stored in the database as `T` by delegating to
/// `get_decoded_client_secret`.
pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
get_decoded_client_secret::<T>(self.db()).await
}
/// Awaits every out point in `outputs` one after another through the
/// primary module, stopping at the first error.
pub async fn await_primary_module_outputs(
    &self,
    operation_id: OperationId,
    outputs: Vec<OutPoint>,
) -> anyhow::Result<()> {
    let primary_module = self.primary_module();
    for out_point in outputs {
        primary_module
            .await_primary_module_output(operation_id, out_point)
            .await?;
    }
    Ok(())
}
/// Returns the current client config converted to its JSON representation.
pub async fn get_config_json(&self) -> JsonClientConfig {
    let config = self.config().await;
    config.to_json()
}
/// Returns the module registered under `primary_module_instance`.
///
/// # Panics
/// Panics if the primary module is missing from the registry.
pub fn primary_module(&self) -> &DynClientModule {
self.modules
.get(self.primary_module_instance)
.expect("primary module must be present")
}
/// Returns the balance reported by the primary module, read through a fresh
/// read-only database transaction.
pub async fn get_balance(&self) -> Amount {
    let mut dbtx = self.db().begin_transaction_nc().await;
    let balance = self
        .primary_module()
        .get_balance(self.primary_module_instance, &mut dbtx)
        .await;
    balance
}
/// Returns a stream of balances of the primary module.
///
/// The stream first yields the balance at subscription time and afterwards
/// yields a new value each time the primary module signals a change *and*
/// the balance differs from the previously yielded one.
pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> {
// The change subscription is created before the initial balance is read.
// NOTE(review): presumably so that no change between the two steps is missed — confirm.
let mut balance_changes = self.primary_module().subscribe_balance_changes().await;
let initial_balance = self.get_balance().await;
// Owned copies so the returned stream does not borrow from `self`.
let db = self.db().clone();
let primary_module = self.primary_module().clone();
let primary_module_instance = self.primary_module_instance;
Box::pin(async_stream::stream! {
yield initial_balance;
let mut prev_balance = initial_balance;
while let Some(()) = balance_changes.next().await {
let mut dbtx = db.begin_transaction_nc().await;
let balance = primary_module
.get_balance(primary_module_instance, &mut dbtx)
.await;
// Suppress notifications that did not actually change the balance.
if balance != prev_balance {
prev_balance = balance;
yield balance;
}
}
})
}
/// Asks every peer for its supported API versions (`VERSION_ENDPOINT`) and
/// stores each successful answer under `PeerLastApiVersionsSummaryKey`.
///
/// After every stored answer the running number of successful responses is
/// published on `num_responses_sender`. A failed request is retried after
/// 15 seconds, but only while the database holds no earlier summary for that
/// peer. The function returns once no request is outstanding.
pub async fn refresh_peers_api_versions(
num_peers: NumPeers,
api: DynGlobalApi,
db: Database,
num_responses_sender: watch::Sender<usize>,
) {
// Sleeps for `delay`, then requests the version summary from one peer and
// returns it tagged with the peer id.
async fn make_request(
delay: Duration,
peer_id: PeerId,
api: &DynGlobalApi,
) -> (
PeerId,
Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
) {
runtime::sleep(delay).await;
(
peer_id,
api.request_single_peer::<SupportedApiVersionsSummary>(
VERSION_ENDPOINT.to_owned(),
ApiRequestErased::default(),
peer_id,
)
.await,
)
}
let mut requests = FuturesUnordered::new();
// Initial round: one request per peer, without delay.
for peer_id in num_peers.peer_ids() {
requests.push(make_request(Duration::ZERO, peer_id, &api));
}
let mut num_responses = 0;
while let Some((peer_id, response)) = requests.next().await {
match response {
Err(err) => {
// NOTE(review): a peer that fails here but has a stored summary is
// neither retried nor counted in `num_responses`, so a caller waiting
// for a threshold of responses may wait until its own timeout even
// though stored data is available — confirm this is intended.
if db
.begin_transaction_nc()
.await
.get_value(&PeerLastApiVersionsSummaryKey(peer_id))
.await
.is_some()
{
debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, but we have a previous response");
} else {
debug!(target: LOG_CLIENT, %peer_id, err = %err.fmt_compact(), "Failed to refresh API versions of a peer, will retry");
requests.push(make_request(Duration::from_secs(15), peer_id, &api));
}
}
Ok(o) => {
// Persist each answer in its own committed transaction.
let mut dbtx = db.begin_transaction().await;
dbtx.insert_entry(
&PeerLastApiVersionsSummaryKey(peer_id),
&PeerLastApiVersionsSummary(o),
)
.await;
dbtx.commit_tx().await;
num_responses += 1;
// The send result is ignored; it fails when no receiver is left.
let _ = num_responses_sender.send(num_responses);
}
}
}
}
/// Describes the API versions this client supports for the given config.
///
/// The core part combines the config's consensus version with
/// `SUPPORTED_CORE_API_VERSIONS`. The module part has one entry for every
/// configured module whose kind has an initializer in `client_module_init`;
/// modules without an initializer are left out.
///
/// # Panics
/// Panics if `SUPPORTED_CORE_API_VERSIONS` contains conflicting versions.
pub fn supported_api_versions_summary_static(
    config: &ClientConfig,
    client_module_init: &ClientModuleInitRegistry,
) -> SupportedApiVersionsSummary {
    let core_consensus = config.global.consensus_version;

    let core = SupportedCoreApiVersions {
        core_consensus,
        api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
            .expect("must not have conflicting versions"),
    };

    let modules = config
        .modules
        .iter()
        .filter_map(|(&module_instance_id, module_config)| {
            let module_init = client_module_init.get(module_config.kind())?;
            let supported = SupportedModuleApiVersions {
                core_consensus,
                module_consensus: module_config.version,
                api: module_init.supported_api_versions(),
            };
            Some((module_instance_id, supported))
        })
        .collect();

    SupportedApiVersionsSummary { core, modules }
}
/// Instance-level entry point for
/// `load_and_refresh_common_api_version_static`, using this client's
/// config, module initializers, API, database and task group.
pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
    let config = self.config().await;
    let common_api_versions = Self::load_and_refresh_common_api_version_static(
        &config,
        &self.module_inits,
        &self.api,
        &self.db,
        &self.task_group,
    )
    .await;
    common_api_versions
}
/// Returns the common API version set, preferring the cached value.
///
/// * If a set is cached under `CachedApiVersionSetKey`, it is returned
///   immediately and a refresh is spawned on `task_group` in the
///   background; a failing background refresh is only logged.
/// * Otherwise the function waits for the initial discovery and returns its
///   result (or error).
async fn load_and_refresh_common_api_version_static(
config: &ClientConfig,
module_init: &ClientModuleInitRegistry,
api: &DynGlobalApi,
db: &Database,
task_group: &TaskGroup,
) -> anyhow::Result<ApiVersionSet> {
if let Some(v) = db
.begin_transaction_nc()
.await
.get_value(&CachedApiVersionSetKey)
.await
{
debug!(
target: LOG_CLIENT,
"Found existing cached common api versions"
);
// Owned copies are moved into the spawned background task.
let config = config.clone();
let client_module_init = module_init.clone();
let api = api.clone();
let db = db.clone();
let task_group = task_group.clone();
task_group
.clone()
.spawn_cancellable("refresh_common_api_version_static", async move {
if let Err(error) = Self::refresh_common_api_version_static(
&config,
&client_module_init,
&api,
&db,
task_group,
)
.await
{
warn!(
target: LOG_CLIENT,
err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
);
}
});
return Ok(v.0);
}
debug!(
target: LOG_CLIENT,
"No existing cached common api versions found, waiting for initial discovery"
);
Self::refresh_common_api_version_static(config, module_init, api, db, task_group.clone())
.await
}
/// Rediscovers the common API version set and caches it.
///
/// Steps:
/// 1. spawn `refresh_peers_api_versions` on `task_group`;
/// 2. wait up to 15 seconds until a threshold of peers has responded (a
///    timeout is not an error);
/// 3. load whatever peer summaries are stored in the database;
/// 4. compute the common set and store it under `CachedApiVersionSetKey`.
///
/// # Errors
/// Fails if no common API version set can be derived from the stored peer
/// summaries.
async fn refresh_common_api_version_static(
config: &ClientConfig,
client_module_init: &ClientModuleInitRegistry,
api: &DynGlobalApi,
db: &Database,
task_group: TaskGroup,
) -> anyhow::Result<ApiVersionSet> {
debug!(
target: LOG_CLIENT,
"Refreshing common api versions"
);
let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
// The number of peers is taken from the number of configured API endpoints.
let num_peers = NumPeers::from(config.global.api_endpoints.len());
task_group.spawn_cancellable("refresh peers api versions", {
Client::refresh_peers_api_versions(
num_peers,
api.clone(),
db.clone(),
num_responses_sender,
)
});
// The outcome of the wait is deliberately ignored: after a timeout the
// summaries stored so far are used.
let _: Result<_, Elapsed> = runtime::timeout(
Duration::from_secs(15),
num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
)
.await;
let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
let common_api_versions =
fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
&Self::supported_api_versions_summary_static(config, client_module_init),
&peer_api_version_sets,
)?;
debug!(
target: LOG_CLIENT,
value = ?common_api_versions,
"Updating the cached common api versions"
);
let mut dbtx = db.begin_transaction().await;
// The previous cache entry (if any) returned by the insert is not needed.
let _ = dbtx
.insert_entry(
&CachedApiVersionSetKey,
&CachedApiVersionSet(common_api_versions.clone()),
)
.await;
dbtx.commit_tx().await;
Ok(common_api_versions)
}
/// Loads the client metadata stored under `ClientMetadataKey`.
///
/// If the entry is missing, a warning is logged and empty metadata is
/// returned instead.
pub async fn get_metadata(&self) -> Metadata {
    let mut dbtx = self.db.begin_transaction_nc().await;

    if let Some(metadata) = dbtx.get_value(&ClientMetadataKey).await {
        return metadata;
    }

    warn!(
        target: LOG_CLIENT,
        "Missing existing metadata. This key should have been set on Client init"
    );
    Metadata::empty()
}
/// Stores `metadata` via `set_metadata_dbtx` inside an auto-committed
/// database transaction with no limit on commit attempts.
///
/// # Panics
/// Panics if the autocommit reports an error.
pub async fn set_metadata(&self, metadata: &Metadata) {
self.db
.autocommit::<_, _, anyhow::Error>(
|dbtx, _| {
Box::pin(async {
Self::set_metadata_dbtx(dbtx, metadata).await;
Ok(())
})
},
None,
)
.await
.expect("Failed to autocommit metadata");
}
/// Reports whether at least one module recovery is not done yet, based on
/// the most recent progress snapshot.
pub fn has_pending_recoveries(&self) -> bool {
    self.client_recovery_progress_receiver
        .borrow()
        .values()
        .any(|progress| !progress.is_done())
}
/// Waits until the recovery of every module is reported as done.
///
/// # Errors
/// Fails if the progress sender goes away before all modules are done.
pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
    let mut progress_rx = self.client_recovery_progress_receiver.clone();

    progress_rx
        .wait_for(|progress_by_module| {
            progress_by_module
                .values()
                .all(|progress| progress.is_done())
        })
        .await
        .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;

    Ok(())
}
/// Returns a stream of `(module instance id, progress)` pairs.
///
/// Every snapshot of the recovery progress map observed by the stream is
/// flattened into its individual entries.
pub fn subscribe_to_recovery_progress(
    &self,
) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> {
    let progress_rx = self.client_recovery_progress_receiver.clone();
    WatchStream::new(progress_rx).flat_map(|snapshot| futures::stream::iter(snapshot))
}
/// Waits until the recovery of every module instance of kind `module_kind`
/// is reported as done.
///
/// Instances of other kinds are ignored. An instance that appears in the
/// recovery progress but has no entry in the client config is ignored as
/// well; previously such an entry caused a panic when indexing the config.
///
/// # Errors
/// Fails if the progress sender goes away before the requested modules are
/// done.
pub async fn wait_for_module_kind_recovery(
    &self,
    module_kind: ModuleKind,
) -> anyhow::Result<()> {
    let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
    let config = self.config().await;

    recovery_receiver
        .wait_for(|in_progress| {
            in_progress.iter().all(|(module_instance_id, progress)| {
                // Checked lookup: an unknown instance id must not panic.
                let is_requested_kind = config
                    .modules
                    .get(module_instance_id)
                    .is_some_and(|module_config| module_config.kind == module_kind);

                !is_requested_kind || progress.is_done()
            })
        })
        .await
        .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;

    Ok(())
}
/// Polls the executor every 100 ms until it reports no active state
/// machines, then returns `Ok(())`.
pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
    while !self.executor.get_active_states().await.is_empty() {
        fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
    }
    Ok(())
}
/// Writes `metadata` under `ClientMetadataKey` within `dbtx`.
// NOTE(review): this uses `insert_new_entry`, whose behavior when the key
// already exists is defined outside this file — confirm it is suitable for
// updating existing metadata.
pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
}
/// Spawns `run_module_recoveries_task` on the client's task group under the
/// name "module recoveries".
///
/// * `recovery_sender` - publishes the combined per-module progress map.
/// * `module_recoveries` - one recovery future per module instance.
/// * `module_recovery_progress_receivers` - one progress receiver per module
///   instance.
fn spawn_module_recoveries_task(
&self,
recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
module_recoveries: BTreeMap<
ModuleInstanceId,
Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
>,
module_recovery_progress_receivers: BTreeMap<
ModuleInstanceId,
watch::Receiver<RecoveryProgress>,
>,
) {
// Clones are taken because the spawned task cannot borrow from `self`.
let db = self.db.clone();
let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
self.task_group
.spawn("module recoveries", |_task_handle| async {
Self::run_module_recoveries_task(
db,
log_ordering_wakeup_tx,
recovery_sender,
module_recoveries,
module_recovery_progress_receivers,
)
.await;
});
}
/// Drives all module recoveries and records their progress.
///
/// Two kinds of updates are merged into one stream: progress values from
/// the per-module receivers (`Some(progress)`) and completion notifications
/// from the recovery futures (`None`). For every update the new progress is
/// persisted under `ClientModuleRecovery` and published through
/// `recovery_sender`. The first transition of a module to "done" also logs
/// a `ModuleRecoveryCompleted` event.
///
/// A recovery future that fails is logged and then parked forever, so a
/// failed module is never marked as complete by its completion
/// notification.
///
/// # Panics
/// Panics if an update arrives for a module instance that has no entry in
/// the map held by `recovery_sender`.
async fn run_module_recoveries_task(
db: Database,
log_ordering_wakeup_tx: watch::Sender<()>,
recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
module_recoveries: BTreeMap<
ModuleInstanceId,
Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
>,
module_recovery_progress_receivers: BTreeMap<
ModuleInstanceId,
watch::Receiver<RecoveryProgress>,
>,
) {
debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
let mut completed_stream = Vec::new();
let progress_stream = futures::stream::FuturesUnordered::new();
// One single-item stream per recovery future: yields `(id, None)` on success.
for (module_instance_id, f) in module_recoveries {
completed_stream.push(futures::stream::once(Box::pin(async move {
match f.await {
Ok(()) => (module_instance_id, None),
Err(err) => {
warn!(
target: LOG_CLIENT,
err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
);
// Never resolve, so no completion is reported for a failed recovery.
futures::future::pending::<()>().await;
unreachable!()
}
}
})));
}
// One stream per progress receiver: yields `(id, Some(progress))`.
for (module_instance_id, rx) in module_recovery_progress_receivers {
progress_stream.push(
tokio_stream::wrappers::WatchStream::new(rx)
.fuse()
.map(move |progress| (module_instance_id, Some(progress))),
);
}
let mut futures = futures::stream::select(
futures::stream::select_all(progress_stream),
futures::stream::select_all(completed_stream),
);
while let Some((module_instance_id, progress)) = futures.next().await {
let mut dbtx = db.begin_transaction().await;
let prev_progress = *recovery_sender
.borrow()
.get(&module_instance_id)
.expect("existing progress must be present");
// "Done" is sticky; otherwise take the reported progress, and treat a
// completion notification (`None`) as completing the previous progress.
let progress = if prev_progress.is_done() {
prev_progress
} else if let Some(progress) = progress {
progress
} else {
prev_progress.to_complete()
};
if !prev_progress.is_done() && progress.is_done() {
info!(
target: LOG_CLIENT,
module_instance_id,
progress = format!("{}/{}", progress.complete, progress.total),
"Recovery complete"
);
dbtx.log_event(
log_ordering_wakeup_tx.clone(),
None,
ModuleRecoveryCompleted {
module_id: module_instance_id,
},
)
.await;
} else {
info!(
target: LOG_CLIENT,
module_instance_id,
progress = format!("{}/{}", progress.complete, progress.total),
"Recovery progress"
);
}
dbtx.insert_entry(
&ClientModuleRecovery { module_instance_id },
&ClientModuleRecoveryState { progress },
)
.await;
dbtx.commit_tx().await;
// Publish only after the new progress has been committed.
recovery_sender.send_modify(|v| {
v.insert(module_instance_id, progress);
});
}
debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
}
/// Loads the most recently stored API version summary of every peer.
///
/// Peers without a stored summary are absent from the returned map.
async fn load_peers_last_api_versions(
    db: &Database,
    num_peers: NumPeers,
) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
    let mut dbtx = db.begin_transaction_nc().await;
    let mut summaries = BTreeMap::new();

    for peer_id in num_peers.peer_ids() {
        let Some(stored) = dbtx
            .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
            .await
        else {
            continue;
        };
        summaries.insert(peer_id, stored.0);
    }

    drop(dbtx);
    summaries
}
/// Returns every API announcement stored in the database, keyed by the
/// announcing peer.
pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
    let mut dbtx = self.db().begin_transaction_nc().await;

    let announcements = dbtx
        .find_by_prefix(&ApiAnnouncementPrefix)
        .await
        .map(|(key, announcement)| (key.0, announcement))
        .collect::<BTreeMap<_, _>>()
        .await;

    announcements
}
/// Returns the API URL of every peer as resolved by `get_api_urls` from the
/// database and the current client config.
pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
    let config = self.config().await;
    let urls = get_api_urls(&self.db, &config).await;
    urls
}
/// Creates an invite code that points at the API URL of `peer`.
///
/// Returns `None` if no URL is known for that peer.
pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
    let mut peer_urls = self.get_peer_urls().await;

    // Keyed lookup that takes ownership of the URL: no scan of the map and
    // no extra clone.
    let peer_url = peer_urls.remove(&peer)?;

    Some(InviteCode::new(
        peer_url,
        peer,
        self.federation_id(),
        self.api_secret.clone(),
    ))
}
/// Returns the guardians' broadcast public keys.
///
/// If the current config already contains them, they are returned directly.
/// Otherwise the client config is fetched from the federation (retrying
/// with a background backoff), the keys are merged into the local config,
/// and the updated config is written both to the database and to the
/// in-memory copy.
///
/// If the fetched config does not contain the keys either, a warning is
/// logged and this call never returns.
pub async fn get_guardian_public_keys_blocking(
&self,
) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
self.db.autocommit(|dbtx, _| Box::pin(async move {
let config = self.config().await;
let guardian_pub_keys = if let Some(guardian_pub_keys) = config.global.broadcast_public_keys {guardian_pub_keys}else{
let fetched_config = retry(
"Fetching guardian public keys",
backoff_util::background_backoff(),
|| async {
Ok(self.api.request_current_consensus::<ClientConfig>(
CLIENT_CONFIG_ENDPOINT.to_owned(),
ApiRequestErased::default(),
).await?)
},
)
.await
.expect("Will never return on error");
let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
warn!(
target: LOG_CLIENT,
"Guardian public keys not found in fetched config, server not updated to 0.4 yet"
);
// Park forever: there is nothing to return in this case.
pending::<()>().await;
unreachable!("Pending will never return");
};
// Same config as before, with only the broadcast public keys filled in.
let new_config = ClientConfig {
global: GlobalClientConfig {
broadcast_public_keys: Some(guardian_pub_keys.clone()),
..config.global
},
modules: config.modules,
};
dbtx.insert_entry(&ClientConfigKey, &new_config).await;
// NOTE(review): the in-memory config is replaced before the surrounding
// autocommit transaction has committed — confirm this ordering is acceptable.
*(self.config.write().await) = new_config;
guardian_pub_keys
};
Result::<_, ()>::Ok(guardian_pub_keys)
}), None).await.expect("Will retry forever")
}
/// Dispatches a client-level RPC call by method name and returns its
/// results as a stream of JSON values.
///
/// One-shot methods yield a single value; the `subscribe_*` methods keep
/// yielding for as long as the underlying stream produces items. `params`
/// is only read by `get_invite_code`, which deserializes it into a
/// `GetInviteCodeRequest`.
///
/// An unknown method name, a failed (de)serialization, or a failed
/// `wait_for_all_recoveries` surfaces as an `Err` item.
pub fn handle_global_rpc(
&self,
method: String,
params: serde_json::Value,
) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
Box::pin(try_stream! {
match method.as_str() {
"get_balance" => {
let balance = self.get_balance().await;
yield serde_json::to_value(balance)?;
}
"subscribe_balance_changes" => {
let mut stream = self.subscribe_balance_changes().await;
while let Some(balance) = stream.next().await {
yield serde_json::to_value(balance)?;
}
}
"get_config" => {
let config = self.config().await;
yield serde_json::to_value(config)?;
}
"get_federation_id" => {
let federation_id = self.federation_id();
yield serde_json::to_value(federation_id)?;
}
"get_invite_code" => {
let req: GetInviteCodeRequest = serde_json::from_value(params)?;
let invite_code = self.invite_code(req.peer).await;
yield serde_json::to_value(invite_code)?;
}
"list_operations" => {
// Requests all operations in a single page (limit `usize::MAX`).
let operations = self.operation_log().paginate_operations_rev(usize::MAX, None).await;
yield serde_json::to_value(operations)?;
}
"has_pending_recoveries" => {
let has_pending = self.has_pending_recoveries();
yield serde_json::to_value(has_pending)?;
}
"wait_for_all_recoveries" => {
self.wait_for_all_recoveries().await?;
yield serde_json::Value::Null;
}
"subscribe_to_recovery_progress" => {
let mut stream = self.subscribe_to_recovery_progress();
while let Some((module_id, progress)) = stream.next().await {
yield serde_json::json!({
"module_id": module_id,
"progress": progress
});
}
}
_ => {
// The `?` always returns the error, so the line below is never reached.
Err(anyhow::format_err!("Unknown method: {}", method))?;
unreachable!()
},
}
})
}
/// Records `event` in the event log inside its own database transaction.
///
/// Opens a new transaction on the client database, logs the event (optionally
/// attributed to `module_id`), and commits the transaction.
pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
where
    E: Event + Send,
{
    let mut tx = self.db.begin_transaction().await;
    tx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
        .await;
    tx.commit_tx().await;
}
/// Records `event` in the event log using the caller-supplied transaction.
///
/// The event is written through `dbtx` together with a clone of this client's
/// `log_ordering_wakeup_tx`; this method does not commit the transaction.
/// `module_id` optionally names the module instance the event belongs to.
pub async fn log_event_dbtx<E, Cap>(
&self,
dbtx: &mut DatabaseTransaction<'_, Cap>,
module_id: Option<ModuleInstanceId>,
event: E,
) where
E: Event + Send,
Cap: Send,
{
dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
.await;
}
/// Records an already-serialized event using the caller-supplied transaction.
///
/// `module`, when present, carries both the kind and the instance id of the
/// module the event belongs to; the two are passed on as separate optional
/// values. `payload` is forwarded as-is and `persist` is handed through
/// unchanged. This method does not commit the transaction.
pub async fn log_event_raw_dbtx<Cap>(
    &self,
    dbtx: &mut DatabaseTransaction<'_, Cap>,
    kind: EventKind,
    module: Option<(ModuleKind, ModuleInstanceId)>,
    payload: Vec<u8>,
    persist: bool,
) where
    Cap: Send,
{
    // Split the optional pair into two independent options.
    let (module_kind, module_id) = match module {
        Some((kind_of_module, instance_id)) => (Some(kind_of_module), Some(instance_id)),
        None => (None, None),
    };
    let wakeup_tx = self.log_ordering_wakeup_tx.clone();
    dbtx.log_event_raw(wakeup_tx, kind, module_kind, module_id, payload, persist)
        .await;
}
/// Runs `fedimint_eventlog::handle_events` against this client's database.
///
/// * `pos_key` - a database key whose stored value is an `EventLogId`.
///   NOTE(review): presumably this tracks the position of the last handled
///   event — confirm against `fedimint_eventlog::handle_events`.
/// * `call_fn` - callback invoked with a non-committable transaction and an
///   `EventLogEntry`; it returns a future resolving to `anyhow::Result<()>`.
///
/// The call is given clones of `self.db` and `self.log_event_added_rx`, and its
/// result is returned unchanged.
pub async fn handle_events<F, R, K>(&self, pos_key: &K, call_fn: F) -> anyhow::Result<()>
where
K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
K: DatabaseRecord<Value = EventLogId>,
F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
R: Future<Output = anyhow::Result<()>>,
{
fedimint_eventlog::handle_events(
self.db.clone(),
pos_key,
self.log_event_added_rx.clone(),
call_fn,
)
.await
}
/// Reads entries from the persisted event log.
///
/// Opens a transaction via `begin_transaction_nc` and forwards `pos` and
/// `limit` to [`Self::get_event_log_dbtx`], returning its result.
pub async fn get_event_log(
    &self,
    pos: Option<EventLogId>,
    limit: u64,
) -> Vec<PersistedLogEntry> {
    let mut nc_dbtx = self.db.begin_transaction_nc().await;
    self.get_event_log_dbtx(&mut nc_dbtx, pos, limit).await
}
/// Reads entries from the persisted event log using the caller-supplied
/// transaction.
///
/// `pos` and `limit` are passed straight to `dbtx.get_event_log`.
/// NOTE(review): presumably `pos` is the starting position (`None` meaning
/// "from the beginning") and `limit` caps the number of entries — confirm
/// against the event log extension trait.
pub async fn get_event_log_dbtx<Cap>(
&self,
dbtx: &mut DatabaseTransaction<'_, Cap>,
pos: Option<EventLogId>,
limit: u64,
) -> Vec<PersistedLogEntry>
where
Cap: Send,
{
dbtx.get_event_log(pos, limit).await
}
/// Returns a new broadcast receiver of `EventLogEntry` values, obtained by
/// subscribing to `log_event_added_transient_tx`.
pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
self.log_event_added_transient_tx.subscribe()
}
}
// `ClientContextIface` implementation for `Client`. Most methods simply forward
// to the inherent `Client` method of the same name; the few that contain their
// own logic are commented individually.
#[apply(async_trait_maybe_send!)]
impl ClientContextIface for Client {
/// Forwards to the inherent `Client::get_module`.
fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
Client::get_module(self, instance)
}
/// Forwards to the inherent `Client::api_clone`.
fn api_clone(&self) -> DynGlobalApi {
Client::api_clone(self)
}
/// Forwards to the inherent `Client::decoders`.
fn decoders(&self) -> &ModuleDecoderRegistry {
Client::decoders(self)
}
/// Forwards to the inherent `Client::finalize_and_submit_transaction`,
/// passing the boxed `operation_meta_gen` by reference.
async fn finalize_and_submit_transaction(
&self,
operation_id: OperationId,
operation_type: &str,
operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
tx_builder: TransactionBuilder,
) -> anyhow::Result<OutPointRange> {
Client::finalize_and_submit_transaction(
self,
operation_id,
operation_type,
&operation_meta_gen,
tx_builder,
)
.await
}
/// Forwards to the inherent `Client::finalize_and_submit_transaction_inner`,
/// using the caller-supplied transaction.
async fn finalize_and_submit_transaction_inner(
&self,
dbtx: &mut DatabaseTransaction<'_>,
operation_id: OperationId,
tx_builder: TransactionBuilder,
) -> anyhow::Result<OutPointRange> {
Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
}
/// Forwards to the inherent `Client::transaction_updates`.
async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
Client::transaction_updates(self, operation_id).await
}
/// Forwards to the inherent `Client::await_primary_module_outputs`.
async fn await_primary_module_outputs(
&self,
operation_id: OperationId,
outputs: Vec<OutPoint>,
) -> anyhow::Result<()> {
Client::await_primary_module_outputs(self, operation_id, outputs).await
}
/// Returns the result of the inherent `Client::operation_log` as a trait
/// object reference.
fn operation_log(&self) -> &dyn IOperationLog {
Client::operation_log(self)
}
/// Forwards to the inherent `Client::has_active_states`.
async fn has_active_states(&self, operation_id: OperationId) -> bool {
Client::has_active_states(self, operation_id).await
}
/// Forwards to the inherent `Client::operation_exists`.
async fn operation_exists(&self, operation_id: OperationId) -> bool {
Client::operation_exists(self, operation_id).await
}
/// Forwards to the inherent `Client::config`.
async fn config(&self) -> ClientConfig {
Client::config(self).await
}
/// Forwards to the inherent `Client::db`.
fn db(&self) -> &Database {
Client::db(self)
}
/// Forwards to the inherent `Client::executor`.
fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
Client::executor(self)
}
/// Forwards to the inherent `Client::invite_code`.
async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
Client::invite_code(self, peer).await
}
/// Forwards to the inherent `Client::get_internal_payment_markers`.
fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
Client::get_internal_payment_markers(self)
}
/// Logs an event whose payload is given as a JSON value.
///
/// The payload is serialized to bytes with `serde_json::to_vec` and passed to
/// `log_event_raw_dbtx`. The module is only attached to the event when
/// `module_kind` is `Some`; with `None`, `module_id` is not forwarded.
///
/// # Panics
///
/// Panics if `dbtx.ensure_global()` fails or if the payload cannot be
/// serialized.
async fn log_event_json(
&self,
dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
module_kind: Option<ModuleKind>,
module_id: ModuleInstanceId,
kind: EventKind,
payload: serde_json::Value,
persist: bool,
) {
dbtx.ensure_global()
.expect("Must be called with global dbtx");
self.log_event_raw_dbtx(
dbtx,
kind,
// Pair the instance id with the kind only when a kind was supplied.
module_kind.map(|kind| (kind, module_id)),
serde_json::to_vec(&payload).expect("Serialization can't fail"),
persist,
)
.await;
}
/// Streams the active state entries found in `dbtx` under the
/// `ActiveModuleOperationStateKeyPrefix` built from `operation_id` and
/// `module_id`.
///
/// Each database entry `(k, v)` is yielded as `(k.0, v)`. The returned stream
/// borrows `dbtx` for `'dbtx`.
async fn read_operation_active_states<'dbtx>(
&self,
operation_id: OperationId,
module_id: ModuleInstanceId,
dbtx: &'dbtx mut DatabaseTransaction<'_>,
) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
{
Box::pin(
dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
operation_id,
module_instance: module_id,
})
.await
// Unwrap the database key wrapper to expose the inner state key.
.map(move |(k, v)| (k.0, v)),
)
}
/// Streams the inactive state entries found in `dbtx` under the
/// `InactiveModuleOperationStateKeyPrefix` built from `operation_id` and
/// `module_id`.
///
/// Each database entry `(k, v)` is yielded as `(k.0, v)`. The returned stream
/// borrows `dbtx` for `'dbtx`.
async fn read_operation_inactive_states<'dbtx>(
&self,
operation_id: OperationId,
module_id: ModuleInstanceId,
dbtx: &'dbtx mut DatabaseTransaction<'_>,
) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
{
Box::pin(
dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
operation_id,
module_instance: module_id,
})
.await
// Unwrap the database key wrapper to expose the inner state key.
.map(move |(k, v)| (k.0, v)),
)
}
}
impl fmt::Debug for Client {
    /// Writes the fixed string `Client`; no fields are included in the output.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("Client")
    }
}
/// Builds the decoder registry for the given module instances.
///
/// Every `(instance id, kind)` pair whose kind has an entry in `registry`
/// contributes that module's kind and decoder, keyed by instance id. Pairs
/// whose kind is missing from `registry` are skipped after a debug log line.
pub fn client_decoders<'a>(
    registry: &ModuleInitRegistry<DynClientModuleInit>,
    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
) -> ModuleDecoderRegistry {
    let decoders_by_instance: BTreeMap<_, _> = module_kinds
        .filter_map(|(id, kind)| {
            let Some(init) = registry.get(kind) else {
                debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
                return None;
            };
            Some((
                id,
                (
                    kind.clone(),
                    IClientModuleInit::decoder(
                        AsRef::<dyn IClientModuleInit + 'static>::as_ref(init),
                    ),
                ),
            ))
        })
        .collect();
    ModuleDecoderRegistry::from(decoders_by_instance)
}