fedimint_client_module/module/init/recovery.rs

use std::collections::BTreeMap;
use std::time::Duration;
use std::{cmp, ops};

use bitcoin::secp256k1::PublicKey;
use fedimint_api_client::api::{
    DynGlobalApi, VERSION_THAT_INTRODUCED_GET_SESSION_STATUS,
    VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
};
use fedimint_core::db::DatabaseTransaction;
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::epoch::ConsensusItem;
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::module::{ApiVersion, ModuleCommon};
use fedimint_core::session_outcome::{AcceptedItem, SessionStatus};
use fedimint_core::task::{MaybeSend, MaybeSync, ShuttingDownError, TaskGroup};
use fedimint_core::transaction::Transaction;
use fedimint_core::util::FmtCompactAnyhow as _;
use fedimint_core::{OutPoint, PeerId, apply, async_trait_maybe_send};
use fedimint_logging::LOG_CLIENT_RECOVERY;
use futures::{Stream, StreamExt as _};
use rand::{Rng as _, thread_rng};
use serde::{Deserialize, Serialize};
use tracing::{debug, trace, warn};

use super::{ClientModuleInit, ClientModuleRecoverArgs};
use crate::module::recovery::RecoveryProgress;
use crate::module::{ClientContext, ClientModule};

/// Common state tracked during recovery from history
#[derive(Debug, Clone, Eq, PartialEq, Encodable, Decodable, Serialize, Deserialize)]
pub struct RecoveryFromHistoryCommon {
    /// Session the recovery started from
    start_session: u64,
    /// Next session to process
    next_session: u64,
    /// Exclusive upper bound: recovery is complete once `next_session`
    /// reaches this session
    end_session: u64,
}

impl RecoveryFromHistoryCommon {
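    /// Construct the initial session cursor for a recovery.
    ///
    /// A minimal sketch (values illustrative, not from the original source);
    /// a fresh recovery starts with `next_session == start_session` and an
    /// exclusive `end_session` bound:
    ///
    /// ```ignore
    /// let common = RecoveryFromHistoryCommon::new(0, 0, current_session_count + 1);
    /// ```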
    pub fn new(start_session: u64, next_session: u64, end_session: u64) -> Self {
        Self {
            start_session,
            next_session,
            end_session,
        }
    }
}

/// Module-specific logic for [`ClientModuleRecoverArgs::recover_from_history`]
///
/// See [`ClientModuleRecoverArgs::recover_from_history`] for more information.
#[apply(async_trait_maybe_send!)]
pub trait RecoveryFromHistory: std::fmt::Debug + MaybeSend + MaybeSync + Clone {
    /// [`ClientModuleInit`] of this recovery logic.
    type Init: ClientModuleInit;

    /// New empty state to start recovery from, and the session number to
    /// start at
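    ///
    /// A hedged sketch (the `MyInit`, `MyRecovery`, and backup field names
    /// are illustrative, not from the original source) of a typical
    /// implementation:
    ///
    /// ```ignore
    /// async fn new(
    ///     _init: &MyInit,
    ///     _args: &ClientModuleRecoverArgs<MyInit>,
    ///     snapshot: Option<&MyBackup>,
    /// ) -> anyhow::Result<(Self, u64)> {
    ///     // Start scanning either from the session recorded in the backup
    ///     // snapshot, or from the beginning of consensus history.
    ///     let start_session = snapshot.map(|s| s.session_count).unwrap_or(0);
    ///     Ok((MyRecovery::default(), start_session))
    /// }
    /// ```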
    async fn new(
        init: &Self::Init,
        args: &ClientModuleRecoverArgs<Self::Init>,
        snapshot: Option<&<<Self::Init as ClientModuleInit>::Module as ClientModule>::Backup>,
    ) -> anyhow::Result<(Self, u64)>;

    /// Try to load the existing state previously stored with
    /// [`RecoveryFromHistory::store_dbtx`].
    ///
    /// Storing and restoring progress lets a previously terminated
    /// recovery resume from where it left off.
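    ///
    /// A hedged sketch (the `RecoveryStateKey` db key is illustrative, not
    /// part of the original source) of a typical implementation:
    ///
    /// ```ignore
    /// async fn load_dbtx(
    ///     _init: &MyInit,
    ///     dbtx: &mut DatabaseTransaction<'_>,
    ///     _args: &ClientModuleRecoverArgs<MyInit>,
    /// ) -> anyhow::Result<Option<(Self, RecoveryFromHistoryCommon)>> {
    ///     Ok(dbtx.get_value(&RecoveryStateKey).await)
    /// }
    /// ```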
    async fn load_dbtx(
        init: &Self::Init,
        dbtx: &mut DatabaseTransaction<'_>,
        args: &ClientModuleRecoverArgs<Self::Init>,
    ) -> anyhow::Result<Option<(Self, RecoveryFromHistoryCommon)>>;

    /// Store the current recovery state in the database
    ///
    /// See [`Self::load_dbtx`].
    async fn store_dbtx(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        common: &RecoveryFromHistoryCommon,
    );

    /// Delete the recovery state from the database
    ///
    /// See [`Self::load_dbtx`].
    async fn delete_dbtx(&self, dbtx: &mut DatabaseTransaction<'_>);

    /// Read the finalization status
    ///
    /// See [`Self::load_dbtx`].
    async fn load_finalized(dbtx: &mut DatabaseTransaction<'_>) -> Option<bool>;

    /// Store finalization status
    ///
    /// See [`Self::load_finalized`].
    async fn store_finalized(dbtx: &mut DatabaseTransaction<'_>, state: bool);

    /// Handle session outcome, adjusting the current state
    ///
    /// It is expected that most implementations don't need to override this
    /// function, and override more granular ones instead (e.g.
    /// [`Self::handle_input`] and/or [`Self::handle_output`]).
    ///
    /// The default implementation loops through the items in
    /// `session_items` and forwards them one by one to the respective
    /// functions (see [`Self::handle_transaction`]).
    async fn handle_session(
        &mut self,
        client_ctx: &ClientContext<<Self::Init as ClientModuleInit>::Module>,
        session_idx: u64,
        session_items: &Vec<AcceptedItem>,
    ) -> anyhow::Result<()> {
        for accepted_item in session_items {
            if let ConsensusItem::Transaction(ref transaction) = accepted_item.item {
                self.handle_transaction(client_ctx, transaction, session_idx)
                    .await?;
            }
        }
        Ok(())
    }

    /// Handle a single transaction from the session outcome, adjusting the
    /// current state
    ///
    /// It is expected that most implementations don't need to override this
    /// function, and override more granular ones instead (e.g.
    /// [`Self::handle_input`] and/or [`Self::handle_output`]).
    ///
    /// The default implementation loops through the inputs and outputs
    /// of the transaction, filters and downcasts the ones matching the
    /// current module, and forwards them one by one to the respective
    /// functions (e.g. [`Self::handle_input`], [`Self::handle_output`]).
    async fn handle_transaction(
        &mut self,
        client_ctx: &ClientContext<<Self::Init as ClientModuleInit>::Module>,
        transaction: &Transaction,
        session_idx: u64,
    ) -> anyhow::Result<()> {
        trace!(
            target: LOG_CLIENT_RECOVERY,
            tx_hash = %transaction.tx_hash(),
            input_num = transaction.inputs.len(),
            output_num = transaction.outputs.len(),
            "processing transaction"
        );

        for (idx, input) in transaction.inputs.iter().enumerate() {
            trace!(
                target: LOG_CLIENT_RECOVERY,
                tx_hash = %transaction.tx_hash(),
                idx,
                module_id = input.module_instance_id(),
                "found transaction input"
            );

            if let Some(own_input) = client_ctx.input_from_dyn(input) {
                self.handle_input(client_ctx, idx, own_input, session_idx)
                    .await?;
            }
        }

        for (out_idx, output) in transaction.outputs.iter().enumerate() {
            trace!(
                target: LOG_CLIENT_RECOVERY,
                tx_hash = %transaction.tx_hash(),
                idx = out_idx,
                module_id = output.module_instance_id(),
                "found transaction output"
            );

            if let Some(own_output) = client_ctx.output_from_dyn(output) {
                let out_point = OutPoint {
                    txid: transaction.tx_hash(),
                    out_idx: out_idx as u64,
                };

                self.handle_output(client_ctx, out_point, own_output, session_idx)
                    .await?;
            }
        }

        Ok(())
    }

    /// Handle transaction input, adjusting the current state
    ///
    /// Default implementation does nothing.
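    ///
    /// A hedged sketch (type and field names illustrative, not from the
    /// original source) of an override that marks our own notes as spent:
    ///
    /// ```ignore
    /// async fn handle_input(
    ///     &mut self,
    ///     _client_ctx: &ClientContext<MyClientModule>,
    ///     _idx: usize,
    ///     input: &MyInput,
    ///     _session_idx: u64,
    /// ) -> anyhow::Result<()> {
    ///     // Anything spent in consensus history can no longer be unspent.
    ///     self.spent_notes.insert(input.nonce());
    ///     Ok(())
    /// }
    /// ```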
    async fn handle_input(
        &mut self,
        _client_ctx: &ClientContext<<Self::Init as ClientModuleInit>::Module>,
        _idx: usize,
        _input: &<<<Self::Init as ClientModuleInit>::Module as ClientModule>::Common as ModuleCommon>::Input,
        _session_idx: u64,
    ) -> anyhow::Result<()> {
        Ok(())
    }

    /// Handle transaction output, adjusting the current state
    ///
    /// Default implementation does nothing.
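    ///
    /// A hedged sketch (type and field names illustrative, not from the
    /// original source) of an override that records outputs addressed to us:
    ///
    /// ```ignore
    /// async fn handle_output(
    ///     &mut self,
    ///     _client_ctx: &ClientContext<MyClientModule>,
    ///     out_point: OutPoint,
    ///     output: &MyOutput,
    ///     _session_idx: u64,
    /// ) -> anyhow::Result<()> {
    ///     if self.derived_keys.contains(&output.blinded_key()) {
    ///         self.pending_outputs.insert(out_point, output.clone());
    ///     }
    ///     Ok(())
    /// }
    /// ```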
    async fn handle_output(
        &mut self,
        _client_ctx: &ClientContext<<Self::Init as ClientModuleInit>::Module>,
        _out_point: OutPoint,
        _output: &<<<Self::Init as ClientModuleInit>::Module as ClientModule>::Common as ModuleCommon>::Output,
        _session_idx: u64,
    ) -> anyhow::Result<()> {
        Ok(())
    }

    /// Called before [`Self::finalize_dbtx`], to allow final state changes
    /// outside of the retriable database transaction.
    async fn pre_finalize(&mut self) -> anyhow::Result<()> {
        Ok(())
    }

    /// Finalize the recovery, converting the tracked state to final
    /// changes in the database.
    ///
    /// This is the only place during recovery where the module gets a chance
    /// to create state machines, etc.
    ///
    /// Notably, this function runs inside a database-autocommit wrapper, so
    /// it might be called again on database commit failure.
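    ///
    /// A hedged sketch (the `OutputKey` db key is illustrative, not from the
    /// original source); note the body must stay idempotent, since
    /// autocommit may retry it:
    ///
    /// ```ignore
    /// async fn finalize_dbtx(&self, dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()> {
    ///     for (out_point, output) in &self.pending_outputs {
    ///         dbtx.insert_new_entry(&OutputKey(*out_point), output).await;
    ///     }
    ///     Ok(())
    /// }
    /// ```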
    async fn finalize_dbtx(&self, dbtx: &mut DatabaseTransaction<'_>) -> anyhow::Result<()>;
}

impl<Init> ClientModuleRecoverArgs<Init>
where
    Init: ClientModuleInit,
{
    /// Run recovery of a module from federation consensus history
    ///
    /// It is expected that most modules will implement their recovery
    /// by following federation consensus history to restore their
    /// state. This function implements such a recovery by being generic
    /// over the [`RecoveryFromHistory`] trait, which provides the
    /// module-specific parts of the recovery logic.
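    ///
    /// A hedged sketch (the `MyRecovery` and `MyBackup` types are
    /// illustrative, not from the original source) of a module's `recover`
    /// delegating here:
    ///
    /// ```ignore
    /// async fn recover(
    ///     &self,
    ///     args: &ClientModuleRecoverArgs<Self>,
    ///     snapshot: Option<&MyBackup>,
    /// ) -> anyhow::Result<()> {
    ///     args.recover_from_history::<MyRecovery>(self, snapshot).await
    /// }
    /// ```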
    pub async fn recover_from_history<Recovery>(
        &self,
        init: &Init,
        snapshot: Option<&<<Init as ClientModuleInit>::Module as ClientModule>::Backup>,
    ) -> anyhow::Result<()>
    where
        Recovery: RecoveryFromHistory<Init = Init> + std::fmt::Debug,
    {
        /// Fetch sessions in a given range as a stream
        ///
        /// Since WASM's `spawn` does not support join handles, errors are
        /// reported via the stream items themselves.
        fn fetch_block_stream<'a>(
            api: DynGlobalApi,
            core_api_version: ApiVersion,
            decoders: ModuleDecoderRegistry,
            epoch_range: ops::Range<u64>,
            broadcast_public_keys: Option<BTreeMap<PeerId, PublicKey>>,
            task_group: TaskGroup,
        ) -> impl futures::Stream<Item = Result<(u64, Vec<AcceptedItem>), ShuttingDownError>> + 'a
        {
            // How many requests for blocks to run in parallel (streaming).
            let parallelism_level =
                if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
                    64
                } else {
                    128
                };

            futures::stream::iter(epoch_range.clone())
                .map(move |session_idx| {
                    let api = api.clone();
                    // When decoding we're only interested in items we can understand, so we don't
                    // want to fail on a missing decoder of some unrelated module.
                    let decoders = decoders.clone().with_fallback();
                    let task_group = task_group.clone();
                    let broadcast_public_keys = broadcast_public_keys.clone();

                    Box::pin(async move {
                        // NOTE: Each block is fetched in a spawned task. This avoids a footgun
                        // where stuff in streams makes no progress when the stream itself
                        // is not being polled, and can possibly increase fetching performance.
                        task_group.spawn_cancellable("recovery fetch block", async move {
                            let mut retry_sleep = Duration::from_millis(10);
                            let block = loop {
                                trace!(target: LOG_CLIENT_RECOVERY, session_idx, "Awaiting signed block");

                                let items_res = if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS {
                                    api.await_block(session_idx, &decoders).await.map(|s| s.items)
                                } else {
                                    api.get_session_status(session_idx, &decoders, core_api_version, broadcast_public_keys.as_ref()).await.map(|s| match s {
                                        SessionStatus::Initial => panic!("Federation missing session that existed when we started recovery"),
                                        SessionStatus::Pending(items) => items,
                                        SessionStatus::Complete(s) => s.items,
                                    })
                                };

                                match items_res {
                                    Ok(block) => {
                                        trace!(target: LOG_CLIENT_RECOVERY, session_idx, "Got signed session");
                                        break block;
                                    },
                                    Err(err) => {
                                        const MAX_SLEEP: Duration = Duration::from_secs(120);

                                        warn!(target: LOG_CLIENT_RECOVERY, err = %err.fmt_compact_anyhow(), session_idx, "Error trying to fetch signed block");
                                        // We don't want `parallelism_level` tasks hammering the
                                        // federation with requests, so the max sleep is significant.
                                        if retry_sleep <= MAX_SLEEP {
                                            retry_sleep = retry_sleep
                                                + thread_rng().gen_range(Duration::ZERO..=retry_sleep);
                                        }
                                        fedimint_core::runtime::sleep(cmp::min(retry_sleep, MAX_SLEEP))
                                            .await;
                                    }
                                }
                            };

                            (session_idx, block)
                        }).await.expect("Can't fail")
                    })
                })
                .buffered(parallelism_level)
        }

        /// Make enough progress to justify saving a state snapshot
        async fn make_progress<Init, Recovery: RecoveryFromHistory<Init = Init>>(
            client_ctx: &ClientContext<<Init as ClientModuleInit>::Module>,
            common_state: &mut RecoveryFromHistoryCommon,
            state: &mut Recovery,
            block_stream: &mut (
                impl Stream<Item = Result<(u64, Vec<AcceptedItem>), ShuttingDownError>> + Unpin
            ),
        ) -> anyhow::Result<()>
        where
            Init: ClientModuleInit,
        {
            /// The number of blocks after which we unconditionally save
            /// progress in the database (i.e. return from this function)
            ///
            /// We are also bound by time inside the loop below.
            const PROGRESS_SNAPSHOT_BLOCKS: u64 = 5000;

            let start = fedimint_core::time::now();

            let block_range = common_state.next_session
                ..cmp::min(
                    common_state
                        .next_session
                        .wrapping_add(PROGRESS_SNAPSHOT_BLOCKS),
                    common_state.end_session,
                );

            for _ in block_range {
                let Some(res) = block_stream.next().await else {
                    break;
                };

                let (session_idx, accepted_items) = res?;

                assert_eq!(common_state.next_session, session_idx);
                state
                    .handle_session(client_ctx, session_idx, &accepted_items)
                    .await?;

                common_state.next_session += 1;

                if Duration::from_secs(10)
                    < fedimint_core::time::now()
                        .duration_since(start)
                        .unwrap_or_default()
                {
                    break;
                }
            }

            Ok(())
        }

        let db = self.db();
        let client_ctx = self.context();

        if Recovery::load_finalized(&mut db.begin_transaction_nc().await)
            .await
            .unwrap_or_default()
        {
            // In rare circumstances, the finalization could complete, yet the completion
            // of the `recover` function not be persisted in the database yet. So
            // it's possible that `recover` would be called again on an
            // already finalized state. Because of this we store a
            // finalization marker in the same dbtx as the finalization itself, detect this
            // here and exit early.
            //
            // Example sequence of how this happens (if this finalization marker did not
            // exist):
            //
            // 0. module recovery is complete and progress saved to the db
            // 1. `dbtx` with finalization commits, progress deleted, completing recovery on
            //    the client module side
            // 2. client crashes/gets terminated (tricky corner case)
            // 3. client starts again
            // 4. client never observed/persisted that the module finished recovery, so
            //    calls module recovery again
            // 5. module doesn't see progress, starts recovery again, eventually completes
            //    again and moves to finalization
            // 6. module runs finalization again and probably fails because it's actually
            //    not idempotent and doesn't expect the already existing state.
            warn!(
                target: LOG_CLIENT_RECOVERY,
                "Previously finalized, exiting"
            );
            return Ok(());
        }
        let current_session_count = client_ctx.global_api().session_count().await?;
        debug!(target: LOG_CLIENT_RECOVERY, session_count = current_session_count, "Current session count");

        let (mut state, mut common_state) =
            // TODO: if load fails (e.g. module didn't migrate an existing recovery state and failed to decode it),
            // we could just ... start from scratch? at least being able to force this behavior might be useful
            if let Some((state, common_state)) = Recovery::load_dbtx(init, &mut db.begin_transaction_nc().await, self).await? {
                (state, common_state)
            } else {
                let (state, start_session) = Recovery::new(init, self, snapshot).await?;

                debug!(target: LOG_CLIENT_RECOVERY, start_session, "Recovery start session");
                (
                    state,
                    RecoveryFromHistoryCommon {
                        start_session,
                        next_session: start_session,
                        end_session: current_session_count + 1,
                    },
                )
            };

        let block_stream_session_range = common_state.next_session..common_state.end_session;
        debug!(target: LOG_CLIENT_RECOVERY, range = ?block_stream_session_range, "Starting block streaming");

        let mut block_stream = fetch_block_stream(
            self.api().clone(),
            *self.core_api_version(),
            client_ctx.decoders(),
            block_stream_session_range,
            client_ctx
                .get_config()
                .await
                .global
                .broadcast_public_keys
                .clone(),
            self.task_group().clone(),
        );

        while common_state.next_session < common_state.end_session {
            make_progress(
                &client_ctx,
                &mut common_state,
                &mut state,
                &mut block_stream,
            )
            .await?;

            let mut dbtx = db.begin_transaction().await;
            state.store_dbtx(&mut dbtx.to_ref_nc(), &common_state).await;
            dbtx.commit_tx().await;

            self.update_recovery_progress(RecoveryProgress {
                complete: (common_state.next_session - common_state.start_session)
                    .try_into()
                    .unwrap_or(u32::MAX),
                total: (common_state.end_session - common_state.start_session)
                    .try_into()
                    .unwrap_or(u32::MAX),
            });
        }

        state.pre_finalize().await?;

        let mut dbtx = db.begin_transaction().await;
        state.store_dbtx(&mut dbtx.to_ref_nc(), &common_state).await;
        dbtx.commit_tx().await;

        debug!(
            target: LOG_CLIENT_RECOVERY,
            ?state,
            "Finalizing restore"
        );

        db.autocommit(
            |dbtx, _| {
                let state = state.clone();
                Box::pin(async move {
                    state.delete_dbtx(dbtx).await;
                    state.finalize_dbtx(dbtx).await?;
                    Recovery::store_finalized(dbtx, true).await;

                    Ok::<_, anyhow::Error>(())
                })
            },
            None,
        )
        .await?;

        Ok(())
    }
}
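
// A minimal sanity check (added for illustration, not part of the original
// source) of the session-cursor arithmetic used by `make_progress` and the
// progress reporting above.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn recovery_common_session_cursor() {
        // Recovery over sessions [10, 20): starts at 10, ends before 20.
        let mut common = RecoveryFromHistoryCommon::new(10, 10, 20);

        // Processing a session advances the cursor by one.
        common.next_session += 1;
        assert_eq!(common.next_session, 11);

        // Progress fraction as reported via `RecoveryProgress`.
        let complete = common.next_session - common.start_session;
        let total = common.end_session - common.start_session;
        assert_eq!((complete, total), (1, 10));
    }
}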