Skip to main content

fedimint_client/sm/
executor.rs

1use std::collections::{BTreeMap, BTreeSet, HashSet};
2use std::convert::Infallible;
3use std::fmt::{Debug, Formatter};
4use std::io::{Error, Write};
5use std::mem;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::anyhow;
10use fedimint_client_module::sm::executor::{
11    ActiveStateKey, ContextGen, IExecutor, InactiveStateKey,
12};
13use fedimint_client_module::sm::{
14    ActiveStateMeta, ClientSMDatabaseTransaction, DynContext, DynState, InactiveStateMeta, State,
15    StateTransition, StateTransitionFunction,
16};
17use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, OperationId};
18use fedimint_core::db::{
19    AutocommitError, Database, DatabaseKeyWithNotify, DatabaseTransaction,
20    IDatabaseTransactionOpsCoreTyped,
21};
22use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
23use fedimint_core::fmt_utils::AbbreviateJson;
24use fedimint_core::module::registry::ModuleDecoderRegistry;
25use fedimint_core::task::TaskGroup;
26use fedimint_core::util::{BoxFuture, FmtCompactAnyhow as _};
27use fedimint_core::{apply, async_trait_maybe_send};
28use fedimint_eventlog::{DBTransactionEventLogExt as _, Event, EventKind, EventPersistence};
29use fedimint_logging::LOG_CLIENT_REACTOR;
30use futures::future::{self, select_all};
31use futures::stream::{FuturesUnordered, StreamExt};
32use serde::{Deserialize, Serialize};
33use tokio::select;
34use tokio::sync::{mpsc, oneshot, watch};
35use tracing::{Instrument, debug, error, info, trace, warn};
36
37use crate::sm::notifier::Notifier;
38use crate::{AddStateMachinesError, AddStateMachinesResult, DynGlobalClientContext};
39
40/// After how many attempts a DB transaction is aborted with an error
41const MAX_DB_ATTEMPTS: Option<usize> = Some(100);
42
43/// Prefixes for executor DB entries
44pub(crate) enum ExecutorDbPrefixes {
45    /// See [`ActiveStateKey`]
46    ActiveStates = 0xa1,
47    /// See [`InactiveStateKey`]
48    InactiveStates = 0xa2,
49}
50
51#[derive(Serialize, Deserialize)]
52pub struct StateMachineUpdated {
53    operation_id: OperationId,
54    started: bool,
55    terminal: bool,
56    module_id: ModuleInstanceId,
57}
58
59impl Event for StateMachineUpdated {
60    const MODULE: Option<fedimint_core::core::ModuleKind> = None;
61    const KIND: EventKind = EventKind::from_static("sm-updated");
62    const PERSISTENCE: EventPersistence = EventPersistence::Trimable;
63}
64
65/// Executor that drives forward state machines under its management.
66///
67/// Each state transition is atomic and supposed to be idempotent such that a
68/// stop/crash of the executor at any point can be recovered from on restart.
69/// The executor is aware of the concept of Fedimint modules and can give state
70/// machines a different [execution context](crate::module::sm::Context)
71/// depending on the owning module, making it very flexible.
72#[derive(Clone, Debug)]
73pub struct Executor {
74    inner: Arc<ExecutorInner>,
75}
76
77struct ExecutorInner {
78    db: Database,
79    state: std::sync::RwLock<ExecutorState>,
80    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
81    valid_module_ids: BTreeSet<ModuleInstanceId>,
82    notifier: Notifier,
83    /// Any time executor should notice state machine update (e.g. because it
84    /// was created), it's must be sent through this channel for it to notice.
85    sm_update_tx: mpsc::UnboundedSender<DynState>,
86    client_task_group: TaskGroup,
87    log_ordering_wakeup_tx: watch::Sender<()>,
88}
89
90enum ExecutorState {
91    Unstarted {
92        sm_update_rx: mpsc::UnboundedReceiver<DynState>,
93    },
94    Running {
95        context_gen: ContextGen,
96        shutdown_sender: oneshot::Sender<()>,
97    },
98    Stopped,
99}
100
101impl ExecutorState {
102    /// Starts the executor, returning a receiver that will be signalled when
103    /// the executor is stopped and a receiver for state machine updates.
104    /// Returns `None` if the executor has already been started and/or stopped.
105    fn start(
106        &mut self,
107        context: ContextGen,
108    ) -> Option<(oneshot::Receiver<()>, mpsc::UnboundedReceiver<DynState>)> {
109        let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();
110
111        let previous_state = mem::replace(
112            self,
113            ExecutorState::Running {
114                context_gen: context,
115                shutdown_sender,
116            },
117        );
118
119        match previous_state {
120            ExecutorState::Unstarted { sm_update_rx } => Some((shutdown_receiver, sm_update_rx)),
121            _ => {
122                // Replace the previous state, undoing the `mem::replace` above.
123                *self = previous_state;
124
125                debug!(target: LOG_CLIENT_REACTOR, "Executor already started, ignoring start request");
126                None
127            }
128        }
129    }
130
131    /// Stops the executor, returning `Some(())` if the executor was running and
132    /// `None` if it was in any other state.
133    fn stop(&mut self) -> Option<()> {
134        let previous_state = mem::replace(self, ExecutorState::Stopped);
135
136        match previous_state {
137            ExecutorState::Running {
138                shutdown_sender, ..
139            } => {
140                if shutdown_sender.send(()).is_err() {
141                    warn!(target: LOG_CLIENT_REACTOR, "Failed to send shutdown signal to executor, already dead?");
142                }
143                Some(())
144            }
145            _ => {
146                // Replace the previous state, undoing the `mem::replace` above.
147                *self = previous_state;
148
149                debug!(target: LOG_CLIENT_REACTOR, "Executor not running, ignoring stop request");
150                None
151            }
152        }
153    }
154
155    fn gen_context(&self, state: &DynState) -> Option<DynGlobalClientContext> {
156        let ExecutorState::Running { context_gen, .. } = self else {
157            return None;
158        };
159        Some(context_gen(
160            state.module_instance_id(),
161            state.operation_id(),
162        ))
163    }
164}
165
166/// Builder to which module clients can be attached and used to build an
167/// [`Executor`] supporting these.
168#[derive(Debug, Default)]
169pub struct ExecutorBuilder {
170    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
171    valid_module_ids: BTreeSet<ModuleInstanceId>,
172}
173
174impl Executor {
175    /// Creates an [`ExecutorBuilder`]
176    pub fn builder() -> ExecutorBuilder {
177        ExecutorBuilder::default()
178    }
179
180    pub async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
181        self.inner.get_active_states().await
182    }
183
184    /// Adds a number of state machines to the executor atomically. They will be
185    /// driven to completion automatically in the background.
186    ///
187    /// **Attention**: do not use before background task is started!
188    // TODO: remove warning once finality is an inherent state attribute
189    pub async fn add_state_machines(&self, states: Vec<DynState>) -> anyhow::Result<()> {
190        self.inner
191            .db
192            .autocommit(
193                |dbtx, _| Box::pin(self.add_state_machines_dbtx(dbtx, states.clone())),
194                MAX_DB_ATTEMPTS,
195            )
196            .await
197            .map_err(|e| match e {
198                AutocommitError::CommitFailed {
199                    last_error,
200                    attempts,
201                } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
202                AutocommitError::ClosureError { error, .. } => anyhow!("{error:?}"),
203            })?;
204
205        // TODO: notify subscribers to state changes?
206
207        Ok(())
208    }
209
210    /// Adds a number of state machines to the executor atomically with other DB
211    /// changes is `dbtx`. See [`Executor::add_state_machines`] for more
212    /// details.
213    ///
214    /// ## Panics
215    /// If called before background task is started using
216    /// [`Executor::start_executor`]!
217    // TODO: remove warning once finality is an inherent state attribute
218    pub async fn add_state_machines_dbtx(
219        &self,
220        dbtx: &mut DatabaseTransaction<'_>,
221        states: Vec<DynState>,
222    ) -> AddStateMachinesResult {
223        for state in states {
224            if !self
225                .inner
226                .valid_module_ids
227                .contains(&state.module_instance_id())
228            {
229                return Err(AddStateMachinesError::Other(anyhow!("Unknown module")));
230            }
231
232            let is_active_state = dbtx
233                .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
234                .await
235                .is_some();
236            let is_inactive_state = dbtx
237                .get_value(&InactiveStateKeyDb(InactiveStateKey::from_state(
238                    state.clone(),
239                )))
240                .await
241                .is_some();
242
243            if is_active_state || is_inactive_state {
244                return Err(AddStateMachinesError::StateAlreadyExists);
245            }
246
247            // In case of recovery functions, the module itself is not yet initialized,
248            // so we can't check if the state is terminal. However the
249            // [`Self::get_transitions_for`] function will double check and
250            // deactivate any terminal states that would slip past this check.
251            if let Some(module_context) =
252                self.inner.module_contexts.get(&state.module_instance_id())
253            {
254                match self
255                    .inner
256                    .state
257                    .read()
258                    .expect("locking failed")
259                    .gen_context(&state)
260                {
261                    Some(context) => {
262                        if state.is_terminal(module_context, &context) {
263                            return Err(AddStateMachinesError::Other(anyhow!(
264                                "State is already terminal, adding it to the executor doesn't make sense."
265                            )));
266                        }
267                    }
268                    _ => {
269                        warn!(target: LOG_CLIENT_REACTOR, "Executor should be running at this point");
270                    }
271                }
272            }
273
274            dbtx.insert_new_entry(
275                &ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())),
276                &ActiveStateMeta::default(),
277            )
278            .await;
279
280            let operation_id = state.operation_id();
281            self.inner
282                .log_event_dbtx(
283                    dbtx,
284                    StateMachineUpdated {
285                        operation_id,
286                        started: true,
287                        terminal: false,
288                        module_id: state.module_instance_id(),
289                    },
290                )
291                .await;
292
293            let notify_sender = self.inner.notifier.sender();
294            let sm_updates_tx = self.inner.sm_update_tx.clone();
295            dbtx.on_commit(move || {
296                notify_sender.notify(state.clone());
297                let _ = sm_updates_tx.send(state);
298            });
299        }
300
301        Ok(())
302    }
303
304    /// **Mostly used for testing**
305    ///
306    /// Check if state exists in the database as part of an actively running
307    /// state machine.
308    pub async fn contains_active_state<S: State>(
309        &self,
310        instance: ModuleInstanceId,
311        state: S,
312    ) -> bool {
313        let state = DynState::from_typed(instance, state);
314        self.inner
315            .get_active_states()
316            .await
317            .into_iter()
318            .any(|(s, _)| s == state)
319    }
320
321    // TODO: unify querying fns
322    /// **Mostly used for testing**
323    ///
324    /// Check if state exists in the database as inactive. If the state is
325    /// terminal it means the corresponding state machine finished its
326    /// execution. If the state is non-terminal it means the state machine was
327    /// in that state at some point but moved on since then.
328    pub async fn contains_inactive_state<S: State>(
329        &self,
330        instance: ModuleInstanceId,
331        state: S,
332    ) -> bool {
333        let state = DynState::from_typed(instance, state);
334        self.inner
335            .get_inactive_states()
336            .await
337            .into_iter()
338            .any(|(s, _)| s == state)
339    }
340
341    pub async fn await_inactive_state(&self, state: DynState) -> InactiveStateMeta {
342        self.inner
343            .db
344            .wait_key_exists(&InactiveStateKeyDb(InactiveStateKey::from_state(state)))
345            .await
346    }
347
348    pub async fn await_active_state(&self, state: DynState) -> ActiveStateMeta {
349        self.inner
350            .db
351            .wait_key_exists(&ActiveStateKeyDb(ActiveStateKey::from_state(state)))
352            .await
353    }
354
355    /// Only meant for debug tooling
356    pub async fn get_operation_states(
357        &self,
358        operation_id: OperationId,
359    ) -> (
360        Vec<(DynState, ActiveStateMeta)>,
361        Vec<(DynState, InactiveStateMeta)>,
362    ) {
363        let mut dbtx = self.inner.db.begin_transaction_nc().await;
364        let active_states: Vec<_> = dbtx
365            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
366            .await
367            .map(|(active_key, active_meta)| (active_key.0.state, active_meta))
368            .collect()
369            .await;
370        let inactive_states: Vec<_> = dbtx
371            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
372            .await
373            .map(|(active_key, inactive_meta)| (active_key.0.state, inactive_meta))
374            .collect()
375            .await;
376
377        (active_states, inactive_states)
378    }
379
380    /// Starts the background thread that runs the state machines. This cannot
381    /// be done when building the executor since some global contexts in turn
382    /// may depend on the executor, forming a cyclic dependency.
383    ///
384    /// ## Panics
385    /// If called more than once.
386    pub fn start_executor(&self, context_gen: ContextGen) {
387        let Some((shutdown_receiver, sm_update_rx)) = self
388            .inner
389            .state
390            .write()
391            .expect("locking can't fail")
392            .start(context_gen.clone())
393        else {
394            panic!("start_executor was called previously");
395        };
396
397        let task_runner_inner = self.inner.clone();
398        let _handle = self.inner.client_task_group.spawn("sm-executor", |task_handle| async move {
399            let executor_runner = task_runner_inner.run(context_gen, sm_update_rx);
400            let task_group_shutdown_rx = task_handle.make_shutdown_rx();
401            select! {
402                () = task_group_shutdown_rx => {
403                    debug!(
404                        target: LOG_CLIENT_REACTOR,
405                        "Shutting down state machine executor runner due to task group shutdown signal"
406                    );
407                },
408                shutdown_happened_sender = shutdown_receiver => {
409                    match shutdown_happened_sender {
410                        Ok(()) => {
411                            debug!(
412                                target: LOG_CLIENT_REACTOR,
413                                "Shutting down state machine executor runner due to explicit shutdown signal"
414                            );
415                        },
416                        Err(_) => {
417                            warn!(
418                                target: LOG_CLIENT_REACTOR,
419                                "Shutting down state machine executor runner because the shutdown signal channel was closed (the executor object was dropped)"
420                            );
421                        }
422                    }
423                },
424                () = executor_runner => {
425                    error!(target: LOG_CLIENT_REACTOR, "State machine executor runner exited unexpectedly!");
426                },
427            };
428        });
429    }
430
431    /// Stops the background task that runs the state machines.
432    ///
433    /// If a shutdown signal was sent it returns a [`oneshot::Receiver`] that
434    /// will be signalled when the main loop of the background task has
435    /// exited. This can be useful to block until the executor has stopped
436    /// to avoid errors due to the async runtime shutting down while the
437    /// task is still running.
438    ///
439    /// If no shutdown signal was sent it returns `None`. This can happen if
440    /// `stop_executor` is called multiple times.
441    ///
442    /// ## Panics
443    /// If called in parallel with [`start_executor`](Self::start_executor).
444    pub fn stop_executor(&self) -> Option<()> {
445        self.inner.stop_executor()
446    }
447
448    /// Returns a reference to the [`Notifier`] that can be used to subscribe to
449    /// state transitions
450    pub fn notifier(&self) -> &Notifier {
451        &self.inner.notifier
452    }
453}
454
455impl Drop for ExecutorInner {
456    fn drop(&mut self) {
457        self.stop_executor();
458    }
459}
460
461struct TransitionForActiveState {
462    outcome: serde_json::Value,
463    state: DynState,
464    meta: ActiveStateMeta,
465    transition_fn: StateTransitionFunction<DynState>,
466}
467
468impl ExecutorInner {
469    async fn run(
470        &self,
471        global_context_gen: ContextGen,
472        sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
473    ) {
474        debug!(target: LOG_CLIENT_REACTOR, "Starting state machine executor task");
475        if let Err(err) = self
476            .run_state_machines_executor_inner(global_context_gen, sm_update_rx)
477            .await
478        {
479            warn!(
480                target: LOG_CLIENT_REACTOR,
481                err = %err.fmt_compact_anyhow(),
482                "An unexpected error occurred during a state transition"
483            );
484        }
485    }
486
487    async fn get_transition_for(
488        &self,
489        state: &DynState,
490        meta: ActiveStateMeta,
491        global_context_gen: &ContextGen,
492    ) -> Vec<BoxFuture<'static, TransitionForActiveState>> {
493        let module_instance = state.module_instance_id();
494        let context = &self
495            .module_contexts
496            .get(&module_instance)
497            .expect("Unknown module");
498        let transitions = state
499            .transitions(
500                context,
501                &global_context_gen(module_instance, state.operation_id()),
502            )
503            .into_iter()
504            .map(|transition| {
505                let state = state.clone();
506                let f: BoxFuture<TransitionForActiveState> = Box::pin(async move {
507                    let StateTransition {
508                        trigger,
509                        transition,
510                    } = transition;
511                    TransitionForActiveState {
512                        outcome: trigger.await,
513                        state,
514                        transition_fn: transition,
515                        meta,
516                    }
517                });
518                f
519            })
520            .collect::<Vec<_>>();
521        if transitions.is_empty() {
522            // In certain cases a terminal (no transitions) state could get here due to
523            // module bug. Inactivate it to prevent accumulation of such states.
524            // See [`Self::add_state_machines_dbtx`].
525            warn!(
526                target: LOG_CLIENT_REACTOR,
527                module_id = module_instance, "A terminal state where only active states are expected. Please report this bug upstream."
528            );
529            self.db
530                .autocommit::<_, _, anyhow::Error>(
531                    |dbtx, _| {
532                        Box::pin(async {
533                            let k = InactiveStateKey::from_state(state.clone());
534                            let v = ActiveStateMeta::default().into_inactive();
535                            dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
536                                state.clone(),
537                            )))
538                            .await;
539                            dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
540                            Ok(())
541                        })
542                    },
543                    None,
544                )
545                .await
546                .expect("Autocommit here can't fail");
547        }
548
549        transitions
550    }
551
552    async fn run_state_machines_executor_inner(
553        &self,
554        global_context_gen: ContextGen,
555        mut sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
556    ) -> anyhow::Result<()> {
557        /// All futures in the executor resolve to this type, so the handling
558        /// code can tell them apart.
559        enum ExecutorLoopEvent {
560            /// Notification about `DynState` arrived and should be handled,
561            /// usually added to the list of pending futures.
562            New { state: DynState },
563            /// One of trigger futures of a state machine finished and
564            /// returned transition function to run
565            Triggered(TransitionForActiveState),
566            /// The state machine did not need to run, so it was canceled
567            Invalid { state: DynState },
568            /// Transition function and all the accounting around it are done
569            Completed {
570                state: DynState,
571                outcome: ActiveOrInactiveState,
572            },
573            /// New job receiver disconnected, that can only mean termination
574            Disconnected,
575        }
576
577        let active_states = self.get_active_states().await;
578        debug!(
579            target: LOG_CLIENT_REACTOR,
580            total = active_states.len(),
581            "Starting active state machines",
582        );
583        for (state, meta) in active_states {
584            trace!(target: LOG_CLIENT_REACTOR, ?state, ?meta, "Starting active state");
585
586            let age = fedimint_core::time::now()
587                .duration_since(meta.created_at)
588                .unwrap_or_default();
589            if age > Duration::from_secs(7 * 24 * 3600) {
590                warn!(
591                    target: LOG_CLIENT_REACTOR,
592                    operation_id = %state.operation_id().fmt_short(),
593                    module_instance = %state.module_instance_id(),
594                    age_days = age.as_secs() / 86400,
595                    "Active state machine has been running for over a week, possibly stuck",
596                );
597            }
598            self.sm_update_tx
599                .send(state)
600                .expect("Must be able to send state machine to own opened channel");
601        }
602
603        // Keeps track of things already running, so we can deduplicate, just
604        // in case.
605        let mut currently_running_sms = HashSet::<DynState>::new();
606        // All things happening in parallel go into here
607        // NOTE: `FuturesUnordered` is a footgun: when it's not being polled
608        // (e.g. we picked an event and are awaiting on something to process it),
609        // nothing inside `futures` will be making progress, which in extreme cases
610        // could lead to hangs. For this reason we try really hard in the code here,
611        // to pick an event from `futures` and spawn a new task, avoiding any `await`,
612        // just so we can get back to `futures.next()` ASAP.
613        let mut futures: FuturesUnordered<BoxFuture<'_, ExecutorLoopEvent>> =
614            FuturesUnordered::new();
615
616        loop {
617            let event = tokio::select! {
618                new = sm_update_rx.recv() => {
619                    match new { Some(new) => {
620                        ExecutorLoopEvent::New {
621                            state: new,
622                        }
623                    } _ => {
624                        ExecutorLoopEvent::Disconnected
625                    }}
626                },
627
628                event = futures.next(), if !futures.is_empty() => event.expect("we only .next() if there are pending futures"),
629            };
630
631            // main reactor loop: wait for next thing that completed, react (possibly adding
632            // more things to `futures`)
633            match event {
634                ExecutorLoopEvent::New { state } => {
635                    if currently_running_sms.contains(&state) {
636                        warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received a state machine that is already running. Ignoring");
637                        continue;
638                    }
639                    currently_running_sms.insert(state.clone());
640                    let futures_len = futures.len();
641                    let global_context_gen = &global_context_gen;
642                    trace!(target: LOG_CLIENT_REACTOR, state = ?state, "Started new active state machine, details.");
643                    futures.push(Box::pin(async move {
644                        let Some(meta) = self.get_active_state(&state).await else {
645                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Couldn't look up received state machine. Ignoring.");
646                            return ExecutorLoopEvent::Invalid { state: state.clone() };
647                        };
648
649                        let transitions = self
650                            .get_transition_for(&state, meta, global_context_gen)
651                            .await;
652                        if transitions.is_empty() {
653                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received an active state that doesn't produce any transitions. Ignoring.");
654                            return ExecutorLoopEvent::Invalid { state: state.clone() };
655                        }
656                        let transitions_num = transitions.len();
657
658                        debug!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), total = futures_len + 1, transitions_num, "New active state machine.");
659
660                        let (first_completed_result, _index, _unused_transitions) =
661                            select_all(transitions).await;
662                        ExecutorLoopEvent::Triggered(first_completed_result)
663                    }));
664                }
665                ExecutorLoopEvent::Triggered(TransitionForActiveState {
666                    outcome,
667                    state,
668                    meta,
669                    transition_fn,
670                }) => {
671                    debug!(
672                        target: LOG_CLIENT_REACTOR,
673                        operation_id = %state.operation_id().fmt_short(),
674                        "Triggered state transition",
675                    );
676                    let span = tracing::debug_span!(
677                        target: LOG_CLIENT_REACTOR,
678                        "sm_transition",
679                        operation_id = %state.operation_id().fmt_short()
680                    );
681                    // Perform the transition as another future, so transitions can happen in
682                    // parallel.
683                    // Database write conflicts might be happening quite often here,
684                    // but transaction functions are supposed to be idempotent anyway,
685                    // so it seems like a good stress-test in the worst case.
686                    futures.push({
687                        let sm_update_tx = self.sm_update_tx.clone();
688                        let db = self.db.clone();
689                        let notifier = self.notifier.clone();
690                        let module_contexts = self.module_contexts.clone();
691                        let global_context_gen = global_context_gen.clone();
692                        Box::pin(
693                            async move {
694                                debug!(
695                                    target: LOG_CLIENT_REACTOR,
696                                    "Executing state transition",
697                                );
698                                trace!(
699                                    target: LOG_CLIENT_REACTOR,
700                                    ?state,
701                                    outcome = ?AbbreviateJson(&outcome),
702                                    "Executing state transition (details)",
703                                );
704
705                                let module_contexts = &module_contexts;
706                                let global_context_gen = &global_context_gen;
707
708                                let outcome = db
709                                    .autocommit::<'_, '_, _, _, Infallible>(
710                                        |dbtx, _| {
711                                            let state = state.clone();
712                                            let state_module_instance_id = state.module_instance_id();
713                                            let transition_fn = transition_fn.clone();
714                                            let transition_outcome = outcome.clone();
715                                            Box::pin(async move {
716                                                let new_state = transition_fn(
717                                                    &mut ClientSMDatabaseTransaction::new(
718                                                        &mut dbtx.to_ref(),
719                                                        state.module_instance_id(),
720                                                    ),
721                                                    transition_outcome.clone(),
722                                                    state.clone(),
723                                                )
724                                                .await;
725                                                dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
726                                                    state.clone(),
727                                                )))
728                                                .await;
729                                                dbtx.insert_entry(
730                                                    &InactiveStateKeyDb(InactiveStateKey::from_state(state.clone())),
731                                                    &meta.into_inactive(),
732                                                )
733                                                .await;
734
735                                                let context = &module_contexts
736                                                    .get(&state.module_instance_id())
737                                                    .expect("Unknown module");
738
739                                                let operation_id = state.operation_id();
740                                                let global_context = global_context_gen(
741                                                    state.module_instance_id(),
742                                                    operation_id,
743                                                );
744
745                                                let is_terminal = new_state.is_terminal(context, &global_context);
746
747                                                self.log_event_dbtx(dbtx,
748                                                    StateMachineUpdated{
749                                                        started: false,
750                                                        operation_id,
751                                                        module_id: state_module_instance_id,
752                                                        terminal: is_terminal,
753                                                    }
754                                                ).await;
755
756                                                if is_terminal {
757                                                    let k = InactiveStateKey::from_state(
758                                                        new_state.clone(),
759                                                    );
760                                                    let v = ActiveStateMeta::default().into_inactive();
761                                                    dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
762                                                    Ok(ActiveOrInactiveState::Inactive {
763                                                        dyn_state: new_state,
764                                                    })
765                                                } else {
766                                                    let k = ActiveStateKey::from_state(
767                                                        new_state.clone(),
768                                                    );
769                                                    let v = ActiveStateMeta::default();
770                                                    dbtx.insert_entry(&ActiveStateKeyDb(k), &v).await;
771                                                    Ok(ActiveOrInactiveState::Active {
772                                                        dyn_state: new_state,
773                                                        meta: v,
774                                                    })
775                                                }
776                                            })
777                                        },
778                                        None,
779                                    )
780                                    .await
781                                    .expect("autocommit should keep trying to commit (max_attempt: None) and body doesn't return errors");
782
783                                debug!(
784                                    target: LOG_CLIENT_REACTOR,
785                                    terminal = !outcome.is_active(),
786                                    ?outcome,
787                                    "State transition complete",
788                                );
789
790                                match &outcome {
791                                    ActiveOrInactiveState::Active { dyn_state, meta: _ } => {
792                                        sm_update_tx
793                                            .send(dyn_state.clone())
794                                            .expect("can't fail: we are the receiving end");
795                                        notifier.notify(dyn_state.clone());
796                                    }
797                                    ActiveOrInactiveState::Inactive { dyn_state } => {
798                                        notifier.notify(dyn_state.clone());
799                                    }
800                                }
801                                ExecutorLoopEvent::Completed { state, outcome }
802                            }
803                            .instrument(span),
804                        )
805                    });
806                }
807                ExecutorLoopEvent::Invalid { state } => {
808                    trace!(
809                        target: LOG_CLIENT_REACTOR,
810                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
811                        "State invalid"
812                    );
813                    assert!(
814                        currently_running_sms.remove(&state),
815                        "State must have been recorded"
816                    );
817                }
818
819                ExecutorLoopEvent::Completed { state, outcome } => {
820                    assert!(
821                        currently_running_sms.remove(&state),
822                        "State must have been recorded"
823                    );
824                    debug!(
825                        target: LOG_CLIENT_REACTOR,
826                        operation_id = %state.operation_id().fmt_short(),
827                        outcome_active = outcome.is_active(),
828                        total = futures.len(),
829                        "State transition complete"
830                    );
831                    trace!(
832                        target: LOG_CLIENT_REACTOR,
833                        ?outcome,
834                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
835                        "State transition complete"
836                    );
837                }
838                ExecutorLoopEvent::Disconnected => {
839                    break;
840                }
841            }
842        }
843
844        info!(target: LOG_CLIENT_REACTOR, "Terminated.");
845        Ok(())
846    }
847
848    async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
849        self.db
850            .begin_transaction_nc()
851            .await
852            .find_by_prefix(&ActiveStateKeyPrefix)
853            .await
854            // ignore states from modules that are not initialized yet
855            .filter(|(state, _)| {
856                future::ready(
857                    self.module_contexts
858                        .contains_key(&state.0.state.module_instance_id()),
859                )
860            })
861            .map(|(state, meta)| (state.0.state, meta))
862            .collect::<Vec<_>>()
863            .await
864    }
865
866    async fn get_active_state(&self, state: &DynState) -> Option<ActiveStateMeta> {
867        // ignore states from modules that are not initialized yet
868        if !self
869            .module_contexts
870            .contains_key(&state.module_instance_id())
871        {
872            return None;
873        }
874        self.db
875            .begin_transaction_nc()
876            .await
877            .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
878            .await
879    }
880
881    async fn get_inactive_states(&self) -> Vec<(DynState, InactiveStateMeta)> {
882        self.db
883            .begin_transaction_nc()
884            .await
885            .find_by_prefix(&InactiveStateKeyPrefix)
886            .await
887            // ignore states from modules that are not initialized yet
888            .filter(|(state, _)| {
889                future::ready(
890                    self.module_contexts
891                        .contains_key(&state.0.state.module_instance_id()),
892                )
893            })
894            .map(|(state, meta)| (state.0.state, meta))
895            .collect::<Vec<_>>()
896            .await
897    }
898
899    pub async fn log_event_dbtx<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
900    where
901        E: Event + Send,
902        Cap: Send,
903    {
904        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), None, event)
905            .await;
906    }
907}
908
909impl ExecutorInner {
910    /// See [`Executor::stop_executor`].
911    fn stop_executor(&self) -> Option<()> {
912        let mut state = self.state.write().expect("Locking can't fail");
913
914        state.stop()
915    }
916}
917
918impl Debug for ExecutorInner {
919    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
920        writeln!(f, "ExecutorInner {{}}")
921    }
922}
923
924impl ExecutorBuilder {
925    /// Allow executor being built to run state machines associated with the
926    /// supplied module
927    pub fn with_module<C>(&mut self, instance_id: ModuleInstanceId, context: C)
928    where
929        C: IntoDynInstance<DynType = DynContext>,
930    {
931        self.with_module_dyn(context.into_dyn(instance_id));
932    }
933
934    /// Allow executor being built to run state machines associated with the
935    /// supplied module
936    pub fn with_module_dyn(&mut self, context: DynContext) {
937        self.valid_module_ids.insert(context.module_instance_id());
938
939        if self
940            .module_contexts
941            .insert(context.module_instance_id(), context)
942            .is_some()
943        {
944            panic!("Tried to add two modules with the same instance id!");
945        }
946    }
947
948    /// Allow executor to build state machines associated with the module id,
949    /// for which the module itself might not be available yet (otherwise it
950    /// would be registered with `[Self::with_module_dyn]`).
951    pub fn with_valid_module_id(&mut self, module_id: ModuleInstanceId) {
952        self.valid_module_ids.insert(module_id);
953    }
954
955    /// Build [`Executor`] and spawn background task in `tasks` executing active
956    /// state machines. The supplied database `db` must support isolation, so
957    /// cannot be an isolated DB instance itself.
958    pub fn build(
959        self,
960        db: Database,
961        notifier: Notifier,
962        client_task_group: TaskGroup,
963        log_ordering_wakeup_tx: watch::Sender<()>,
964    ) -> Executor {
965        let (sm_update_tx, sm_update_rx) = tokio::sync::mpsc::unbounded_channel();
966
967        let inner = Arc::new(ExecutorInner {
968            db,
969            log_ordering_wakeup_tx,
970            state: std::sync::RwLock::new(ExecutorState::Unstarted { sm_update_rx }),
971            module_contexts: self.module_contexts,
972            valid_module_ids: self.valid_module_ids,
973            notifier,
974            sm_update_tx,
975            client_task_group,
976        });
977
978        debug!(
979            target: LOG_CLIENT_REACTOR,
980            instances = ?inner.module_contexts.keys().copied().collect::<Vec<_>>(),
981            "Initialized state machine executor with module instances"
982        );
983        Executor { inner }
984    }
985}
986#[derive(Debug)]
987pub struct ActiveOperationStateKeyPrefix {
988    pub operation_id: OperationId,
989}
990
991impl Encodable for ActiveOperationStateKeyPrefix {
992    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
993        self.operation_id.consensus_encode(writer)
994    }
995}
996
997impl ::fedimint_core::db::DatabaseLookup for ActiveOperationStateKeyPrefix {
998    type Record = ActiveStateKeyDb;
999}
1000
1001#[derive(Debug)]
1002pub(crate) struct ActiveModuleOperationStateKeyPrefix {
1003    pub operation_id: OperationId,
1004    pub module_instance: ModuleInstanceId,
1005}
1006
1007impl Encodable for ActiveModuleOperationStateKeyPrefix {
1008    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1009        self.operation_id.consensus_encode(writer)?;
1010        self.module_instance.consensus_encode(writer)?;
1011        Ok(())
1012    }
1013}
1014
1015impl ::fedimint_core::db::DatabaseLookup for ActiveModuleOperationStateKeyPrefix {
1016    type Record = ActiveStateKeyDb;
1017}
1018
/// Lookup prefix that encodes to zero bytes, matching every
/// [`ActiveStateKeyDb`] record.
#[derive(Debug)]
pub struct ActiveStateKeyPrefix;

