fedimint_server/consensus/
transaction.rs

1use fedimint_core::db::DatabaseTransaction;
2use fedimint_core::module::{Amounts, CoreConsensusVersion, TransactionItemAmounts};
3use fedimint_core::transaction::{TRANSACTION_OVERFLOW_ERROR, Transaction, TransactionError};
4use fedimint_core::{InPoint, OutPoint};
5use fedimint_server_core::ServerModuleRegistry;
6use rayon::iter::{IntoParallelIterator, ParallelIterator};
7
8use crate::metrics::{CONSENSUS_TX_PROCESSED_INPUTS, CONSENSUS_TX_PROCESSED_OUTPUTS};
9
/// Selects which checks [`process_transaction_with_dbtx`] runs for a
/// transaction.
///
/// The type is a plain tag without payload, so it is `Copy` and can be passed
/// by value and reused freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxProcessingMode {
    /// The additional `verify_input_submission` / `verify_output_submission`
    /// module checks are run before each item is processed.
    Submission,
    /// The additional submission checks are skipped; items are only
    /// processed.
    Consensus,
}
15
16pub async fn process_transaction_with_dbtx(
17    modules: ServerModuleRegistry,
18    dbtx: &mut DatabaseTransaction<'_>,
19    transaction: &Transaction,
20    version: CoreConsensusVersion,
21    mode: TxProcessingMode,
22) -> Result<(), TransactionError> {
23    let in_count = transaction.inputs.len();
24    let out_count = transaction.outputs.len();
25
26    dbtx.on_commit(move || {
27        CONSENSUS_TX_PROCESSED_INPUTS.observe(in_count as f64);
28        CONSENSUS_TX_PROCESSED_OUTPUTS.observe(out_count as f64);
29    });
30
31    // We can not return the error here as errors are not returned in a specified
32    // order and the client still expects consensus on the error. Since the
33    // error is not extensible at the moment we need to incorrectly return the
34    // InvalidWitnessLength variant.
35    transaction
36        .inputs
37        .clone()
38        .into_par_iter()
39        .try_for_each(|input| {
40            modules
41                .get_expect(input.module_instance_id())
42                .verify_input(&input)
43        })
44        .map_err(|_| TransactionError::InvalidWitnessLength)?;
45
46    let mut funding_verifier = FundingVerifier::default();
47    let mut public_keys = Vec::new();
48
49    let txid = transaction.tx_hash();
50
51    for (input, in_idx) in transaction.inputs.iter().zip(0u64..) {
52        // somewhat unfortunately, we need to do the extra checks berofe `process_x`
53        // does the changes in the dbtx
54        if mode == TxProcessingMode::Submission {
55            modules
56                .get_expect(input.module_instance_id())
57                .verify_input_submission(
58                    &mut dbtx
59                        .to_ref_with_prefix_module_id(input.module_instance_id())
60                        .0,
61                    input,
62                )
63                .await
64                .map_err(TransactionError::Input)?;
65        }
66        let meta = modules
67            .get_expect(input.module_instance_id())
68            .process_input(
69                &mut dbtx
70                    .to_ref_with_prefix_module_id(input.module_instance_id())
71                    .0,
72                input,
73                InPoint { txid, in_idx },
74            )
75            .await
76            .map_err(TransactionError::Input)?;
77
78        funding_verifier.add_input(meta.amount)?;
79        public_keys.push(meta.pub_key);
80    }
81
82    transaction.validate_signatures(&public_keys)?;
83
84    for (output, out_idx) in transaction.outputs.iter().zip(0u64..) {
85        // somewhat unfortunately, we need to do the extra checks berofe `process_x`
86        // does the changes in the dbtx
87        if mode == TxProcessingMode::Submission {
88            modules
89                .get_expect(output.module_instance_id())
90                .verify_output_submission(
91                    &mut dbtx
92                        .to_ref_with_prefix_module_id(output.module_instance_id())
93                        .0,
94                    output,
95                    OutPoint { txid, out_idx },
96                )
97                .await
98                .map_err(TransactionError::Output)?;
99        }
100
101        let amount = modules
102            .get_expect(output.module_instance_id())
103            .process_output(
104                &mut dbtx
105                    .to_ref_with_prefix_module_id(output.module_instance_id())
106                    .0,
107                output,
108                OutPoint { txid, out_idx },
109            )
110            .await
111            .map_err(TransactionError::Output)?;
112
113        funding_verifier.add_output(amount)?;
114    }
115
116    funding_verifier.verify_funding(version)?;
117
118    Ok(())
119}
120
121#[derive(Clone, Debug)]
122pub struct FundingVerifier {
123    inputs: Amounts,
124    outputs: Amounts,
125    fees: Amounts,
126}
127
128impl FundingVerifier {
129    pub fn add_input(
130        &mut self,
131        input: TransactionItemAmounts,
132    ) -> Result<&mut Self, TransactionError> {
133        self.inputs
134            .checked_add_mut(&input.amounts)
135            .ok_or(TRANSACTION_OVERFLOW_ERROR)?;
136        self.fees
137            .checked_add_mut(&input.fees)
138            .ok_or(TRANSACTION_OVERFLOW_ERROR)?;
139
140        Ok(self)
141    }
142
143    pub fn add_output(
144        &mut self,
145        output_amounts: TransactionItemAmounts,
146    ) -> Result<&mut Self, TransactionError> {
147        self.outputs
148            .checked_add_mut(&output_amounts.amounts)
149            .ok_or(TRANSACTION_OVERFLOW_ERROR)?;
150        self.fees
151            .checked_add_mut(&output_amounts.fees)
152            .ok_or(TRANSACTION_OVERFLOW_ERROR)?;
153
154        Ok(self)
155    }
156
157    pub fn verify_funding(mut self, version: CoreConsensusVersion) -> Result<(), TransactionError> {
158        // In early versions we did not allow any overpaying
159        const OVERPAY_MIN_VERSION: CoreConsensusVersion = CoreConsensusVersion::new(2, 1);
160
161        let outputs_and_fees = self
162            .outputs
163            .clone()
164            .checked_add(&self.fees)
165            .ok_or(TRANSACTION_OVERFLOW_ERROR)?;
166
167        for (out_unit, out_amount) in outputs_and_fees {
168            let input_amount = self.inputs.get(&out_unit).copied().unwrap_or_default();
169
170            if input_amount < out_amount
171                // In early versions we did not allow any overpaying
172                ||  (input_amount != out_amount  && version < OVERPAY_MIN_VERSION)
173            {
174                return Err(TransactionError::UnbalancedTransaction {
175                    inputs: input_amount,
176                    outputs: self.outputs.get(&out_unit).copied().unwrap_or_default(),
177                    fee: self.fees.get(&out_unit).copied().unwrap_or_default(),
178                });
179            }
180
181            // Explicitly remove for the check below to
182            self.inputs.remove(&out_unit);
183        }
184
185        if version < OVERPAY_MIN_VERSION
186            && let Some((inputs_unit, inputs_amount)) = self.inputs.into_iter().next()
187        {
188            return Err(TransactionError::UnbalancedTransaction {
189                inputs: inputs_amount,
190                outputs: self.outputs.get(&inputs_unit).copied().unwrap_or_default(),
191                fee: self.fees.get(&inputs_unit).copied().unwrap_or_default(),
192            });
193        }
194
195        Ok(())
196    }
197}
198
199impl Default for FundingVerifier {
200    fn default() -> Self {
201        FundingVerifier {
202            inputs: Amounts::ZERO,
203            outputs: Amounts::ZERO,
204            fees: Amounts::ZERO,
205        }
206    }
207}
208
209#[cfg(test)]
210mod tests;