1#![deny(clippy::pedantic)]
2#![allow(clippy::cast_possible_truncation)]
3#![allow(clippy::doc_markdown)]
4#![allow(clippy::explicit_deref_methods)]
5#![allow(clippy::missing_errors_doc)]
6#![allow(clippy::missing_panics_doc)]
7#![allow(clippy::module_name_repetitions)]
8#![allow(clippy::must_use_candidate)]
9#![allow(clippy::needless_lifetimes)]
10#![allow(clippy::return_self_not_must_use)]
11#![allow(clippy::too_many_lines)]
12#![allow(clippy::type_complexity)]
13
14use std::fmt::Debug;
15use std::ops::{self};
16use std::sync::Arc;
17
18use fedimint_api_client::api::{DynGlobalApi, DynModuleApi};
19use fedimint_core::config::ClientConfig;
20pub use fedimint_core::core::{IInput, IOutput, ModuleInstanceId, ModuleKind, OperationId};
21use fedimint_core::db::Database;
22use fedimint_core::module::registry::ModuleDecoderRegistry;
23use fedimint_core::module::{ApiAuth, ApiVersion};
24use fedimint_core::task::{MaybeSend, MaybeSync};
25use fedimint_core::util::{BoxStream, NextOrPending};
26use fedimint_core::{
27 PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, maybe_add_send_sync,
28};
29use fedimint_eventlog::{Event, EventKind, EventPersistence};
30use fedimint_logging::LOG_CLIENT;
31use futures::StreamExt;
32use module::OutPointRange;
33use serde::{Deserialize, Serialize};
34use thiserror::Error;
35use tracing::debug;
36use transaction::{
37 ClientInputBundle, ClientInputSM, ClientOutput, ClientOutputSM, TxSubmissionStatesSM,
38};
39
40pub use crate::module::{ClientModule, StateGenerator};
41use crate::sm::executor::ContextGen;
42use crate::sm::{ClientSMDatabaseTransaction, DynState, IState, State};
43use crate::transaction::{ClientInput, ClientOutputBundle, TxSubmissionStates};
44
45pub mod api;
46
47pub mod db;
48
49pub mod backup;
50pub mod envs;
52pub mod meta;
53pub mod module;
55pub mod oplog;
57pub mod secret;
59pub mod sm;
61pub mod transaction;
63
64pub mod api_version_discovery;
65
66#[derive(Serialize, Deserialize)]
67pub struct TxCreatedEvent {
68 pub txid: TransactionId,
69 pub operation_id: OperationId,
70}
71
72impl Event for TxCreatedEvent {
73 const MODULE: Option<ModuleKind> = None;
74 const KIND: EventKind = EventKind::from_static("tx-created");
75 const PERSISTENCE: EventPersistence = EventPersistence::Persistent;
76}
77
78#[derive(Serialize, Deserialize)]
79pub struct TxAcceptedEvent {
80 txid: TransactionId,
81 operation_id: OperationId,
82}
83
84impl Event for TxAcceptedEvent {
85 const MODULE: Option<ModuleKind> = None;
86 const KIND: EventKind = EventKind::from_static("tx-accepted");
87 const PERSISTENCE: EventPersistence = EventPersistence::Persistent;
88}
89
90#[derive(Serialize, Deserialize)]
91pub struct TxRejectedEvent {
92 txid: TransactionId,
93 error: String,
94 operation_id: OperationId,
95}
96impl Event for TxRejectedEvent {
97 const MODULE: Option<ModuleKind> = None;
98 const KIND: EventKind = EventKind::from_static("tx-rejected");
99 const PERSISTENCE: EventPersistence = EventPersistence::Persistent;
100}
101
102#[derive(Serialize, Deserialize)]
103pub struct ModuleRecoveryStarted {
104 module_id: ModuleInstanceId,
105}
106
107impl ModuleRecoveryStarted {
108 pub fn new(module_id: ModuleInstanceId) -> Self {
109 Self { module_id }
110 }
111}
112
113impl Event for ModuleRecoveryStarted {
114 const MODULE: Option<ModuleKind> = None;
115 const KIND: EventKind = EventKind::from_static("module-recovery-started");
116 const PERSISTENCE: EventPersistence = EventPersistence::Persistent;
117}
118
119#[derive(Serialize, Deserialize)]
120pub struct ModuleRecoveryCompleted {
121 pub module_id: ModuleInstanceId,
122}
123
124impl Event for ModuleRecoveryCompleted {
125 const MODULE: Option<ModuleKind> = None;
126 const KIND: EventKind = EventKind::from_static("module-recovery-completed");
127 const PERSISTENCE: EventPersistence = EventPersistence::Persistent;
128}
129
130pub type InstancelessDynClientInput = ClientInput<Box<maybe_add_send_sync!(dyn IInput + 'static)>>;
131
132pub type InstancelessDynClientInputSM =
133 ClientInputSM<Box<maybe_add_send_sync!(dyn IState + 'static)>>;
134
135pub type InstancelessDynClientInputBundle = ClientInputBundle<
136 Box<maybe_add_send_sync!(dyn IInput + 'static)>,
137 Box<maybe_add_send_sync!(dyn IState + 'static)>,
138>;
139
140pub type InstancelessDynClientOutput =
141 ClientOutput<Box<maybe_add_send_sync!(dyn IOutput + 'static)>>;
142
143pub type InstancelessDynClientOutputSM =
144 ClientOutputSM<Box<maybe_add_send_sync!(dyn IState + 'static)>>;
145pub type InstancelessDynClientOutputBundle = ClientOutputBundle<
146 Box<maybe_add_send_sync!(dyn IOutput + 'static)>,
147 Box<maybe_add_send_sync!(dyn IState + 'static)>,
148>;
149
150#[derive(Debug, Error)]
151pub enum AddStateMachinesError {
152 #[error("State already exists in database")]
153 StateAlreadyExists,
154 #[error("Got {0}")]
155 Other(#[from] anyhow::Error),
156}
157
158pub type AddStateMachinesResult = Result<(), AddStateMachinesError>;
159
160#[apply(async_trait_maybe_send!)]
161pub trait IGlobalClientContext: Debug + MaybeSend + MaybeSync + 'static {
162 fn module_api(&self) -> DynModuleApi;
165
166 async fn client_config(&self) -> ClientConfig;
167
168 fn api(&self) -> &DynGlobalApi;
175
176 fn decoders(&self) -> &ModuleDecoderRegistry;
177
178 async fn claim_inputs_dyn(
183 &self,
184 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
185 inputs: InstancelessDynClientInputBundle,
186 ) -> anyhow::Result<OutPointRange>;
187
188 async fn fund_output_dyn(
193 &self,
194 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
195 outputs: InstancelessDynClientOutputBundle,
196 ) -> anyhow::Result<OutPointRange>;
197
198 async fn add_state_machine_dyn(
200 &self,
201 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
202 sm: Box<maybe_add_send_sync!(dyn IState)>,
203 ) -> AddStateMachinesResult;
204
205 async fn log_event_json(
206 &self,
207 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
208 kind: EventKind,
209 module: Option<(ModuleKind, ModuleInstanceId)>,
210 payload: serde_json::Value,
211 persist: EventPersistence,
212 );
213
214 async fn transaction_update_stream(&self) -> BoxStream<TxSubmissionStatesSM>;
215
216 async fn core_api_version(&self) -> ApiVersion;
218}
219
220#[apply(async_trait_maybe_send!)]
221impl IGlobalClientContext for () {
222 fn module_api(&self) -> DynModuleApi {
223 unimplemented!("fake implementation, only for tests");
224 }
225
226 async fn client_config(&self) -> ClientConfig {
227 unimplemented!("fake implementation, only for tests");
228 }
229
230 fn api(&self) -> &DynGlobalApi {
231 unimplemented!("fake implementation, only for tests");
232 }
233
234 fn decoders(&self) -> &ModuleDecoderRegistry {
235 unimplemented!("fake implementation, only for tests");
236 }
237
238 async fn claim_inputs_dyn(
239 &self,
240 _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
241 _input: InstancelessDynClientInputBundle,
242 ) -> anyhow::Result<OutPointRange> {
243 unimplemented!("fake implementation, only for tests");
244 }
245
246 async fn fund_output_dyn(
247 &self,
248 _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
249 _outputs: InstancelessDynClientOutputBundle,
250 ) -> anyhow::Result<OutPointRange> {
251 unimplemented!("fake implementation, only for tests");
252 }
253
254 async fn add_state_machine_dyn(
255 &self,
256 _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
257 _sm: Box<maybe_add_send_sync!(dyn IState)>,
258 ) -> AddStateMachinesResult {
259 unimplemented!("fake implementation, only for tests");
260 }
261
262 async fn log_event_json(
263 &self,
264 _dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
265 _kind: EventKind,
266 _module: Option<(ModuleKind, ModuleInstanceId)>,
267 _payload: serde_json::Value,
268 _persist: EventPersistence,
269 ) {
270 unimplemented!("fake implementation, only for tests");
271 }
272
273 async fn transaction_update_stream(&self) -> BoxStream<TxSubmissionStatesSM> {
274 unimplemented!("fake implementation, only for tests");
275 }
276
277 async fn core_api_version(&self) -> ApiVersion {
278 unimplemented!("fake implementation, only for tests");
279 }
280}
281
282dyn_newtype_define! {
283 #[derive(Clone)]
286 pub DynGlobalClientContext(Arc<IGlobalClientContext>)
287}
288
289impl DynGlobalClientContext {
290 pub fn new_fake() -> Self {
291 DynGlobalClientContext::from(())
292 }
293
294 pub async fn await_tx_accepted(&self, query_txid: TransactionId) -> Result<(), String> {
295 self.transaction_update_stream()
296 .await
297 .filter_map(|tx_update| {
298 std::future::ready(match tx_update.state {
299 TxSubmissionStates::Accepted(txid) if txid == query_txid => Some(Ok(())),
300 TxSubmissionStates::Rejected(txid, submit_error) if txid == query_txid => {
301 Some(Err(submit_error))
302 }
303 _ => None,
304 })
305 })
306 .next_or_pending()
307 .await
308 }
309
310 pub async fn claim_inputs<I, S>(
311 &self,
312 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
313 inputs: ClientInputBundle<I, S>,
314 ) -> anyhow::Result<OutPointRange>
315 where
316 I: IInput + MaybeSend + MaybeSync + 'static,
317 S: IState + MaybeSend + MaybeSync + 'static,
318 {
319 self.claim_inputs_dyn(dbtx, inputs.into_instanceless())
320 .await
321 }
322
323 pub async fn fund_output<O, S>(
332 &self,
333 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
334 outputs: ClientOutputBundle<O, S>,
335 ) -> anyhow::Result<OutPointRange>
336 where
337 O: IOutput + MaybeSend + MaybeSync + 'static,
338 S: IState + MaybeSend + MaybeSync + 'static,
339 {
340 self.fund_output_dyn(dbtx, outputs.into_instanceless())
341 .await
342 }
343
344 pub async fn add_state_machine<S>(
348 &self,
349 dbtx: &mut ClientSMDatabaseTransaction<'_, '_>,
350 sm: S,
351 ) -> AddStateMachinesResult
352 where
353 S: State + MaybeSend + MaybeSync + 'static,
354 {
355 self.add_state_machine_dyn(dbtx, box_up_state(sm)).await
356 }
357
358 async fn log_event<E>(&self, dbtx: &mut ClientSMDatabaseTransaction<'_, '_>, event: E)
359 where
360 E: Event + Send,
361 {
362 self.log_event_json(
363 dbtx,
364 E::KIND,
365 E::MODULE.map(|m| (m, dbtx.module_id())),
366 serde_json::to_value(&event).expect("Payload serialization can't fail"),
367 <E as Event>::PERSISTENCE,
368 )
369 .await;
370 }
371}
372
373fn states_to_instanceless_dyn<S: IState + MaybeSend + MaybeSync + 'static>(
374 state_gen: StateGenerator<S>,
375) -> StateGenerator<Box<maybe_add_send_sync!(dyn IState + 'static)>> {
376 Arc::new(move |out_point_range| {
377 let states: Vec<S> = state_gen(out_point_range);
378 states
379 .into_iter()
380 .map(|state| box_up_state(state))
381 .collect()
382 })
383}
384
385fn box_up_state(state: impl IState + 'static) -> Box<maybe_add_send_sync!(dyn IState + 'static)> {
388 Box::new(state)
389}
390
391impl<T> From<Arc<T>> for DynGlobalClientContext
392where
393 T: IGlobalClientContext,
394{
395 fn from(inner: Arc<T>) -> Self {
396 DynGlobalClientContext { inner }
397 }
398}
399
400fn states_add_instance(
401 module_instance_id: ModuleInstanceId,
402 state_gen: StateGenerator<Box<maybe_add_send_sync!(dyn IState + 'static)>>,
403) -> StateGenerator<DynState> {
404 Arc::new(move |out_point_range| {
405 let states = state_gen(out_point_range);
406 Iterator::collect(
407 states
408 .into_iter()
409 .map(|state| DynState::from_parts(module_instance_id, state)),
410 )
411 })
412}
413
414pub type ModuleGlobalContextGen = ContextGen;
415
416pub struct ClientModuleInstance<'m, M: ClientModule> {
418 pub id: ModuleInstanceId,
420 pub db: Database,
422 pub api: DynModuleApi,
424
425 pub module: &'m M,
426}
427
428impl<'m, M: ClientModule> ClientModuleInstance<'m, M> {
429 pub fn inner(&self) -> &'m M {
431 self.module
432 }
433}
434
435impl<M> ops::Deref for ClientModuleInstance<'_, M>
436where
437 M: ClientModule,
438{
439 type Target = M;
440
441 fn deref(&self) -> &Self::Target {
442 self.module
443 }
444}
445#[derive(Deserialize)]
446pub struct GetInviteCodeRequest {
447 pub peer: PeerId,
448}
449
450pub struct TransactionUpdates {
451 pub update_stream: BoxStream<'static, TxSubmissionStatesSM>,
452}
453
454impl TransactionUpdates {
455 pub async fn await_tx_accepted(self, await_txid: TransactionId) -> Result<(), String> {
458 debug!(target: LOG_CLIENT, %await_txid, "Await tx accepted");
459 self.update_stream
460 .filter_map(|tx_update| {
461 std::future::ready(match tx_update.state {
462 TxSubmissionStates::Accepted(txid) if txid == await_txid => Some(Ok(())),
463 TxSubmissionStates::Rejected(txid, submit_error) if txid == await_txid => {
464 Some(Err(submit_error))
465 }
466 _ => None,
467 })
468 })
469 .next_or_pending()
470 .await?;
471 debug!(target: LOG_CLIENT, %await_txid, "Tx accepted");
472 Ok(())
473 }
474}
475
476pub struct AdminCreds {
478 pub peer_id: PeerId,
480 pub auth: ApiAuth,
482}