impl Encodable for ActiveStateKeyPrefix {
    // Nothing is written. Presumably the database layer still prepends the
    // record's `DB_PREFIX`, so the lookup spans all active states (confirm).
    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
        Ok(())
    }
}

/// Database key for an active state machine, wrapping an [`ActiveStateKey`].
///
/// Stored under `ExecutorDbPrefixes::ActiveStates` with an
/// [`ActiveStateMeta`] value.
#[derive(Encodable, Decodable, Debug)]
pub struct ActiveStateKeyDb(pub fedimint_client_module::sm::executor::ActiveStateKey);

// Active-state records opt into modification notifications
// (`NOTIFY_ON_MODIFY = true` plus the `DatabaseKeyWithNotify` marker below).
impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyDb {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = true;
    type Key = Self;
    type Value = ActiveStateMeta;
}

impl DatabaseKeyWithNotify for ActiveStateKeyDb {}

impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefix {
    type Record = ActiveStateKeyDb;
}
1043
/// Raw-bytes counterpart of [`ActiveStateKeyPrefix`].
///
/// A unit lookup prefix yielding [`ActiveStateKeyBytes`] records, in which
/// the state machine is left as undecoded bytes.
#[derive(Debug, Encodable, Decodable)]
pub struct ActiveStateKeyPrefixBytes;

