Skip to main content

fedimint_client_module/transaction/
sm.rs

1//! State machine for submitting transactions
2
3use std::time::Duration;
4
5use fedimint_core::TransactionId;
6use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
7use fedimint_core::encoding::{Decodable, Encodable};
8use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
9use fedimint_core::util::backoff_util::custom_backoff;
10use fedimint_core::util::retry;
11use fedimint_logging::LOG_CLIENT_NET_API;
12use tokio::sync::watch;
13use tracing::debug;
14
15use crate::sm::{Context, DynContext, State, StateTransition};
16use crate::{DynGlobalClientContext, DynState, TxAcceptedEvent, TxRejectedEvent};
17
18// TODO: how to prevent collisions? Generally reserve some range for custom IDs?
19/// Reserved module instance id used for client-internal state machines
20pub const TRANSACTION_SUBMISSION_MODULE_INSTANCE: ModuleInstanceId = 0xffff;
21
22#[derive(Debug, Clone)]
23pub struct TxSubmissionContext;
24
25impl Context for TxSubmissionContext {
26    const KIND: Option<ModuleKind> = None;
27}
28
29impl IntoDynInstance for TxSubmissionContext {
30    type DynType = DynContext;
31
32    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
33        DynContext::from_typed(instance_id, self)
34    }
35}
36
37#[cfg_attr(doc, aquamarine::aquamarine)]
38/// State machine to (re-)submit a transaction until it is either accepted or
39/// rejected by the federation
40///
41/// ```mermaid
42/// flowchart LR
43///     Created -- tx is accepted by consensus --> Accepted
44///     Created -- tx is rejected on submission --> Rejected
45/// ```
46// NOTE: This struct needs to retain the same encoding as [`crate::sm::OperationState`],
47// because it was used to replace it, and clients already have it persisted.
48#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
49pub struct TxSubmissionStatesSM {
50    pub operation_id: OperationId,
51    pub state: TxSubmissionStates,
52}
53
54#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
55pub enum TxSubmissionStates {
56    /// The transaction has been created and potentially already been submitted,
57    /// but no rejection or acceptance happened so far
58    Created(Transaction),
59    /// The transaction has been accepted in consensus
60    ///
61    /// **This state is final**
62    Accepted(TransactionId),
63    /// The transaction has been rejected by a quorum on submission
64    ///
65    /// **This state is final**
66    Rejected(TransactionId, String),
67    // Ideally this would be uncommented:
68    // #[deprecated(since = "0.2.2", note = "all errors should be retried")]
69    // but due to some rust bug/limitation it seem impossible to prevent
70    // existing usages from spamming compilation output with warnings.
71    NonRetryableError(String),
72}
73
74impl State for TxSubmissionStatesSM {
75    type ModuleContext = TxSubmissionContext;
76
77    fn transitions(
78        &self,
79        _context: &Self::ModuleContext,
80        global_context: &DynGlobalClientContext,
81    ) -> Vec<StateTransition<Self>> {
82        let operation_id = self.operation_id;
83        // There is no point awaiting tx until it was submitted, so
84        // `trigger_created_rejected` which does the submitting will use this
85        // channel to let the `trigger_created_accepted` which does the awaiting
86        // know when it did the submission.
87        //
88        // Submitting tx does not guarantee that it will get into consensus, so the
89        // submitting need to continue.
90        let (tx_submitted_sender, tx_submitted_receiver) = watch::channel(false);
91        match self.state.clone() {
92            TxSubmissionStates::Created(transaction) => {
93                let txid = transaction.tx_hash();
94                vec![
95                    StateTransition::new(
96                        TxSubmissionStates::trigger_created_rejected(
97                            transaction.clone(),
98                            global_context.clone(),
99                            tx_submitted_sender,
100                        ),
101                        {
102                            let global_context = global_context.clone();
103                            move |sm_dbtx, error, _| {
104                                let global_context = global_context.clone();
105                                Box::pin(async move {
106                                    global_context
107                                        .log_event(
108                                            sm_dbtx,
109                                            TxRejectedEvent {
110                                                txid,
111                                                operation_id,
112                                                error: error.clone(),
113                                            },
114                                        )
115                                        .await;
116                                    TxSubmissionStatesSM {
117                                        state: TxSubmissionStates::Rejected(txid, error),
118                                        operation_id,
119                                    }
120                                })
121                            }
122                        },
123                    ),
124                    StateTransition::new(
125                        TxSubmissionStates::trigger_created_accepted(
126                            txid,
127                            global_context.clone(),
128                            tx_submitted_receiver,
129                        ),
130                        {
131                            let global_context = global_context.clone();
132                            move |sm_dbtx, (), _| {
133                                let global_context = global_context.clone();
134                                Box::pin(async move {
135                                    global_context
136                                        .log_event(sm_dbtx, TxAcceptedEvent { txid, operation_id })
137                                        .await;
138                                    TxSubmissionStatesSM {
139                                        state: TxSubmissionStates::Accepted(txid),
140                                        operation_id,
141                                    }
142                                })
143                            }
144                        },
145                    ),
146                ]
147            }
148            TxSubmissionStates::Accepted(..)
149            | TxSubmissionStates::Rejected(..)
150            | TxSubmissionStates::NonRetryableError(..) => {
151                vec![]
152            }
153        }
154    }
155
156    fn operation_id(&self) -> OperationId {
157        self.operation_id
158    }
159}
160
161impl TxSubmissionStates {
162    async fn trigger_created_rejected(
163        transaction: Transaction,
164        context: DynGlobalClientContext,
165        tx_submitted: watch::Sender<bool>,
166    ) -> String {
167        let txid = transaction.tx_hash();
168        debug!(target: LOG_CLIENT_NET_API, %txid, "Submitting transaction");
169        retry(
170            "tx-submit-sm",
171            custom_backoff(Duration::from_secs(2), Duration::from_mins(10), None),
172            || async {
173                if let TransactionSubmissionOutcome(Err(transaction_error)) = context
174                    .api()
175                    .submit_transaction(transaction.clone())
176                    .await
177                    .try_into_inner(context.decoders())?
178                {
179                    Ok(transaction_error.to_string())
180                } else {
181                    debug!(
182                        target: LOG_CLIENT_NET_API,
183                        %txid,
184                        "Transaction submission accepted by peer, awaiting consensus",
185                    );
186                    tx_submitted.send_replace(true);
187                    Err(anyhow::anyhow!("Transaction is still valid"))
188                }
189            },
190        )
191        .await
192        .expect("Number of retries is has no limit")
193    }
194
195    async fn trigger_created_accepted(
196        txid: TransactionId,
197        context: DynGlobalClientContext,
198        mut tx_submitted: watch::Receiver<bool>,
199    ) {
200        let _ = tx_submitted.wait_for(|submitted| *submitted).await;
201        context.api().await_transaction(txid).await;
202        debug!(target: LOG_CLIENT_NET_API, %txid, "Transaction accepted in consensus");
203    }
204}
205
206impl IntoDynInstance for TxSubmissionStatesSM {
207    type DynType = DynState;
208
209    fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
210        DynState::from_typed(instance_id, self)
211    }
212}
213
214pub fn tx_submission_sm_decoder() -> Decoder {
215    let mut decoder_builder = Decoder::builder_system();
216    decoder_builder.with_decodable_type::<TxSubmissionStatesSM>();
217    decoder_builder.build()
218}