fedimint_client/sm/
executor.rs

1use std::collections::{BTreeMap, BTreeSet, HashSet};
2use std::convert::Infallible;
3use std::fmt::{Debug, Formatter};
4use std::io::{Error, Write};
5use std::mem;
6use std::sync::Arc;
7
8use anyhow::anyhow;
9use fedimint_client_module::sm::executor::{
10    ActiveStateKey, ContextGen, IExecutor, InactiveStateKey,
11};
12use fedimint_client_module::sm::{
13    ActiveStateMeta, ClientSMDatabaseTransaction, DynContext, DynState, InactiveStateMeta, State,
14    StateTransition, StateTransitionFunction,
15};
16use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, OperationId};
17use fedimint_core::db::{
18    AutocommitError, Database, DatabaseKeyWithNotify, DatabaseTransaction,
19    IDatabaseTransactionOpsCoreTyped,
20};
21use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
22use fedimint_core::fmt_utils::AbbreviateJson;
23use fedimint_core::module::registry::ModuleDecoderRegistry;
24use fedimint_core::task::TaskGroup;
25use fedimint_core::util::{BoxFuture, FmtCompactAnyhow as _};
26use fedimint_core::{apply, async_trait_maybe_send};
27use fedimint_eventlog::{DBTransactionEventLogExt as _, Event, EventKind, EventPersistence};
28use fedimint_logging::LOG_CLIENT_REACTOR;
29use futures::future::{self, select_all};
30use futures::stream::{FuturesUnordered, StreamExt};
31use serde::{Deserialize, Serialize};
32use tokio::select;
33use tokio::sync::{mpsc, oneshot, watch};
34use tracing::{Instrument, debug, error, info, trace, warn};
35
36use crate::sm::notifier::Notifier;
37use crate::{AddStateMachinesError, AddStateMachinesResult, DynGlobalClientContext};
38
/// Maximum number of attempts made to commit an autocommit DB transaction
/// before it is aborted with an error.
///
/// Passed as the attempt limit to `Database::autocommit` when adding state
/// machines.
const MAX_DB_ATTEMPTS: Option<usize> = Some(100);
41
/// Prefixes for executor DB entries.
///
/// Each variant carries an explicit one-byte discriminant that identifies
/// one kind of entry written by the executor.
pub(crate) enum ExecutorDbPrefixes {
    /// See [`ActiveStateKey`]
    ActiveStates = 0xa1,
    /// See [`InactiveStateKey`]
    InactiveStates = 0xa2,
}
49
/// Event logged by the executor whenever a state machine is added or
/// transitions to a new state.
#[derive(Serialize, Deserialize)]
pub struct StateMachineUpdated {
    /// Operation the state machine belongs to
    operation_id: OperationId,
    /// `true` when the event records a newly added state machine, `false`
    /// when it records a transition of an already running one
    started: bool,
    /// Whether the state the machine ended up in is terminal
    terminal: bool,
    /// Instance id of the module owning the state machine
    module_id: ModuleInstanceId,
}
57
// Makes `StateMachineUpdated` loggable through the event log.
impl Event for StateMachineUpdated {
    // No module kind is associated with this event.
    const MODULE: Option<fedimint_core::core::ModuleKind> = None;
    // Identifier under which the event is recorded.
    const KIND: EventKind = EventKind::from_static("sm-updated");
    const PERSISTENCE: EventPersistence = EventPersistence::Trimable;
}
63
/// Executor that drives forward state machines under its management.
///
/// Each state transition is atomic and supposed to be idempotent such that a
/// stop/crash of the executor at any point can be recovered from on restart.
/// The executor is aware of the concept of Fedimint modules and can give state
/// machines a different [execution context](crate::module::sm::Context)
/// depending on the owning module, making it very flexible.
#[derive(Clone, Debug)]
pub struct Executor {
    /// Shared executor internals; cloning an [`Executor`] only clones this
    /// [`Arc`] handle, so all clones operate on the same executor.
    inner: Arc<ExecutorInner>,
}
75
/// Shared state behind every [`Executor`] handle.
struct ExecutorInner {
    /// Database holding the active and inactive state entries
    db: Database,
    /// Current lifecycle state of the executor (unstarted, running, stopped)
    state: std::sync::RwLock<ExecutorState>,
    /// Per-module contexts, looked up by the module instance id of a state
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Module instance ids for which state machines are accepted
    valid_module_ids: BTreeSet<ModuleInstanceId>,
    /// Used to notify subscribers about state transitions
    notifier: Notifier,
    /// Any time the executor should notice a state machine update (e.g.
    /// because it was created), it must be sent through this channel for it
    /// to notice.
    sm_update_tx: mpsc::UnboundedSender<DynState>,
    /// Task group the executor's background task is spawned into
    client_task_group: TaskGroup,
    // NOTE(review): usage is not visible in this part of the file.
    log_ordering_wakeup_tx: watch::Sender<()>,
}
88
/// Lifecycle state of the executor's background task.
enum ExecutorState {
    /// The executor has not been started yet. Holds the receiving end of the
    /// state machine update channel until it is handed out on start.
    Unstarted {
        sm_update_rx: mpsc::UnboundedReceiver<DynState>,
    },
    /// The executor has been started.
    Running {
        /// Generates the global context handed to state machines
        context_gen: ContextGen,
        /// Used to send the explicit shutdown signal when stopping
        shutdown_sender: oneshot::Sender<()>,
    },
    /// The executor was running and has been stopped. It cannot be started
    /// again from this state.
    Stopped,
}
99
100impl ExecutorState {
101    /// Starts the executor, returning a receiver that will be signalled when
102    /// the executor is stopped and a receiver for state machine updates.
103    /// Returns `None` if the executor has already been started and/or stopped.
104    fn start(
105        &mut self,
106        context: ContextGen,
107    ) -> Option<(oneshot::Receiver<()>, mpsc::UnboundedReceiver<DynState>)> {
108        let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();
109
110        let previous_state = mem::replace(
111            self,
112            ExecutorState::Running {
113                context_gen: context,
114                shutdown_sender,
115            },
116        );
117
118        match previous_state {
119            ExecutorState::Unstarted { sm_update_rx } => Some((shutdown_receiver, sm_update_rx)),
120            _ => {
121                // Replace the previous state, undoing the `mem::replace` above.
122                *self = previous_state;
123
124                debug!(target: LOG_CLIENT_REACTOR, "Executor already started, ignoring start request");
125                None
126            }
127        }
128    }
129
130    /// Stops the executor, returning `Some(())` if the executor was running and
131    /// `None` if it was in any other state.
132    fn stop(&mut self) -> Option<()> {
133        let previous_state = mem::replace(self, ExecutorState::Stopped);
134
135        match previous_state {
136            ExecutorState::Running {
137                shutdown_sender, ..
138            } => {
139                if shutdown_sender.send(()).is_err() {
140                    warn!(target: LOG_CLIENT_REACTOR, "Failed to send shutdown signal to executor, already dead?");
141                }
142                Some(())
143            }
144            _ => {
145                // Replace the previous state, undoing the `mem::replace` above.
146                *self = previous_state;
147
148                debug!(target: LOG_CLIENT_REACTOR, "Executor not running, ignoring stop request");
149                None
150            }
151        }
152    }
153
154    fn gen_context(&self, state: &DynState) -> Option<DynGlobalClientContext> {
155        let ExecutorState::Running { context_gen, .. } = self else {
156            return None;
157        };
158        Some(context_gen(
159            state.module_instance_id(),
160            state.operation_id(),
161        ))
162    }
163}
164
/// Builder to which module clients can be attached and used to build an
/// [`Executor`] supporting these.
#[derive(Debug, Default)]
pub struct ExecutorBuilder {
    /// Contexts of the attached modules, keyed by module instance id
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Module instance ids registered with the builder
    // NOTE(review): presumably handed over to the built executor, which
    // rejects state machines of modules not in this set - confirm in `build`.
    valid_module_ids: BTreeSet<ModuleInstanceId>,
}
172
173impl Executor {
174    /// Creates an [`ExecutorBuilder`]
175    pub fn builder() -> ExecutorBuilder {
176        ExecutorBuilder::default()
177    }
178
179    pub async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
180        self.inner.get_active_states().await
181    }
182
183    /// Adds a number of state machines to the executor atomically. They will be
184    /// driven to completion automatically in the background.
185    ///
186    /// **Attention**: do not use before background task is started!
187    // TODO: remove warning once finality is an inherent state attribute
188    pub async fn add_state_machines(&self, states: Vec<DynState>) -> anyhow::Result<()> {
189        self.inner
190            .db
191            .autocommit(
192                |dbtx, _| Box::pin(self.add_state_machines_dbtx(dbtx, states.clone())),
193                MAX_DB_ATTEMPTS,
194            )
195            .await
196            .map_err(|e| match e {
197                AutocommitError::CommitFailed {
198                    last_error,
199                    attempts,
200                } => last_error.context(format!("Failed to commit after {attempts} attempts")),
201                AutocommitError::ClosureError { error, .. } => anyhow!("{error:?}"),
202            })?;
203
204        // TODO: notify subscribers to state changes?
205
206        Ok(())
207    }
208
209    /// Adds a number of state machines to the executor atomically with other DB
210    /// changes is `dbtx`. See [`Executor::add_state_machines`] for more
211    /// details.
212    ///
213    /// ## Panics
214    /// If called before background task is started using
215    /// [`Executor::start_executor`]!
216    // TODO: remove warning once finality is an inherent state attribute
217    pub async fn add_state_machines_dbtx(
218        &self,
219        dbtx: &mut DatabaseTransaction<'_>,
220        states: Vec<DynState>,
221    ) -> AddStateMachinesResult {
222        for state in states {
223            if !self
224                .inner
225                .valid_module_ids
226                .contains(&state.module_instance_id())
227            {
228                return Err(AddStateMachinesError::Other(anyhow!("Unknown module")));
229            }
230
231            let is_active_state = dbtx
232                .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
233                .await
234                .is_some();
235            let is_inactive_state = dbtx
236                .get_value(&InactiveStateKeyDb(InactiveStateKey::from_state(
237                    state.clone(),
238                )))
239                .await
240                .is_some();
241
242            if is_active_state || is_inactive_state {
243                return Err(AddStateMachinesError::StateAlreadyExists);
244            }
245
246            // In case of recovery functions, the module itself is not yet initialized,
247            // so we can't check if the state is terminal. However the
248            // [`Self::get_transitions_for`] function will double check and
249            // deactivate any terminal states that would slip past this check.
250            if let Some(module_context) =
251                self.inner.module_contexts.get(&state.module_instance_id())
252            {
253                match self
254                    .inner
255                    .state
256                    .read()
257                    .expect("locking failed")
258                    .gen_context(&state)
259                {
260                    Some(context) => {
261                        if state.is_terminal(module_context, &context) {
262                            return Err(AddStateMachinesError::Other(anyhow!(
263                                "State is already terminal, adding it to the executor doesn't make sense."
264                            )));
265                        }
266                    }
267                    _ => {
268                        warn!(target: LOG_CLIENT_REACTOR, "Executor should be running at this point");
269                    }
270                }
271            }
272
273            dbtx.insert_new_entry(
274                &ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())),
275                &ActiveStateMeta::default(),
276            )
277            .await;
278
279            let operation_id = state.operation_id();
280            self.inner
281                .log_event_dbtx(
282                    dbtx,
283                    StateMachineUpdated {
284                        operation_id,
285                        started: true,
286                        terminal: false,
287                        module_id: state.module_instance_id(),
288                    },
289                )
290                .await;
291
292            let notify_sender = self.inner.notifier.sender();
293            let sm_updates_tx = self.inner.sm_update_tx.clone();
294            dbtx.on_commit(move || {
295                notify_sender.notify(state.clone());
296                let _ = sm_updates_tx.send(state);
297            });
298        }
299
300        Ok(())
301    }
302
303    /// **Mostly used for testing**
304    ///
305    /// Check if state exists in the database as part of an actively running
306    /// state machine.
307    pub async fn contains_active_state<S: State>(
308        &self,
309        instance: ModuleInstanceId,
310        state: S,
311    ) -> bool {
312        let state = DynState::from_typed(instance, state);
313        self.inner
314            .get_active_states()
315            .await
316            .into_iter()
317            .any(|(s, _)| s == state)
318    }
319
320    // TODO: unify querying fns
321    /// **Mostly used for testing**
322    ///
323    /// Check if state exists in the database as inactive. If the state is
324    /// terminal it means the corresponding state machine finished its
325    /// execution. If the state is non-terminal it means the state machine was
326    /// in that state at some point but moved on since then.
327    pub async fn contains_inactive_state<S: State>(
328        &self,
329        instance: ModuleInstanceId,
330        state: S,
331    ) -> bool {
332        let state = DynState::from_typed(instance, state);
333        self.inner
334            .get_inactive_states()
335            .await
336            .into_iter()
337            .any(|(s, _)| s == state)
338    }
339
340    pub async fn await_inactive_state(&self, state: DynState) -> InactiveStateMeta {
341        self.inner
342            .db
343            .wait_key_exists(&InactiveStateKeyDb(InactiveStateKey::from_state(state)))
344            .await
345    }
346
347    pub async fn await_active_state(&self, state: DynState) -> ActiveStateMeta {
348        self.inner
349            .db
350            .wait_key_exists(&ActiveStateKeyDb(ActiveStateKey::from_state(state)))
351            .await
352    }
353
354    /// Only meant for debug tooling
355    pub async fn get_operation_states(
356        &self,
357        operation_id: OperationId,
358    ) -> (
359        Vec<(DynState, ActiveStateMeta)>,
360        Vec<(DynState, InactiveStateMeta)>,
361    ) {
362        let mut dbtx = self.inner.db.begin_transaction_nc().await;
363        let active_states: Vec<_> = dbtx
364            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
365            .await
366            .map(|(active_key, active_meta)| (active_key.0.state, active_meta))
367            .collect()
368            .await;
369        let inactive_states: Vec<_> = dbtx
370            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
371            .await
372            .map(|(active_key, inactive_meta)| (active_key.0.state, inactive_meta))
373            .collect()
374            .await;
375
376        (active_states, inactive_states)
377    }
378
379    /// Starts the background thread that runs the state machines. This cannot
380    /// be done when building the executor since some global contexts in turn
381    /// may depend on the executor, forming a cyclic dependency.
382    ///
383    /// ## Panics
384    /// If called more than once.
385    pub fn start_executor(&self, context_gen: ContextGen) {
386        let Some((shutdown_receiver, sm_update_rx)) = self
387            .inner
388            .state
389            .write()
390            .expect("locking can't fail")
391            .start(context_gen.clone())
392        else {
393            panic!("start_executor was called previously");
394        };
395
396        let task_runner_inner = self.inner.clone();
397        let _handle = self.inner.client_task_group.spawn("sm-executor", |task_handle| async move {
398            let executor_runner = task_runner_inner.run(context_gen, sm_update_rx);
399            let task_group_shutdown_rx = task_handle.make_shutdown_rx();
400            select! {
401                () = task_group_shutdown_rx => {
402                    debug!(
403                        target: LOG_CLIENT_REACTOR,
404                        "Shutting down state machine executor runner due to task group shutdown signal"
405                    );
406                },
407                shutdown_happened_sender = shutdown_receiver => {
408                    match shutdown_happened_sender {
409                        Ok(()) => {
410                            debug!(
411                                target: LOG_CLIENT_REACTOR,
412                                "Shutting down state machine executor runner due to explicit shutdown signal"
413                            );
414                        },
415                        Err(_) => {
416                            warn!(
417                                target: LOG_CLIENT_REACTOR,
418                                "Shutting down state machine executor runner because the shutdown signal channel was closed (the executor object was dropped)"
419                            );
420                        }
421                    }
422                },
423                () = executor_runner => {
424                    error!(target: LOG_CLIENT_REACTOR, "State machine executor runner exited unexpectedly!");
425                },
426            };
427        });
428    }
429
430    /// Stops the background task that runs the state machines.
431    ///
432    /// If a shutdown signal was sent it returns a [`oneshot::Receiver`] that
433    /// will be signalled when the main loop of the background task has
434    /// exited. This can be useful to block until the executor has stopped
435    /// to avoid errors due to the async runtime shutting down while the
436    /// task is still running.
437    ///
438    /// If no shutdown signal was sent it returns `None`. This can happen if
439    /// `stop_executor` is called multiple times.
440    ///
441    /// ## Panics
442    /// If called in parallel with [`start_executor`](Self::start_executor).
443    pub fn stop_executor(&self) -> Option<()> {
444        self.inner.stop_executor()
445    }
446
447    /// Returns a reference to the [`Notifier`] that can be used to subscribe to
448    /// state transitions
449    pub fn notifier(&self) -> &Notifier {
450        &self.inner.notifier
451    }
452}
453
// Stops the executor when the shared internals are dropped.
impl Drop for ExecutorInner {
    fn drop(&mut self) {
        // The return value only tells whether there was something to stop, so
        // it is ignored here.
        self.stop_executor();
    }
}
459
/// A transition of an active state whose trigger has fired and which is ready
/// to be executed.
struct TransitionForActiveState {
    /// Value the transition's trigger future resolved to
    outcome: serde_json::Value,
    /// The active state the transition starts from
    state: DynState,
    /// Metadata of the active state
    meta: ActiveStateMeta,
    /// Function that performs the transition and yields the new state
    transition_fn: StateTransitionFunction<DynState>,
}
466
467impl ExecutorInner {
468    async fn run(
469        &self,
470        global_context_gen: ContextGen,
471        sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
472    ) {
473        debug!(target: LOG_CLIENT_REACTOR, "Starting state machine executor task");
474        if let Err(err) = self
475            .run_state_machines_executor_inner(global_context_gen, sm_update_rx)
476            .await
477        {
478            warn!(
479                target: LOG_CLIENT_REACTOR,
480                err = %err.fmt_compact_anyhow(),
481                "An unexpected error occurred during a state transition"
482            );
483        }
484    }
485
486    async fn get_transition_for(
487        &self,
488        state: &DynState,
489        meta: ActiveStateMeta,
490        global_context_gen: &ContextGen,
491    ) -> Vec<BoxFuture<'static, TransitionForActiveState>> {
492        let module_instance = state.module_instance_id();
493        let context = &self
494            .module_contexts
495            .get(&module_instance)
496            .expect("Unknown module");
497        let transitions = state
498            .transitions(
499                context,
500                &global_context_gen(module_instance, state.operation_id()),
501            )
502            .into_iter()
503            .map(|transition| {
504                let state = state.clone();
505                let f: BoxFuture<TransitionForActiveState> = Box::pin(async move {
506                    let StateTransition {
507                        trigger,
508                        transition,
509                    } = transition;
510                    TransitionForActiveState {
511                        outcome: trigger.await,
512                        state,
513                        transition_fn: transition,
514                        meta,
515                    }
516                });
517                f
518            })
519            .collect::<Vec<_>>();
520        if transitions.is_empty() {
521            // In certain cases a terminal (no transitions) state could get here due to
522            // module bug. Inactivate it to prevent accumulation of such states.
523            // See [`Self::add_state_machines_dbtx`].
524            warn!(
525                target: LOG_CLIENT_REACTOR,
526                module_id = module_instance, "A terminal state where only active states are expected. Please report this bug upstream."
527            );
528            self.db
529                .autocommit::<_, _, anyhow::Error>(
530                    |dbtx, _| {
531                        Box::pin(async {
532                            let k = InactiveStateKey::from_state(state.clone());
533                            let v = ActiveStateMeta::default().into_inactive();
534                            dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
535                                state.clone(),
536                            )))
537                            .await;
538                            dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
539                            Ok(())
540                        })
541                    },
542                    None,
543                )
544                .await
545                .expect("Autocommit here can't fail");
546        }
547
548        transitions
549    }
550
551    async fn run_state_machines_executor_inner(
552        &self,
553        global_context_gen: ContextGen,
554        mut sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
555    ) -> anyhow::Result<()> {
556        /// All futures in the executor resolve to this type, so the handling
557        /// code can tell them apart.
558        enum ExecutorLoopEvent {
559            /// Notification about `DynState` arrived and should be handled,
560            /// usually added to the list of pending futures.
561            New { state: DynState },
562            /// One of trigger futures of a state machine finished and
563            /// returned transition function to run
564            Triggered(TransitionForActiveState),
565            /// The state machine did not need to run, so it was canceled
566            Invalid { state: DynState },
567            /// Transition function and all the accounting around it are done
568            Completed {
569                state: DynState,
570                outcome: ActiveOrInactiveState,
571            },
572            /// New job receiver disconnected, that can only mean termination
573            Disconnected,
574        }
575
576        let active_states = self.get_active_states().await;
577        trace!(target: LOG_CLIENT_REACTOR, "Starting active states: {:?}", active_states);
578        for (state, _meta) in active_states {
579            self.sm_update_tx
580                .send(state)
581                .expect("Must be able to send state machine to own opened channel");
582        }
583
584        // Keeps track of things already running, so we can deduplicate, just
585        // in case.
586        let mut currently_running_sms = HashSet::<DynState>::new();
587        // All things happening in parallel go into here
588        // NOTE: `FuturesUnordered` is a footgun: when it's not being polled
589        // (e.g. we picked an event and are awaiting on something to process it),
590        // nothing inside `futures` will be making progress, which in extreme cases
591        // could lead to hangs. For this reason we try really hard in the code here,
592        // to pick an event from `futures` and spawn a new task, avoiding any `await`,
593        // just so we can get back to `futures.next()` ASAP.
594        let mut futures: FuturesUnordered<BoxFuture<'_, ExecutorLoopEvent>> =
595            FuturesUnordered::new();
596
597        loop {
598            let event = tokio::select! {
599                new = sm_update_rx.recv() => {
600                    match new { Some(new) => {
601                        ExecutorLoopEvent::New {
602                            state: new,
603                        }
604                    } _ => {
605                        ExecutorLoopEvent::Disconnected
606                    }}
607                },
608
609                event = futures.next(), if !futures.is_empty() => event.expect("we only .next() if there are pending futures"),
610            };
611
612            // main reactor loop: wait for next thing that completed, react (possibly adding
613            // more things to `futures`)
614            match event {
615                ExecutorLoopEvent::New { state } => {
616                    if currently_running_sms.contains(&state) {
617                        warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received a state machine that is already running. Ignoring");
618                        continue;
619                    }
620                    currently_running_sms.insert(state.clone());
621                    let futures_len = futures.len();
622                    let global_context_gen = &global_context_gen;
623                    trace!(target: LOG_CLIENT_REACTOR, state = ?state, "Started new active state machine, details.");
624                    futures.push(Box::pin(async move {
625                        let Some(meta) = self.get_active_state(&state).await else {
626                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Couldn't look up received state machine. Ignoring.");
627                            return ExecutorLoopEvent::Invalid { state: state.clone() };
628                        };
629
630                        let transitions = self
631                            .get_transition_for(&state, meta, global_context_gen)
632                            .await;
633                        if transitions.is_empty() {
634                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received an active state that doesn't produce any transitions. Ignoring.");
635                            return ExecutorLoopEvent::Invalid { state: state.clone() };
636                        }
637                        let transitions_num = transitions.len();
638
639                        debug!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), total = futures_len + 1, transitions_num, "New active state machine.");
640
641                        let (first_completed_result, _index, _unused_transitions) =
642                            select_all(transitions).await;
643                        ExecutorLoopEvent::Triggered(first_completed_result)
644                    }));
645                }
646                ExecutorLoopEvent::Triggered(TransitionForActiveState {
647                    outcome,
648                    state,
649                    meta,
650                    transition_fn,
651                }) => {
652                    debug!(
653                        target: LOG_CLIENT_REACTOR,
654                        operation_id = %state.operation_id().fmt_short(),
655                        "Triggered state transition",
656                    );
657                    let span = tracing::debug_span!(
658                        target: LOG_CLIENT_REACTOR,
659                        "sm_transition",
660                        operation_id = %state.operation_id().fmt_short()
661                    );
662                    // Perform the transition as another future, so transitions can happen in
663                    // parallel.
664                    // Database write conflicts might be happening quite often here,
665                    // but transaction functions are supposed to be idempotent anyway,
666                    // so it seems like a good stress-test in the worst case.
667                    futures.push({
668                        let sm_update_tx = self.sm_update_tx.clone();
669                        let db = self.db.clone();
670                        let notifier = self.notifier.clone();
671                        let module_contexts = self.module_contexts.clone();
672                        let global_context_gen = global_context_gen.clone();
673                        Box::pin(
674                            async move {
675                                debug!(
676                                    target: LOG_CLIENT_REACTOR,
677                                    "Executing state transition",
678                                );
679                                trace!(
680                                    target: LOG_CLIENT_REACTOR,
681                                    ?state,
682                                    outcome = ?AbbreviateJson(&outcome),
683                                    "Executing state transition (details)",
684                                );
685
686                                let module_contexts = &module_contexts;
687                                let global_context_gen = &global_context_gen;
688
689                                let outcome = db
690                                    .autocommit::<'_, '_, _, _, Infallible>(
691                                        |dbtx, _| {
692                                            let state = state.clone();
693                                            let state_module_instance_id = state.module_instance_id();
694                                            let transition_fn = transition_fn.clone();
695                                            let transition_outcome = outcome.clone();
696                                            Box::pin(async move {
697                                                let new_state = transition_fn(
698                                                    &mut ClientSMDatabaseTransaction::new(
699                                                        &mut dbtx.to_ref(),
700                                                        state.module_instance_id(),
701                                                    ),
702                                                    transition_outcome.clone(),
703                                                    state.clone(),
704                                                )
705                                                .await;
706                                                dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
707                                                    state.clone(),
708                                                )))
709                                                .await;
710                                                dbtx.insert_entry(
711                                                    &InactiveStateKeyDb(InactiveStateKey::from_state(state.clone())),
712                                                    &meta.into_inactive(),
713                                                )
714                                                .await;
715
716                                                let context = &module_contexts
717                                                    .get(&state.module_instance_id())
718                                                    .expect("Unknown module");
719
720                                                let operation_id = state.operation_id();
721                                                let global_context = global_context_gen(
722                                                    state.module_instance_id(),
723                                                    operation_id,
724                                                );
725
726                                                let is_terminal = new_state.is_terminal(context, &global_context);
727
728                                                self.log_event_dbtx(dbtx,
729                                                    StateMachineUpdated{
730                                                        started: false,
731                                                        operation_id,
732                                                        module_id: state_module_instance_id,
733                                                        terminal: is_terminal,
734                                                    }
735                                                ).await;
736
737                                                if is_terminal {
738                                                    let k = InactiveStateKey::from_state(
739                                                        new_state.clone(),
740                                                    );
741                                                    let v = ActiveStateMeta::default().into_inactive();
742                                                    dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
743                                                    Ok(ActiveOrInactiveState::Inactive {
744                                                        dyn_state: new_state,
745                                                    })
746                                                } else {
747                                                    let k = ActiveStateKey::from_state(
748                                                        new_state.clone(),
749                                                    );
750                                                    let v = ActiveStateMeta::default();
751                                                    dbtx.insert_entry(&ActiveStateKeyDb(k), &v).await;
752                                                    Ok(ActiveOrInactiveState::Active {
753                                                        dyn_state: new_state,
754                                                        meta: v,
755                                                    })
756                                                }
757                                            })
758                                        },
759                                        None,
760                                    )
761                                    .await
762                                    .expect("autocommit should keep trying to commit (max_attempt: None) and body doesn't return errors");
763
764                                debug!(
765                                    target: LOG_CLIENT_REACTOR,
766                                    terminal = !outcome.is_active(),
767                                    ?outcome,
768                                    "State transition complete",
769                                );
770
771                                match &outcome {
772                                    ActiveOrInactiveState::Active { dyn_state, meta: _ } => {
773                                        sm_update_tx
774                                            .send(dyn_state.clone())
775                                            .expect("can't fail: we are the receiving end");
776                                        notifier.notify(dyn_state.clone());
777                                    }
778                                    ActiveOrInactiveState::Inactive { dyn_state } => {
779                                        notifier.notify(dyn_state.clone());
780                                    }
781                                }
782                                ExecutorLoopEvent::Completed { state, outcome }
783                            }
784                            .instrument(span),
785                        )
786                    });
787                }
788                ExecutorLoopEvent::Invalid { state } => {
789                    trace!(
790                        target: LOG_CLIENT_REACTOR,
791                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
792                        "State invalid"
793                    );
794                    assert!(
795                        currently_running_sms.remove(&state),
796                        "State must have been recorded"
797                    );
798                }
799
800                ExecutorLoopEvent::Completed { state, outcome } => {
801                    assert!(
802                        currently_running_sms.remove(&state),
803                        "State must have been recorded"
804                    );
805                    debug!(
806                        target: LOG_CLIENT_REACTOR,
807                        operation_id = %state.operation_id().fmt_short(),
808                        outcome_active = outcome.is_active(),
809                        total = futures.len(),
810                        "State transition complete"
811                    );
812                    trace!(
813                        target: LOG_CLIENT_REACTOR,
814                        ?outcome,
815                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
816                        "State transition complete"
817                    );
818                }
819                ExecutorLoopEvent::Disconnected => {
820                    break;
821                }
822            }
823        }
824
825        info!(target: LOG_CLIENT_REACTOR, "Terminated.");
826        Ok(())
827    }
828
829    async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
830        self.db
831            .begin_transaction_nc()
832            .await
833            .find_by_prefix(&ActiveStateKeyPrefix)
834            .await
835            // ignore states from modules that are not initialized yet
836            .filter(|(state, _)| {
837                future::ready(
838                    self.module_contexts
839                        .contains_key(&state.0.state.module_instance_id()),
840                )
841            })
842            .map(|(state, meta)| (state.0.state, meta))
843            .collect::<Vec<_>>()
844            .await
845    }
846
847    async fn get_active_state(&self, state: &DynState) -> Option<ActiveStateMeta> {
848        // ignore states from modules that are not initialized yet
849        if !self
850            .module_contexts
851            .contains_key(&state.module_instance_id())
852        {
853            return None;
854        }
855        self.db
856            .begin_transaction_nc()
857            .await
858            .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
859            .await
860    }
861
862    async fn get_inactive_states(&self) -> Vec<(DynState, InactiveStateMeta)> {
863        self.db
864            .begin_transaction_nc()
865            .await
866            .find_by_prefix(&InactiveStateKeyPrefix)
867            .await
868            // ignore states from modules that are not initialized yet
869            .filter(|(state, _)| {
870                future::ready(
871                    self.module_contexts
872                        .contains_key(&state.0.state.module_instance_id()),
873                )
874            })
875            .map(|(state, meta)| (state.0.state, meta))
876            .collect::<Vec<_>>()
877            .await
878    }
879
880    pub async fn log_event_dbtx<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
881    where
882        E: Event + Send,
883        Cap: Send,
884    {
885        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), None, event)
886            .await;
887    }
888}
889
890impl ExecutorInner {
891    /// See [`Executor::stop_executor`].
892    fn stop_executor(&self) -> Option<()> {
893        let mut state = self.state.write().expect("Locking can't fail");
894
895        state.stop()
896    }
897}
898
899impl Debug for ExecutorInner {
900    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
901        writeln!(f, "ExecutorInner {{}}")
902    }
903}
904
905impl ExecutorBuilder {
906    /// Allow executor being built to run state machines associated with the
907    /// supplied module
908    pub fn with_module<C>(&mut self, instance_id: ModuleInstanceId, context: C)
909    where
910        C: IntoDynInstance<DynType = DynContext>,
911    {
912        self.with_module_dyn(context.into_dyn(instance_id));
913    }
914
915    /// Allow executor being built to run state machines associated with the
916    /// supplied module
917    pub fn with_module_dyn(&mut self, context: DynContext) {
918        self.valid_module_ids.insert(context.module_instance_id());
919
920        if self
921            .module_contexts
922            .insert(context.module_instance_id(), context)
923            .is_some()
924        {
925            panic!("Tried to add two modules with the same instance id!");
926        }
927    }
928
929    /// Allow executor to build state machines associated with the module id,
930    /// for which the module itself might not be available yet (otherwise it
931    /// would be registered with `[Self::with_module_dyn]`).
932    pub fn with_valid_module_id(&mut self, module_id: ModuleInstanceId) {
933        self.valid_module_ids.insert(module_id);
934    }
935
936    /// Build [`Executor`] and spawn background task in `tasks` executing active
937    /// state machines. The supplied database `db` must support isolation, so
938    /// cannot be an isolated DB instance itself.
939    pub fn build(
940        self,
941        db: Database,
942        notifier: Notifier,
943        client_task_group: TaskGroup,
944        log_ordering_wakeup_tx: watch::Sender<()>,
945    ) -> Executor {
946        let (sm_update_tx, sm_update_rx) = tokio::sync::mpsc::unbounded_channel();
947
948        let inner = Arc::new(ExecutorInner {
949            db,
950            log_ordering_wakeup_tx,
951            state: std::sync::RwLock::new(ExecutorState::Unstarted { sm_update_rx }),
952            module_contexts: self.module_contexts,
953            valid_module_ids: self.valid_module_ids,
954            notifier,
955            sm_update_tx,
956            client_task_group,
957        });
958
959        debug!(
960            target: LOG_CLIENT_REACTOR,
961            instances = ?inner.module_contexts.keys().copied().collect::<Vec<_>>(),
962            "Initialized state machine executor with module instances"
963        );
964        Executor { inner }
965    }
966}
967#[derive(Debug)]
968pub struct ActiveOperationStateKeyPrefix {
969    pub operation_id: OperationId,
970}
971
972impl Encodable for ActiveOperationStateKeyPrefix {
973    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
974        self.operation_id.consensus_encode(writer)
975    }
976}
977
978impl ::fedimint_core::db::DatabaseLookup for ActiveOperationStateKeyPrefix {
979    type Record = ActiveStateKeyDb;
980}
981
982#[derive(Debug)]
983pub(crate) struct ActiveModuleOperationStateKeyPrefix {
984    pub operation_id: OperationId,
985    pub module_instance: ModuleInstanceId,
986}
987
988impl Encodable for ActiveModuleOperationStateKeyPrefix {
989    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
990        self.operation_id.consensus_encode(writer)?;
991        self.module_instance.consensus_encode(writer)?;
992        Ok(())
993    }
994}
995
996impl ::fedimint_core::db::DatabaseLookup for ActiveModuleOperationStateKeyPrefix {
997    type Record = ActiveStateKeyDb;
998}
999
1000#[derive(Debug)]
1001pub struct ActiveStateKeyPrefix;
1002
1003impl Encodable for ActiveStateKeyPrefix {
1004    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
1005        Ok(())
1006    }
1007}
1008
1009#[derive(Encodable, Decodable, Debug)]
1010pub struct ActiveStateKeyDb(pub fedimint_client_module::sm::executor::ActiveStateKey);
1011
1012impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyDb {
1013    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
1014    const NOTIFY_ON_MODIFY: bool = true;
1015    type Key = Self;
1016    type Value = ActiveStateMeta;
1017}
1018
1019impl DatabaseKeyWithNotify for ActiveStateKeyDb {}
1020
1021impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefix {
1022    type Record = ActiveStateKeyDb;
1023}
1024
1025#[derive(Debug, Encodable, Decodable)]
1026pub struct ActiveStateKeyPrefixBytes;
1027
1028impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyBytes {
1029    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
1030    const NOTIFY_ON_MODIFY: bool = false;
1031    type Key = Self;
1032    type Value = ActiveStateMeta;
1033}
1034
1035impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefixBytes {
1036    type Record = ActiveStateKeyBytes;
1037}
1038
1039#[derive(Encodable, Decodable, Debug)]
1040pub struct InactiveStateKeyDb(pub fedimint_client_module::sm::executor::InactiveStateKey);
1041
1042#[derive(Debug)]
1043pub struct InactiveStateKeyBytes {
1044    pub operation_id: OperationId,
1045    pub module_instance_id: ModuleInstanceId,
1046    pub state: Vec<u8>,
1047}
1048
1049impl Encodable for InactiveStateKeyBytes {
1050    fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
1051        self.operation_id.consensus_encode(writer)?;
1052        writer.write_all(self.state.as_slice())?;
1053        Ok(())
1054    }
1055}
1056
1057impl Decodable for InactiveStateKeyBytes {
1058    fn consensus_decode_partial<R: std::io::Read>(
1059        reader: &mut R,
1060        modules: &ModuleDecoderRegistry,
1061    ) -> Result<Self, DecodeError> {
1062        let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1063        let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1064        let mut bytes = Vec::new();
1065        reader
1066            .read_to_end(&mut bytes)
1067            .map_err(DecodeError::from_err)?;
1068
1069        let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1070        instance_bytes.append(&mut bytes);
1071
1072        Ok(InactiveStateKeyBytes {
1073            operation_id,
1074            module_instance_id,
1075            state: instance_bytes,
1076        })
1077    }
1078}
1079
1080#[derive(Debug)]
1081pub struct InactiveOperationStateKeyPrefix {
1082    pub operation_id: OperationId,
1083}
1084
1085impl Encodable for InactiveOperationStateKeyPrefix {
1086    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1087        self.operation_id.consensus_encode(writer)
1088    }
1089}
1090
1091impl ::fedimint_core::db::DatabaseLookup for InactiveOperationStateKeyPrefix {
1092    type Record = InactiveStateKeyDb;
1093}
1094
1095#[derive(Debug)]
1096pub(crate) struct InactiveModuleOperationStateKeyPrefix {
1097    pub operation_id: OperationId,
1098    pub module_instance: ModuleInstanceId,
1099}
1100
1101impl Encodable for InactiveModuleOperationStateKeyPrefix {
1102    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1103        self.operation_id.consensus_encode(writer)?;
1104        self.module_instance.consensus_encode(writer)?;
1105        Ok(())
1106    }
1107}
1108
1109impl ::fedimint_core::db::DatabaseLookup for InactiveModuleOperationStateKeyPrefix {
1110    type Record = InactiveStateKeyDb;
1111}
1112
1113#[derive(Debug, Clone)]
1114pub struct InactiveStateKeyPrefix;
1115
1116impl Encodable for InactiveStateKeyPrefix {
1117    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
1118        Ok(())
1119    }
1120}
1121
1122#[derive(Debug, Encodable, Decodable)]
1123pub struct InactiveStateKeyPrefixBytes;
1124
1125impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyBytes {
1126    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
1127    const NOTIFY_ON_MODIFY: bool = false;
1128    type Key = Self;
1129    type Value = InactiveStateMeta;
1130}
1131
1132impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefixBytes {
1133    type Record = InactiveStateKeyBytes;
1134}
1135
1136#[derive(Debug)]
1137pub struct ActiveStateKeyBytes {
1138    pub operation_id: OperationId,
1139    pub module_instance_id: ModuleInstanceId,
1140    pub state: Vec<u8>,
1141}
1142
1143impl Encodable for ActiveStateKeyBytes {
1144    fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
1145        self.operation_id.consensus_encode(writer)?;
1146        writer.write_all(self.state.as_slice())?;
1147        Ok(())
1148    }
1149}
1150
1151impl Decodable for ActiveStateKeyBytes {
1152    fn consensus_decode_partial<R: std::io::Read>(
1153        reader: &mut R,
1154        modules: &ModuleDecoderRegistry,
1155    ) -> Result<Self, DecodeError> {
1156        let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1157        let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1158        let mut bytes = Vec::new();
1159        reader
1160            .read_to_end(&mut bytes)
1161            .map_err(DecodeError::from_err)?;
1162
1163        let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1164        instance_bytes.append(&mut bytes);
1165
1166        Ok(ActiveStateKeyBytes {
1167            operation_id,
1168            module_instance_id,
1169            state: instance_bytes,
1170        })
1171    }
1172}
1173impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyDb {
1174    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
1175    const NOTIFY_ON_MODIFY: bool = true;
1176    type Key = Self;
1177    type Value = InactiveStateMeta;
1178}
1179
1180impl DatabaseKeyWithNotify for InactiveStateKeyDb {}
1181
1182impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefix {
1183    type Record = InactiveStateKeyDb;
1184}
1185
1186#[derive(Debug)]
1187enum ActiveOrInactiveState {
1188    Active {
1189        dyn_state: DynState,
1190        #[allow(dead_code)] // currently not printed anywhere, but useful in the db
1191        meta: ActiveStateMeta,
1192    },
1193    Inactive {
1194        dyn_state: DynState,
1195    },
1196}
1197
1198impl ActiveOrInactiveState {
1199    fn is_active(&self) -> bool {
1200        match self {
1201            ActiveOrInactiveState::Active { .. } => true,
1202            ActiveOrInactiveState::Inactive { .. } => false,
1203        }
1204    }
1205}
1206
1207#[apply(async_trait_maybe_send!)]
1208impl IExecutor for Executor {
1209    async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
1210        Self::get_active_states(self).await
1211    }
1212
1213    async fn add_state_machines_dbtx(
1214        &self,
1215        dbtx: &mut DatabaseTransaction<'_>,
1216        states: Vec<DynState>,
1217    ) -> AddStateMachinesResult {
1218        Self::add_state_machines_dbtx(self, dbtx, states).await
1219    }
1220}
1221
1222#[cfg(test)]
1223mod tests;