Skip to main content

fedimint_client_module/transaction/
sm.rs

1//! State machine for submitting transactions
2
3use std::time::Duration;
4
5use fedimint_core::TransactionId;
6use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
7use fedimint_core::encoding::{Decodable, Encodable};
8use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
9use fedimint_core::util::backoff_util::custom_backoff;
10use fedimint_core::util::retry;
11use fedimint_logging::LOG_CLIENT_NET_API;
12use tokio::sync::watch;
13use tracing::debug;
14
15use crate::sm::{Context, DynContext, State, StateTransition};
16use crate::{DynGlobalClientContext, DynState, TxAcceptedEvent, TxRejectedEvent};
17
18// TODO: how to prevent collisions? Generally reserve some range for custom IDs?
19/// Reserved module instance id used for client-internal state machines
20pub const TRANSACTION_SUBMISSION_MODULE_INSTANCE: ModuleInstanceId = 0xffff;
21
/// Module context of the transaction submission state machine
///
/// It carries no data: the transitions of [`TxSubmissionStatesSM`] ignore it
/// and only use the global client context they are handed.
#[derive(Debug, Clone)]
pub struct TxSubmissionContext;
24
25impl Context for TxSubmissionContext {
26    const KIND: Option<ModuleKind> = None;
27}
28
29impl IntoDynInstance for TxSubmissionContext {
30    type DynType = DynContext;
31
32    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
33        DynContext::from_typed(instance_id, self)
34    }
35}
36
37#[cfg_attr(doc, aquamarine::aquamarine)]
38/// State machine to (re-)submit a transaction until it is either accepted or
39/// rejected by the federation
40///
41/// ```mermaid
42/// flowchart LR
43///     Created -- tx is accepted by consensus --> Accepted
44///     Created -- tx is rejected on submission --> Rejected
45/// ```
46// NOTE: This struct needs to retain the same encoding as [`crate::sm::OperationState`],
47// because it was used to replace it, and clients already have it persisted.
48#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
49pub struct TxSubmissionStatesSM {
50    pub operation_id: OperationId,
51    pub state: TxSubmissionStates,
52}
53
54#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
55pub enum TxSubmissionStates {
56    /// The transaction has been created and potentially already been submitted,
57    /// but no rejection or acceptance happened so far
58    Created(Transaction),
59    /// The transaction has been accepted in consensus
60    ///
61    /// **This state is final**
62    Accepted(TransactionId),
63    /// The transaction has been rejected by a quorum on submission
64    ///
65    /// **This state is final**
66    Rejected(TransactionId, String),
67    // Ideally this would be uncommented:
68    // #[deprecated(since = "0.2.2", note = "all errors should be retried")]
69    // but due to some rust bug/limitation it seem impossible to prevent
70    // existing usages from spamming compilation output with warnings.
71    NonRetryableError(String),
72}
73
74impl State for TxSubmissionStatesSM {
75    type ModuleContext = TxSubmissionContext;
76
77    fn transitions(
78        &self,
79        _context: &Self::ModuleContext,
80        global_context: &DynGlobalClientContext,
81    ) -> Vec<StateTransition<Self>> {
82        let operation_id = self.operation_id;
83        // There is no point awaiting tx until it was submitted, so
84        // `trigger_created_rejected` which does the submitting will use this
85        // channel to let the `trigger_created_accepted` which does the awaiting
86        // know when it did the submission.
87        //
88        // Submitting tx does not guarantee that it will get into consensus, so the
89        // submitting need to continue.
90        let (tx_submitted_sender, tx_submitted_receiver) = watch::channel(false);
91        match self.state.clone() {
92            TxSubmissionStates::Created(transaction) => {
93                let txid = transaction.tx_hash();
94                vec![
95                    StateTransition::new(
96                        TxSubmissionStates::trigger_created_rejected(
97                            transaction.clone(),
98                            global_context.clone(),
99                            tx_submitted_sender,
100                        ),
101                        {
102                            let global_context = global_context.clone();
103                            move |sm_dbtx, error, _| {
104                                let global_context = global_context.clone();
105                                Box::pin(async move {
106                                    global_context
107                                        .log_event(
108                                            sm_dbtx,
109                                            TxRejectedEvent {
110                                                txid,
111                                                operation_id,
112                                                error: error.clone(),
113                                            },
114                                        )
115                                        .await;
116                                    TxSubmissionStatesSM {
117                                        state: TxSubmissionStates::Rejected(txid, error),
118                                        operation_id,
119                                    }
120                                })
121                            }
122                        },
123                    ),
124                    StateTransition::new(
125                        TxSubmissionStates::trigger_created_accepted(
126                            txid,
127                            global_context.clone(),
128                            tx_submitted_receiver,
129                        ),
130                        {
131                            let global_context = global_context.clone();
132                            move |sm_dbtx, (), _| {
133                                let global_context = global_context.clone();
134                                Box::pin(async move {
135                                    global_context
136                                        .log_event(sm_dbtx, TxAcceptedEvent { txid, operation_id })
137                                        .await;
138                                    TxSubmissionStatesSM {
139                                        state: TxSubmissionStates::Accepted(txid),
140                                        operation_id,
141                                    }
142                                })
143                            }
144                        },
145                    ),
146                ]
147            }
148            TxSubmissionStates::Accepted(..)
149            | TxSubmissionStates::Rejected(..)
150            | TxSubmissionStates::NonRetryableError(..) => {
151                vec![]
152            }
153        }
154    }
155
156    fn operation_id(&self) -> OperationId {
157        self.operation_id
158    }
159
160    fn fmt_visualization(&self, f: &mut dyn std::fmt::Write, indent: &str) -> std::fmt::Result {
161        match &self.state {
162            TxSubmissionStates::Created(tx) => {
163                let txid = tx.tx_hash();
164                write!(
165                    f,
166                    "{indent}TxSubmissionStatesSM\n{indent}  state: Created  txid={}  inputs={}  outputs={}",
167                    txid.fmt_short(),
168                    tx.inputs.len(),
169                    tx.outputs.len(),
170                )
171            }
172            TxSubmissionStates::Accepted(txid) => {
173                write!(
174                    f,
175                    "{indent}TxSubmissionStatesSM\n{indent}  state: Accepted  txid={}",
176                    txid.fmt_short(),
177                )
178            }
179            TxSubmissionStates::Rejected(txid, err) => {
180                write!(
181                    f,
182                    "{indent}TxSubmissionStatesSM\n{indent}  state: Rejected  txid={}  error={err}",
183                    txid.fmt_short(),
184                )
185            }
186            TxSubmissionStates::NonRetryableError(err) => {
187                write!(
188                    f,
189                    "{indent}TxSubmissionStatesSM\n{indent}  state: NonRetryableError  error={err}",
190                )
191            }
192        }
193    }
194}
195
196impl TxSubmissionStates {
197    async fn trigger_created_rejected(
198        transaction: Transaction,
199        context: DynGlobalClientContext,
200        tx_submitted: watch::Sender<bool>,
201    ) -> String {
202        let txid = transaction.tx_hash();
203        debug!(target: LOG_CLIENT_NET_API, %txid, "Submitting transaction");
204        retry(
205            "tx-submit-sm",
206            custom_backoff(Duration::from_secs(2), Duration::from_mins(10), None),
207            || async {
208                if let TransactionSubmissionOutcome(Err(transaction_error)) = context
209                    .api()
210                    .submit_transaction(transaction.clone())
211                    .await
212                    .try_into_inner(context.decoders())?
213                {
214                    Ok(transaction_error.to_string())
215                } else {
216                    debug!(
217                        target: LOG_CLIENT_NET_API,
218                        %txid,
219                        "Transaction submission accepted by peer, awaiting consensus",
220                    );
221                    tx_submitted.send_replace(true);
222                    Err(anyhow::anyhow!("Transaction is still valid"))
223                }
224            },
225        )
226        .await
227        .expect("Number of retries is has no limit")
228    }
229
230    async fn trigger_created_accepted(
231        txid: TransactionId,
232        context: DynGlobalClientContext,
233        mut tx_submitted: watch::Receiver<bool>,
234    ) {
235        let _ = tx_submitted.wait_for(|submitted| *submitted).await;
236        context.api().await_transaction(txid).await;
237        debug!(target: LOG_CLIENT_NET_API, %txid, "Transaction accepted in consensus");
238    }
239}
240
241impl IntoDynInstance for TxSubmissionStatesSM {
242    type DynType = DynState;
243
244    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
245        DynState::from_typed(instance_id, self)
246    }
247}
248
249pub fn tx_submission_sm_decoder() -> Decoder {
250    let mut decoder_builder = Decoder::builder_system();
251    decoder_builder.with_decodable_type::<TxSubmissionStatesSM>();
252    decoder_builder.build()
253}