fedimint_client_module/transaction/
sm.rs1use std::time::Duration;
4
5use fedimint_core::TransactionId;
6use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
7use fedimint_core::encoding::{Decodable, Encodable};
8use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
9use fedimint_core::util::backoff_util::custom_backoff;
10use fedimint_core::util::retry;
11use fedimint_logging::LOG_CLIENT_NET_API;
12use tokio::sync::watch;
13use tracing::debug;
14
15use crate::sm::{Context, DynContext, State, StateTransition};
16use crate::{DynGlobalClientContext, DynState, TxAcceptedEvent, TxRejectedEvent};
17
18pub const TRANSACTION_SUBMISSION_MODULE_INSTANCE: ModuleInstanceId = 0xffff;
21
/// State-machine context of the transaction submission state machine.
///
/// It is a unit struct: the state machine needs no module-specific data and
/// only works with the global client context passed to its transitions.
#[derive(Debug, Clone)]
pub struct TxSubmissionContext;
24
25impl Context for TxSubmissionContext {
26 const KIND: Option<ModuleKind> = None;
27}
28
29impl IntoDynInstance for TxSubmissionContext {
30 type DynType = DynContext;
31
32 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
33 DynContext::from_typed(instance_id, self)
34 }
35}
36
37#[cfg_attr(doc, aquamarine::aquamarine)]
38#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
49pub struct TxSubmissionStatesSM {
50 pub operation_id: OperationId,
51 pub state: TxSubmissionStates,
52}
53
54#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
55pub enum TxSubmissionStates {
56 Created(Transaction),
59 Accepted(TransactionId),
63 Rejected(TransactionId, String),
67 NonRetryableError(String),
72}
73
74impl State for TxSubmissionStatesSM {
75 type ModuleContext = TxSubmissionContext;
76
77 fn transitions(
78 &self,
79 _context: &Self::ModuleContext,
80 global_context: &DynGlobalClientContext,
81 ) -> Vec<StateTransition<Self>> {
82 let operation_id = self.operation_id;
83 let (tx_submitted_sender, tx_submitted_receiver) = watch::channel(false);
91 match self.state.clone() {
92 TxSubmissionStates::Created(transaction) => {
93 let txid = transaction.tx_hash();
94 vec![
95 StateTransition::new(
96 TxSubmissionStates::trigger_created_rejected(
97 transaction.clone(),
98 global_context.clone(),
99 tx_submitted_sender,
100 ),
101 {
102 let global_context = global_context.clone();
103 move |sm_dbtx, error, _| {
104 let global_context = global_context.clone();
105 Box::pin(async move {
106 global_context
107 .log_event(
108 sm_dbtx,
109 TxRejectedEvent {
110 txid,
111 operation_id,
112 error: error.clone(),
113 },
114 )
115 .await;
116 TxSubmissionStatesSM {
117 state: TxSubmissionStates::Rejected(txid, error),
118 operation_id,
119 }
120 })
121 }
122 },
123 ),
124 StateTransition::new(
125 TxSubmissionStates::trigger_created_accepted(
126 txid,
127 global_context.clone(),
128 tx_submitted_receiver,
129 ),
130 {
131 let global_context = global_context.clone();
132 move |sm_dbtx, (), _| {
133 let global_context = global_context.clone();
134 Box::pin(async move {
135 global_context
136 .log_event(sm_dbtx, TxAcceptedEvent { txid, operation_id })
137 .await;
138 TxSubmissionStatesSM {
139 state: TxSubmissionStates::Accepted(txid),
140 operation_id,
141 }
142 })
143 }
144 },
145 ),
146 ]
147 }
148 TxSubmissionStates::Accepted(..)
149 | TxSubmissionStates::Rejected(..)
150 | TxSubmissionStates::NonRetryableError(..) => {
151 vec![]
152 }
153 }
154 }
155
156 fn operation_id(&self) -> OperationId {
157 self.operation_id
158 }
159}
160
161impl TxSubmissionStates {
162 async fn trigger_created_rejected(
163 transaction: Transaction,
164 context: DynGlobalClientContext,
165 tx_submitted: watch::Sender<bool>,
166 ) -> String {
167 let txid = transaction.tx_hash();
168 debug!(target: LOG_CLIENT_NET_API, %txid, "Submitting transaction");
169 retry(
170 "tx-submit-sm",
171 custom_backoff(Duration::from_secs(2), Duration::from_mins(10), None),
172 || async {
173 if let TransactionSubmissionOutcome(Err(transaction_error)) = context
174 .api()
175 .submit_transaction(transaction.clone())
176 .await
177 .try_into_inner(context.decoders())?
178 {
179 Ok(transaction_error.to_string())
180 } else {
181 debug!(
182 target: LOG_CLIENT_NET_API,
183 %txid,
184 "Transaction submission accepted by peer, awaiting consensus",
185 );
186 tx_submitted.send_replace(true);
187 Err(anyhow::anyhow!("Transaction is still valid"))
188 }
189 },
190 )
191 .await
192 .expect("Number of retries is has no limit")
193 }
194
195 async fn trigger_created_accepted(
196 txid: TransactionId,
197 context: DynGlobalClientContext,
198 mut tx_submitted: watch::Receiver<bool>,
199 ) {
200 let _ = tx_submitted.wait_for(|submitted| *submitted).await;
201 context.api().await_transaction(txid).await;
202 debug!(target: LOG_CLIENT_NET_API, %txid, "Transaction accepted in consensus");
203 }
204}
205
206impl IntoDynInstance for TxSubmissionStatesSM {
207 type DynType = DynState;
208
209 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
210 DynState::from_typed(instance_id, self)
211 }
212}
213
214pub fn tx_submission_sm_decoder() -> Decoder {
215 let mut decoder_builder = Decoder::builder_system();
216 decoder_builder.with_decodable_type::<TxSubmissionStatesSM>();
217 decoder_builder.build()
218}