fedimint_server/consensus/
transaction.rs1use fedimint_core::db::DatabaseTransaction;
2use fedimint_core::module::{Amounts, CoreConsensusVersion, TransactionItemAmounts};
3use fedimint_core::transaction::{TRANSACTION_OVERFLOW_ERROR, Transaction, TransactionError};
4use fedimint_core::{InPoint, OutPoint};
5use fedimint_server_core::ServerModuleRegistry;
6use rayon::iter::{IntoParallelIterator, ParallelIterator};
7
8use crate::metrics::{CONSENSUS_TX_PROCESSED_INPUTS, CONSENSUS_TX_PROCESSED_OUTPUTS};
9
/// Selects how strictly a transaction is checked while it is being processed.
#[derive(Debug, PartialEq, Eq)]
pub enum TxProcessingMode {
    /// The transaction is being submitted: the modules' extra
    /// `verify_input_submission` / `verify_output_submission` checks are run
    /// before each input and output is processed.
    Submission,
    /// The transaction is processed without the submission-only checks.
    Consensus,
}
15
16pub async fn process_transaction_with_dbtx(
17 modules: ServerModuleRegistry,
18 dbtx: &mut DatabaseTransaction<'_>,
19 transaction: &Transaction,
20 version: CoreConsensusVersion,
21 mode: TxProcessingMode,
22) -> Result<(), TransactionError> {
23 let in_count = transaction.inputs.len();
24 let out_count = transaction.outputs.len();
25
26 dbtx.on_commit(move || {
27 CONSENSUS_TX_PROCESSED_INPUTS.observe(in_count as f64);
28 CONSENSUS_TX_PROCESSED_OUTPUTS.observe(out_count as f64);
29 });
30
31 transaction
36 .inputs
37 .clone()
38 .into_par_iter()
39 .try_for_each(|input| {
40 modules
41 .get_expect(input.module_instance_id())
42 .verify_input(&input)
43 })
44 .map_err(|_| TransactionError::InvalidWitnessLength)?;
45
46 let mut funding_verifier = FundingVerifier::default();
47 let mut public_keys = Vec::new();
48
49 let txid = transaction.tx_hash();
50
51 for (input, in_idx) in transaction.inputs.iter().zip(0u64..) {
52 if mode == TxProcessingMode::Submission {
55 modules
56 .get_expect(input.module_instance_id())
57 .verify_input_submission(
58 &mut dbtx
59 .to_ref_with_prefix_module_id(input.module_instance_id())
60 .0,
61 input,
62 )
63 .await
64 .map_err(TransactionError::Input)?;
65 }
66 let meta = modules
67 .get_expect(input.module_instance_id())
68 .process_input(
69 &mut dbtx
70 .to_ref_with_prefix_module_id(input.module_instance_id())
71 .0,
72 input,
73 InPoint { txid, in_idx },
74 )
75 .await
76 .map_err(TransactionError::Input)?;
77
78 funding_verifier.add_input(meta.amount)?;
79 public_keys.push(meta.pub_key);
80 }
81
82 transaction.validate_signatures(&public_keys)?;
83
84 for (output, out_idx) in transaction.outputs.iter().zip(0u64..) {
85 if mode == TxProcessingMode::Submission {
88 modules
89 .get_expect(output.module_instance_id())
90 .verify_output_submission(
91 &mut dbtx
92 .to_ref_with_prefix_module_id(output.module_instance_id())
93 .0,
94 output,
95 OutPoint { txid, out_idx },
96 )
97 .await
98 .map_err(TransactionError::Output)?;
99 }
100
101 let amount = modules
102 .get_expect(output.module_instance_id())
103 .process_output(
104 &mut dbtx
105 .to_ref_with_prefix_module_id(output.module_instance_id())
106 .0,
107 output,
108 OutPoint { txid, out_idx },
109 )
110 .await
111 .map_err(TransactionError::Output)?;
112
113 funding_verifier.add_output(amount)?;
114 }
115
116 funding_verifier.verify_funding(version)?;
117
118 Ok(())
119}
120
121#[derive(Clone, Debug)]
122pub struct FundingVerifier {
123 inputs: Amounts,
124 outputs: Amounts,
125 fees: Amounts,
126}
127
128impl FundingVerifier {
129 pub fn add_input(
130 &mut self,
131 input: TransactionItemAmounts,
132 ) -> Result<&mut Self, TransactionError> {
133 self.inputs
134 .checked_add_mut(&input.amounts)
135 .ok_or(TRANSACTION_OVERFLOW_ERROR)?;
136 self.fees
137 .checked_add_mut(&input.fees)
138 .ok_or(TRANSACTION_OVERFLOW_ERROR)?;
139
140 Ok(self)
141 }
142
143 pub fn add_output(
144 &mut self,
145 output_amounts: TransactionItemAmounts,
146 ) -> Result<&mut Self, TransactionError> {
147 self.outputs
148 .checked_add_mut(&output_amounts.amounts)
149 .ok_or(TRANSACTION_OVERFLOW_ERROR)?;
150 self.fees
151 .checked_add_mut(&output_amounts.fees)
152 .ok_or(TRANSACTION_OVERFLOW_ERROR)?;
153
154 Ok(self)
155 }
156
157 pub fn verify_funding(mut self, version: CoreConsensusVersion) -> Result<(), TransactionError> {
158 const OVERPAY_MIN_VERSION: CoreConsensusVersion = CoreConsensusVersion::new(2, 1);
160
161 let outputs_and_fees = self
162 .outputs
163 .clone()
164 .checked_add(&self.fees)
165 .ok_or(TRANSACTION_OVERFLOW_ERROR)?;
166
167 for (out_unit, out_amount) in outputs_and_fees {
168 let input_amount = self.inputs.get(&out_unit).copied().unwrap_or_default();
169
170 if input_amount < out_amount
171 || (input_amount != out_amount && version < OVERPAY_MIN_VERSION)
173 {
174 return Err(TransactionError::UnbalancedTransaction {
175 inputs: input_amount,
176 outputs: self.outputs.get(&out_unit).copied().unwrap_or_default(),
177 fee: self.fees.get(&out_unit).copied().unwrap_or_default(),
178 });
179 }
180
181 self.inputs.remove(&out_unit);
183 }
184
185 if version < OVERPAY_MIN_VERSION
186 && let Some((inputs_unit, inputs_amount)) = self.inputs.into_iter().next()
187 {
188 return Err(TransactionError::UnbalancedTransaction {
189 inputs: inputs_amount,
190 outputs: self.outputs.get(&inputs_unit).copied().unwrap_or_default(),
191 fee: self.fees.get(&inputs_unit).copied().unwrap_or_default(),
192 });
193 }
194
195 Ok(())
196 }
197}
198
199impl Default for FundingVerifier {
200 fn default() -> Self {
201 FundingVerifier {
202 inputs: Amounts::ZERO,
203 outputs: Amounts::ZERO,
204 fees: Amounts::ZERO,
205 }
206 }
207}
208
209#[cfg(test)]
210mod tests;