// Same `DB_PREFIX` as `ActiveStateKeyDb`, so both types view the same entries.
// This raw view does not request modification notifications.
impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyBytes {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = false;
    type Key = Self;
    type Value = ActiveStateMeta;
}

impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefixBytes {
    type Record = ActiveStateKeyBytes;
}
1057
/// Database key for an inactive (terminal or superseded) state machine,
/// wrapping an [`InactiveStateKey`].
#[derive(Encodable, Decodable, Debug)]
pub struct InactiveStateKeyDb(pub fedimint_client_module::sm::executor::InactiveStateKey);
1060
1061#[derive(Debug)]
1062pub struct InactiveStateKeyBytes {
1063    pub operation_id: OperationId,
1064    pub module_instance_id: ModuleInstanceId,
1065    pub state: Vec<u8>,
1066}
1067
1068impl Encodable for InactiveStateKeyBytes {
1069    fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
1070        self.operation_id.consensus_encode(writer)?;
1071        writer.write_all(self.state.as_slice())?;
1072        Ok(())
1073    }
1074}
1075
1076impl Decodable for InactiveStateKeyBytes {
1077    fn consensus_decode_partial<R: std::io::Read>(
1078        reader: &mut R,
1079        modules: &ModuleDecoderRegistry,
1080    ) -> Result<Self, DecodeError> {
1081        let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1082        let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1083        let mut bytes = Vec::new();
1084        reader
1085            .read_to_end(&mut bytes)
1086            .map_err(DecodeError::from_err)?;
1087
1088        let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1089        instance_bytes.append(&mut bytes);
1090
1091        Ok(InactiveStateKeyBytes {
1092            operation_id,
1093            module_instance_id,
1094            state: instance_bytes,
1095        })
1096    }
1097}
1098
1099#[derive(Debug)]
1100pub struct InactiveOperationStateKeyPrefix {
1101    pub operation_id: OperationId,
1102}
1103
1104impl Encodable for InactiveOperationStateKeyPrefix {
1105    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1106        self.operation_id.consensus_encode(writer)
1107    }
1108}
1109
1110impl ::fedimint_core::db::DatabaseLookup for InactiveOperationStateKeyPrefix {
1111    type Record = InactiveStateKeyDb;
1112}
1113
1114#[derive(Debug)]
1115pub(crate) struct InactiveModuleOperationStateKeyPrefix {
1116    pub operation_id: OperationId,
1117    pub module_instance: ModuleInstanceId,
1118}
1119
1120impl Encodable for InactiveModuleOperationStateKeyPrefix {
1121    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1122        self.operation_id.consensus_encode(writer)?;
1123        self.module_instance.consensus_encode(writer)?;
1124        Ok(())
1125    }
1126}
1127
1128impl ::fedimint_core::db::DatabaseLookup for InactiveModuleOperationStateKeyPrefix {
1129    type Record = InactiveStateKeyDb;
1130}
1131
1132#[derive(Debug, Clone)]
1133pub struct InactiveStateKeyPrefix;
1134
1135impl Encodable for InactiveStateKeyPrefix {
1136    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
1137        Ok(())
1138    }
1139}
1140
1141#[derive(Debug, Encodable, Decodable)]
1142pub struct InactiveStateKeyPrefixBytes;
1143
1144impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyBytes {
1145    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
1146    const NOTIFY_ON_MODIFY: bool = false;
1147    type Key = Self;
1148    type Value = InactiveStateMeta;
1149}
1150
1151impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefixBytes {
1152    type Record = InactiveStateKeyBytes;
1153}
1154
1155#[derive(Debug)]
1156pub struct ActiveStateKeyBytes {
1157    pub operation_id: OperationId,
1158    pub module_instance_id: ModuleInstanceId,
1159    pub state: Vec<u8>,
1160}
1161
1162impl Encodable for ActiveStateKeyBytes {
1163    fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
1164        self.operation_id.consensus_encode(writer)?;
1165        writer.write_all(self.state.as_slice())?;
1166        Ok(())
1167    }
1168}
1169
1170impl Decodable for ActiveStateKeyBytes {
1171    fn consensus_decode_partial<R: std::io::Read>(
1172        reader: &mut R,
1173        modules: &ModuleDecoderRegistry,
1174    ) -> Result<Self, DecodeError> {
1175        let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1176        let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1177        let mut bytes = Vec::new();
1178        reader
1179            .read_to_end(&mut bytes)
1180            .map_err(DecodeError::from_err)?;
1181
1182        let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1183        instance_bytes.append(&mut bytes);
1184
1185        Ok(ActiveStateKeyBytes {
1186            operation_id,
1187            module_instance_id,
1188            state: instance_bytes,
1189        })
1190    }
1191}
// Inactive-state records live under `ExecutorDbPrefixes::InactiveStates` with
// an `InactiveStateMeta` value. They opt into modification notifications
// (`NOTIFY_ON_MODIFY = true` plus the `DatabaseKeyWithNotify` marker below).
impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyDb {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = true;
    type Key = Self;
    type Value = InactiveStateMeta;
}

