fedimint_client_module/transaction/
sm.rs1use std::time::Duration;
4
5use fedimint_core::TransactionId;
6use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
7use fedimint_core::encoding::{Decodable, Encodable};
8use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
9use fedimint_core::util::backoff_util::custom_backoff;
10use fedimint_core::util::retry;
11use fedimint_logging::LOG_CLIENT_NET_API;
12use tokio::sync::watch;
13use tracing::debug;
14
15use crate::sm::{Context, DynContext, State, StateTransition};
16use crate::{DynGlobalClientContext, DynState, TxAcceptedEvent, TxRejectedEvent};
17
18pub const TRANSACTION_SUBMISSION_MODULE_INSTANCE: ModuleInstanceId = 0xffff;
21
22#[derive(Debug, Clone)]
23pub struct TxSubmissionContext;
24
25impl Context for TxSubmissionContext {
26 const KIND: Option<ModuleKind> = None;
27}
28
29impl IntoDynInstance for TxSubmissionContext {
30 type DynType = DynContext;
31
32 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
33 DynContext::from_typed(instance_id, self)
34 }
35}
36
37#[cfg_attr(doc, aquamarine::aquamarine)]
38#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
49pub struct TxSubmissionStatesSM {
50 pub operation_id: OperationId,
51 pub state: TxSubmissionStates,
52}
53
54#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
55pub enum TxSubmissionStates {
56 Created(Transaction),
59 Accepted(TransactionId),
63 Rejected(TransactionId, String),
67 NonRetryableError(String),
72}
73
74impl State for TxSubmissionStatesSM {
75 type ModuleContext = TxSubmissionContext;
76
77 fn transitions(
78 &self,
79 _context: &Self::ModuleContext,
80 global_context: &DynGlobalClientContext,
81 ) -> Vec<StateTransition<Self>> {
82 let operation_id = self.operation_id;
83 let (tx_submitted_sender, tx_submitted_receiver) = watch::channel(false);
91 match self.state.clone() {
92 TxSubmissionStates::Created(transaction) => {
93 let txid = transaction.tx_hash();
94 vec![
95 StateTransition::new(
96 TxSubmissionStates::trigger_created_rejected(
97 transaction.clone(),
98 global_context.clone(),
99 tx_submitted_sender,
100 ),
101 {
102 let global_context = global_context.clone();
103 move |sm_dbtx, error, _| {
104 let global_context = global_context.clone();
105 Box::pin(async move {
106 global_context
107 .log_event(
108 sm_dbtx,
109 TxRejectedEvent {
110 txid,
111 operation_id,
112 error: error.clone(),
113 },
114 )
115 .await;
116 TxSubmissionStatesSM {
117 state: TxSubmissionStates::Rejected(txid, error),
118 operation_id,
119 }
120 })
121 }
122 },
123 ),
124 StateTransition::new(
125 TxSubmissionStates::trigger_created_accepted(
126 txid,
127 global_context.clone(),
128 tx_submitted_receiver,
129 ),
130 {
131 let global_context = global_context.clone();
132 move |sm_dbtx, (), _| {
133 let global_context = global_context.clone();
134 Box::pin(async move {
135 global_context
136 .log_event(sm_dbtx, TxAcceptedEvent { txid, operation_id })
137 .await;
138 TxSubmissionStatesSM {
139 state: TxSubmissionStates::Accepted(txid),
140 operation_id,
141 }
142 })
143 }
144 },
145 ),
146 ]
147 }
148 TxSubmissionStates::Accepted(..)
149 | TxSubmissionStates::Rejected(..)
150 | TxSubmissionStates::NonRetryableError(..) => {
151 vec![]
152 }
153 }
154 }
155
156 fn operation_id(&self) -> OperationId {
157 self.operation_id
158 }
159
160 fn fmt_visualization(&self, f: &mut dyn std::fmt::Write, indent: &str) -> std::fmt::Result {
161 match &self.state {
162 TxSubmissionStates::Created(tx) => {
163 let txid = tx.tx_hash();
164 write!(
165 f,
166 "{indent}TxSubmissionStatesSM\n{indent} state: Created txid={} inputs={} outputs={}",
167 txid.fmt_short(),
168 tx.inputs.len(),
169 tx.outputs.len(),
170 )
171 }
172 TxSubmissionStates::Accepted(txid) => {
173 write!(
174 f,
175 "{indent}TxSubmissionStatesSM\n{indent} state: Accepted txid={}",
176 txid.fmt_short(),
177 )
178 }
179 TxSubmissionStates::Rejected(txid, err) => {
180 write!(
181 f,
182 "{indent}TxSubmissionStatesSM\n{indent} state: Rejected txid={} error={err}",
183 txid.fmt_short(),
184 )
185 }
186 TxSubmissionStates::NonRetryableError(err) => {
187 write!(
188 f,
189 "{indent}TxSubmissionStatesSM\n{indent} state: NonRetryableError error={err}",
190 )
191 }
192 }
193 }
194}
195
196impl TxSubmissionStates {
197 async fn trigger_created_rejected(
198 transaction: Transaction,
199 context: DynGlobalClientContext,
200 tx_submitted: watch::Sender<bool>,
201 ) -> String {
202 let txid = transaction.tx_hash();
203 debug!(target: LOG_CLIENT_NET_API, %txid, "Submitting transaction");
204 retry(
205 "tx-submit-sm",
206 custom_backoff(Duration::from_secs(2), Duration::from_mins(10), None),
207 || async {
208 if let TransactionSubmissionOutcome(Err(transaction_error)) = context
209 .api()
210 .submit_transaction(transaction.clone())
211 .await
212 .try_into_inner(context.decoders())?
213 {
214 Ok(transaction_error.to_string())
215 } else {
216 debug!(
217 target: LOG_CLIENT_NET_API,
218 %txid,
219 "Transaction submission accepted by peer, awaiting consensus",
220 );
221 tx_submitted.send_replace(true);
222 Err(anyhow::anyhow!("Transaction is still valid"))
223 }
224 },
225 )
226 .await
227 .expect("Number of retries is has no limit")
228 }
229
230 async fn trigger_created_accepted(
231 txid: TransactionId,
232 context: DynGlobalClientContext,
233 mut tx_submitted: watch::Receiver<bool>,
234 ) {
235 let _ = tx_submitted.wait_for(|submitted| *submitted).await;
236 context.api().await_transaction(txid).await;
237 debug!(target: LOG_CLIENT_NET_API, %txid, "Transaction accepted in consensus");
238 }
239}
240
241impl IntoDynInstance for TxSubmissionStatesSM {
242 type DynType = DynState;
243
244 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
245 DynState::from_typed(instance_id, self)
246 }
247}
248
249pub fn tx_submission_sm_decoder() -> Decoder {
250 let mut decoder_builder = Decoder::builder_system();
251 decoder_builder.with_decodable_type::<TxSubmissionStatesSM>();
252 decoder_builder.build()
253}