impl DatabaseKeyWithNotify for InactiveStateKeyDb {}

impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefix {
    type Record = InactiveStateKeyDb;
}
1204
1205#[derive(Debug)]
1206enum ActiveOrInactiveState {
1207    Active {
1208        dyn_state: DynState,
1209        #[allow(dead_code)] // currently not printed anywhere, but useful in the db
1210        meta: ActiveStateMeta,
1211    },
1212    Inactive {
1213        dyn_state: DynState,
1214    },
1215}
1216
1217impl ActiveOrInactiveState {
1218    fn is_active(&self) -> bool {
1219        match self {
1220            ActiveOrInactiveState::Active { .. } => true,
1221            ActiveOrInactiveState::Inactive { .. } => false,
1222        }
1223    }
1224}
1225
// Implements the `IExecutor` trait for `Executor` by forwarding each call.
// `Self::name(..)` paths resolve to inherent associated functions before trait
// ones, so these presumably call `Executor`'s inherent methods of the same
// names rather than recursing (confirm the inherent methods exist).
#[apply(async_trait_maybe_send!)]
impl IExecutor for Executor {
    async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
        Self::get_active_states(self).await
    }

    async fn add_state_machines_dbtx(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        states: Vec<DynState>,
    ) -> AddStateMachinesResult {
        Self::add_state_machines_dbtx(self, dbtx, states).await
    }
}
1240
// Unit tests live in an out-of-line `tests` module file, compiled only for tests.
#[cfg(test)]
mod tests;