Skip to main content

fedimint_client/sm/
executor.rs

1use std::collections::{BTreeMap, BTreeSet, HashSet};
2use std::convert::Infallible;
3use std::fmt::{Debug, Formatter};
4use std::io::{Error, Write};
5use std::mem;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::anyhow;
10use fedimint_client_module::sm::executor::{
11    ActiveStateKey, ContextGen, IExecutor, InactiveStateKey,
12};
13use fedimint_client_module::sm::{
14    ActiveStateMeta, ClientSMDatabaseTransaction, DynContext, DynState, InactiveStateMeta, State,
15    StateTransition, StateTransitionFunction,
16};
17use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, OperationId};
18use fedimint_core::db::{
19    AutocommitError, Database, DatabaseKeyWithNotify, DatabaseTransaction,
20    IDatabaseTransactionOpsCoreTyped,
21};
22use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
23use fedimint_core::fmt_utils::AbbreviateJson;
24use fedimint_core::module::registry::ModuleDecoderRegistry;
25use fedimint_core::task::TaskGroup;
26use fedimint_core::util::{BoxFuture, FmtCompactAnyhow as _};
27use fedimint_core::{apply, async_trait_maybe_send};
28use fedimint_eventlog::{DBTransactionEventLogExt as _, Event, EventKind, EventPersistence};
29use fedimint_logging::LOG_CLIENT_REACTOR;
30use futures::future::{self, select_all};
31use futures::stream::{FuturesUnordered, StreamExt};
32use serde::{Deserialize, Serialize};
33use tokio::select;
34use tokio::sync::{mpsc, oneshot, watch};
35use tracing::{Instrument, debug, error, info, trace, warn};
36
37use crate::sm::notifier::Notifier;
38use crate::{AddStateMachinesError, AddStateMachinesResult, DynGlobalClientContext};
39
/// After how many attempts a DB transaction is aborted with an error.
///
/// Passed as the attempt limit to `autocommit` by
/// [`Executor::add_state_machines`].
const MAX_DB_ATTEMPTS: Option<usize> = Some(100);
42
/// Prefixes for executor DB entries
///
/// Each variant's discriminant is the numeric prefix value associated with
/// one kind of executor record.
pub(crate) enum ExecutorDbPrefixes {
    /// See [`ActiveStateKey`]
    ActiveStates = 0xa1,
    /// See [`InactiveStateKey`]
    InactiveStates = 0xa2,
}
50
/// Event payload logged by the executor when a state machine is added and
/// when one of its state transitions is applied.
#[derive(Serialize, Deserialize)]
pub struct StateMachineUpdated {
    /// Operation the state machine belongs to
    operation_id: OperationId,
    /// `true` when logged for a newly added state machine, `false` when logged
    /// for a state transition
    started: bool,
    /// Whether the state reached by the transition is terminal; always `false`
    /// for newly added state machines
    terminal: bool,
    /// Instance id of the module owning the state machine
    module_id: ModuleInstanceId,
}
58
/// Registers [`StateMachineUpdated`] as an event of kind `sm-updated`.
impl Event for StateMachineUpdated {
    /// `None`: the event is not associated with a specific module kind
    const MODULE: Option<fedimint_core::core::ModuleKind> = None;
    /// Identifier under which the event is recorded
    const KIND: EventKind = EventKind::from_static("sm-updated");
    /// NOTE(review): presumably lets the event log trim these entries —
    /// confirm against `EventPersistence`
    const PERSISTENCE: EventPersistence = EventPersistence::Trimable;
}
64
/// Executor that drives forward state machines under its management.
///
/// Each state transition is atomic and supposed to be idempotent such that a
/// stop/crash of the executor at any point can be recovered from on restart.
/// The executor is aware of the concept of Fedimint modules and can give state
/// machines a different [execution context](crate::module::sm::Context)
/// depending on the owning module, making it very flexible.
///
/// The handle is `Clone`; all clones share the same [`ExecutorInner`] through
/// an [`Arc`].
#[derive(Clone, Debug)]
pub struct Executor {
    /// Shared executor state and resources
    inner: Arc<ExecutorInner>,
}
76
/// Shared state and resources behind an [`Executor`] handle.
struct ExecutorInner {
    /// Database in which active and inactive state records are stored
    db: Database,
    /// Lifecycle state of the executor (unstarted, running or stopped)
    state: std::sync::RwLock<ExecutorState>,
    /// Per-module contexts, looked up by module instance id when evaluating
    /// states
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Module instance ids for which state machines may be added
    valid_module_ids: BTreeSet<ModuleInstanceId>,
    /// Used to notify about newly added states
    notifier: Notifier,
    /// Any time the executor should notice a state machine update (e.g.
    /// because it was created), it must be sent through this channel for it to
    /// notice.
    sm_update_tx: mpsc::UnboundedSender<DynState>,
    /// Task group on which the executor's background task is spawned
    client_task_group: TaskGroup,
    /// NOTE(review): usage is not visible in this part of the file —
    /// presumably wakes up event log ordering; confirm.
    log_ordering_wakeup_tx: watch::Sender<()>,
}
89
/// Lifecycle of the executor's background task.
///
/// The state moves from `Unstarted` to `Running` via [`ExecutorState::start`]
/// and from `Running` to `Stopped` via [`ExecutorState::stop`].
enum ExecutorState {
    /// The executor has not been started yet; holds the receiving end of the
    /// state machine update channel until it is handed over on start.
    Unstarted {
        sm_update_rx: mpsc::UnboundedReceiver<DynState>,
    },
    /// The executor was started.
    Running {
        /// Generator for the global context handed to state machines
        context_gen: ContextGen,
        /// Used to send the shutdown signal when the executor is stopped
        shutdown_sender: oneshot::Sender<()>,
    },
    /// The executor was running and has been stopped.
    Stopped,
}
100
101impl ExecutorState {
102    /// Starts the executor, returning a receiver that will be signalled when
103    /// the executor is stopped and a receiver for state machine updates.
104    /// Returns `None` if the executor has already been started and/or stopped.
105    fn start(
106        &mut self,
107        context: ContextGen,
108    ) -> Option<(oneshot::Receiver<()>, mpsc::UnboundedReceiver<DynState>)> {
109        let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();
110
111        let previous_state = mem::replace(
112            self,
113            ExecutorState::Running {
114                context_gen: context,
115                shutdown_sender,
116            },
117        );
118
119        match previous_state {
120            ExecutorState::Unstarted { sm_update_rx } => Some((shutdown_receiver, sm_update_rx)),
121            _ => {
122                // Replace the previous state, undoing the `mem::replace` above.
123                *self = previous_state;
124
125                debug!(target: LOG_CLIENT_REACTOR, "Executor already started, ignoring start request");
126                None
127            }
128        }
129    }
130
131    /// Stops the executor, returning `Some(())` if the executor was running and
132    /// `None` if it was in any other state.
133    fn stop(&mut self) -> Option<()> {
134        let previous_state = mem::replace(self, ExecutorState::Stopped);
135
136        match previous_state {
137            ExecutorState::Running {
138                shutdown_sender, ..
139            } => {
140                if shutdown_sender.send(()).is_err() {
141                    warn!(target: LOG_CLIENT_REACTOR, "Failed to send shutdown signal to executor, already dead?");
142                }
143                Some(())
144            }
145            _ => {
146                // Replace the previous state, undoing the `mem::replace` above.
147                *self = previous_state;
148
149                debug!(target: LOG_CLIENT_REACTOR, "Executor not running, ignoring stop request");
150                None
151            }
152        }
153    }
154
155    fn gen_context(&self, state: &DynState) -> Option<DynGlobalClientContext> {
156        let ExecutorState::Running { context_gen, .. } = self else {
157            return None;
158        };
159        Some(context_gen(
160            state.module_instance_id(),
161            state.operation_id(),
162        ))
163    }
164}
165
/// Builder to which module clients can be attached and used to build an
/// [`Executor`] supporting these.
#[derive(Debug, Default)]
pub struct ExecutorBuilder {
    /// Contexts of the attached modules, keyed by module instance id
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Module instance ids the built executor will accept state machines for
    valid_module_ids: BTreeSet<ModuleInstanceId>,
}
173
174impl Executor {
    /// Creates an [`ExecutorBuilder`] with no modules attached (its `Default`
    /// value).
    pub fn builder() -> ExecutorBuilder {
        ExecutorBuilder::default()
    }
179
    /// Returns the currently active states together with their metadata, as
    /// reported by the inner executor.
    pub async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
        self.inner.get_active_states().await
    }
183
184    /// Adds a number of state machines to the executor atomically. They will be
185    /// driven to completion automatically in the background.
186    ///
187    /// **Attention**: do not use before background task is started!
188    // TODO: remove warning once finality is an inherent state attribute
189    pub async fn add_state_machines(&self, states: Vec<DynState>) -> anyhow::Result<()> {
190        self.inner
191            .db
192            .autocommit(
193                |dbtx, _| Box::pin(self.add_state_machines_dbtx(dbtx, states.clone())),
194                MAX_DB_ATTEMPTS,
195            )
196            .await
197            .map_err(|e| match e {
198                AutocommitError::CommitFailed {
199                    last_error,
200                    attempts,
201                } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
202                AutocommitError::ClosureError { error, .. } => anyhow!("{error:?}"),
203            })?;
204
205        // TODO: notify subscribers to state changes?
206
207        Ok(())
208    }
209
    /// Adds a number of state machines to the executor atomically with other DB
    /// changes in `dbtx`. See [`Executor::add_state_machines`] for more
    /// details.
    ///
    /// For every state an active-state entry is inserted, a
    /// [`StateMachineUpdated`] event is logged and an on-commit callback is
    /// registered that notifies subscribers and forwards the state to the
    /// executor's update channel.
    ///
    /// ## Errors
    /// - [`AddStateMachinesError::Other`] if the state's module instance id is
    ///   not among the valid module ids, or if the state is already terminal.
    /// - [`AddStateMachinesError::StateAlreadyExists`] if the state is already
    ///   stored as an active or inactive state.
    ///
    /// ## Note
    /// Meant to be called after the background task was started using
    /// [`Executor::start_executor`]. If the executor is not running, no global
    /// context can be generated, so the terminal-state check is skipped and a
    /// warning is logged instead.
    // TODO: remove warning once finality is an inherent state attribute
    pub async fn add_state_machines_dbtx(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        states: Vec<DynState>,
    ) -> AddStateMachinesResult {
        for state in states {
            // Reject states of modules this executor was not built for.
            if !self
                .inner
                .valid_module_ids
                .contains(&state.module_instance_id())
            {
                return Err(AddStateMachinesError::Other(anyhow!("Unknown module")));
            }

            // A state may be added only once, whether it is still active or
            // already inactive.
            let is_active_state = dbtx
                .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
                .await
                .is_some();
            let is_inactive_state = dbtx
                .get_value(&InactiveStateKeyDb(InactiveStateKey::from_state(
                    state.clone(),
                )))
                .await
                .is_some();

            if is_active_state || is_inactive_state {
                return Err(AddStateMachinesError::StateAlreadyExists);
            }

            // In case of recovery functions, the module itself is not yet initialized,
            // so we can't check if the state is terminal. However the
            // `ExecutorInner::get_transition_for` function will double check and
            // deactivate any terminal states that would slip past this check.
            if let Some(module_context) =
                self.inner.module_contexts.get(&state.module_instance_id())
            {
                match self
                    .inner
                    .state
                    .read()
                    .expect("locking failed")
                    .gen_context(&state)
                {
                    Some(context) => {
                        if state.is_terminal(module_context, &context) {
                            return Err(AddStateMachinesError::Other(anyhow!(
                                "State is already terminal, adding it to the executor doesn't make sense."
                            )));
                        }
                    }
                    _ => {
                        warn!(target: LOG_CLIENT_REACTOR, "Executor should be running at this point");
                    }
                }
            }

            dbtx.insert_new_entry(
                &ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())),
                &ActiveStateMeta::default(),
            )
            .await;

            let operation_id = state.operation_id();
            self.inner
                .log_event_dbtx(
                    dbtx,
                    StateMachineUpdated {
                        operation_id,
                        started: true,
                        terminal: false,
                        module_id: state.module_instance_id(),
                    },
                )
                .await;

            // Subscribers and the executor loop are informed through a
            // callback registered with `on_commit`.
            let notify_sender = self.inner.notifier.sender();
            let sm_updates_tx = self.inner.sm_update_tx.clone();
            dbtx.on_commit(move || {
                notify_sender.notify(state.clone());
                // A failed send is deliberately ignored.
                let _ = sm_updates_tx.send(state);
            });
        }

        Ok(())
    }
303
304    /// **Mostly used for testing**
305    ///
306    /// Check if state exists in the database as part of an actively running
307    /// state machine.
308    pub async fn contains_active_state<S: State>(
309        &self,
310        instance: ModuleInstanceId,
311        state: S,
312    ) -> bool {
313        let state = DynState::from_typed(instance, state);
314        self.inner
315            .get_active_states()
316            .await
317            .into_iter()
318            .any(|(s, _)| s == state)
319    }
320
321    // TODO: unify querying fns
322    /// **Mostly used for testing**
323    ///
324    /// Check if state exists in the database as inactive. If the state is
325    /// terminal it means the corresponding state machine finished its
326    /// execution. If the state is non-terminal it means the state machine was
327    /// in that state at some point but moved on since then.
328    pub async fn contains_inactive_state<S: State>(
329        &self,
330        instance: ModuleInstanceId,
331        state: S,
332    ) -> bool {
333        let state = DynState::from_typed(instance, state);
334        self.inner
335            .get_inactive_states()
336            .await
337            .into_iter()
338            .any(|(s, _)| s == state)
339    }
340
    /// Awaits `wait_key_exists` on the inactive-state key of `state` and
    /// returns the metadata stored under it.
    ///
    /// NOTE(review): presumably resolves only once the state has been recorded
    /// as inactive — confirm against `Database::wait_key_exists`.
    pub async fn await_inactive_state(&self, state: DynState) -> InactiveStateMeta {
        self.inner
            .db
            .wait_key_exists(&InactiveStateKeyDb(InactiveStateKey::from_state(state)))
            .await
    }
347
    /// Awaits `wait_key_exists` on the active-state key of `state` and returns
    /// the metadata stored under it.
    ///
    /// NOTE(review): presumably resolves only once the state has been recorded
    /// as active — confirm against `Database::wait_key_exists`.
    pub async fn await_active_state(&self, state: DynState) -> ActiveStateMeta {
        self.inner
            .db
            .wait_key_exists(&ActiveStateKeyDb(ActiveStateKey::from_state(state)))
            .await
    }
354
355    /// Only meant for debug tooling
356    pub async fn get_operation_states(
357        &self,
358        operation_id: OperationId,
359    ) -> (
360        Vec<(DynState, ActiveStateMeta)>,
361        Vec<(DynState, InactiveStateMeta)>,
362    ) {
363        let mut dbtx = self.inner.db.begin_transaction_nc().await;
364        let active_states: Vec<_> = dbtx
365            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
366            .await
367            .map(|(active_key, active_meta)| (active_key.0.state, active_meta))
368            .collect()
369            .await;
370        let inactive_states: Vec<_> = dbtx
371            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
372            .await
373            .map(|(active_key, inactive_meta)| (active_key.0.state, inactive_meta))
374            .collect()
375            .await;
376
377        (active_states, inactive_states)
378    }
379
    /// Starts the background task that runs the state machines. This cannot
    /// be done when building the executor since some global contexts in turn
    /// may depend on the executor, forming a cyclic dependency.
    ///
    /// The task is spawned on the client task group under the name
    /// `sm-executor` and ends when the task group shuts down, when an explicit
    /// shutdown signal arrives (or its channel is closed), or when the runner
    /// itself exits.
    ///
    /// ## Panics
    /// If called more than once, or after the executor was stopped.
    pub fn start_executor(&self, context_gen: ContextGen) {
        // `start` only succeeds from the `Unstarted` state.
        let Some((shutdown_receiver, sm_update_rx)) = self
            .inner
            .state
            .write()
            .expect("locking can't fail")
            .start(context_gen.clone())
        else {
            panic!("start_executor was called previously");
        };

        let task_runner_inner = self.inner.clone();
        let _handle = self.inner.client_task_group.spawn("sm-executor", |task_handle| async move {
            let executor_runner = task_runner_inner.run(context_gen, sm_update_rx);
            let task_group_shutdown_rx = task_handle.make_shutdown_rx();
            // Whichever of the three futures completes first ends the task.
            select! {
                () = task_group_shutdown_rx => {
                    debug!(
                        target: LOG_CLIENT_REACTOR,
                        "Shutting down state machine executor runner due to task group shutdown signal"
                    );
                },
                shutdown_happened_sender = shutdown_receiver => {
                    match shutdown_happened_sender {
                        Ok(()) => {
                            debug!(
                                target: LOG_CLIENT_REACTOR,
                                "Shutting down state machine executor runner due to explicit shutdown signal"
                            );
                        },
                        Err(_) => {
                            warn!(
                                target: LOG_CLIENT_REACTOR,
                                "Shutting down state machine executor runner because the shutdown signal channel was closed (the executor object was dropped)"
                            );
                        }
                    }
                },
                () = executor_runner => {
                    error!(target: LOG_CLIENT_REACTOR, "State machine executor runner exited unexpectedly!");
                },
            };
        });
    }
430
    /// Stops the background task that runs the state machines.
    ///
    /// Returns `Some(())` if a shutdown signal was sent. This only signals
    /// the background task to stop; it does not wait for the task's main loop
    /// to exit.
    ///
    /// If no shutdown signal was sent it returns `None`. This can happen if
    /// `stop_executor` is called multiple times.
    ///
    /// NOTE(review): delegates to `ExecutorInner::stop_executor`, which is not
    /// visible here; the return contract above presumably mirrors
    /// `ExecutorState::stop` — confirm.
    ///
    /// ## Panics
    /// If called in parallel with [`start_executor`](Self::start_executor).
    pub fn stop_executor(&self) -> Option<()> {
        self.inner.stop_executor()
    }
447
    /// Returns a reference to the [`Notifier`] that can be used to subscribe to
    /// state transitions. The notifier is owned by the shared executor state.
    pub fn notifier(&self) -> &Notifier {
        &self.inner.notifier
    }
453}
454
/// Requests an executor stop when the shared inner state is dropped.
impl Drop for ExecutorInner {
    fn drop(&mut self) {
        // The result is ignored: whether or not a stop was actually needed
        // makes no difference while dropping.
        self.stop_executor();
    }
}
460
/// A transition of an active state whose trigger has completed and which is
/// ready to be applied.
struct TransitionForActiveState {
    /// Value the transition's trigger future resolved to
    outcome: serde_json::Value,
    /// The active state the transition belongs to
    state: DynState,
    /// Metadata of the active state
    meta: ActiveStateMeta,
    /// Function to run in order to apply the transition
    transition_fn: StateTransitionFunction<DynState>,
}
467
468impl ExecutorInner {
469    async fn run(
470        &self,
471        global_context_gen: ContextGen,
472        sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
473    ) {
474        debug!(target: LOG_CLIENT_REACTOR, "Starting state machine executor task");
475        if let Err(err) = self
476            .run_state_machines_executor_inner(global_context_gen, sm_update_rx)
477            .await
478        {
479            warn!(
480                target: LOG_CLIENT_REACTOR,
481                err = %err.fmt_compact_anyhow(),
482                "An unexpected error occurred during a state transition"
483            );
484        }
485    }
486
    /// Builds one future per transition offered by `state`.
    ///
    /// Each returned future awaits the transition's trigger and resolves to a
    /// [`TransitionForActiveState`] carrying the trigger outcome, a clone of
    /// `state`, `meta` and the transition function.
    ///
    /// If `state` offers no transitions at all, a warning is logged and the
    /// state is moved from the active to the inactive states in the database;
    /// the returned vector is empty in that case.
    ///
    /// ## Panics
    /// If no module context is registered for the state's module instance, or
    /// if the autocommit that deactivates a transition-less state fails.
    async fn get_transition_for(
        &self,
        state: &DynState,
        meta: ActiveStateMeta,
        global_context_gen: &ContextGen,
    ) -> Vec<BoxFuture<'static, TransitionForActiveState>> {
        let module_instance = state.module_instance_id();
        let context = &self
            .module_contexts
            .get(&module_instance)
            .expect("Unknown module");
        let transitions = state
            .transitions(
                context,
                &global_context_gen(module_instance, state.operation_id()),
            )
            .into_iter()
            .map(|transition| {
                // Every future gets its own copy of the state it belongs to.
                let state = state.clone();
                let f: BoxFuture<TransitionForActiveState> = Box::pin(async move {
                    let StateTransition {
                        trigger,
                        transition,
                    } = transition;
                    TransitionForActiveState {
                        outcome: trigger.await,
                        state,
                        transition_fn: transition,
                        meta,
                    }
                });
                f
            })
            .collect::<Vec<_>>();
        if transitions.is_empty() {
            // In certain cases a terminal (no transitions) state could get here due to
            // module bug. Inactivate it to prevent accumulation of such states.
            // See [`Self::add_state_machines_dbtx`].
            warn!(
                target: LOG_CLIENT_REACTOR,
                module_id = module_instance, "A terminal state where only active states are expected. Please report this bug upstream."
            );
            // NOTE(review): the inactive record is written with a
            // default-constructed meta rather than the `meta` passed in —
            // confirm this is intended.
            self.db
                .autocommit::<_, _, anyhow::Error>(
                    |dbtx, _| {
                        Box::pin(async {
                            let k = InactiveStateKey::from_state(state.clone());
                            let v = ActiveStateMeta::default().into_inactive();
                            dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
                                state.clone(),
                            )))
                            .await;
                            dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
                            Ok(())
                        })
                    },
                    // No attempt limit is passed for this autocommit.
                    None,
                )
                .await
                .expect("Autocommit here can't fail");
        }

        transitions
    }
551
552    async fn run_state_machines_executor_inner(
553        &self,
554        global_context_gen: ContextGen,
555        mut sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
556    ) -> anyhow::Result<()> {
557        /// All futures in the executor resolve to this type, so the handling
558        /// code can tell them apart.
559        enum ExecutorLoopEvent {
560            /// Notification about `DynState` arrived and should be handled,
561            /// usually added to the list of pending futures.
562            New { state: DynState },
563            /// One of trigger futures of a state machine finished and
564            /// returned transition function to run
565            Triggered(TransitionForActiveState),
566            /// The state machine did not need to run, so it was canceled
567            Invalid { state: DynState },
568            /// Transition function and all the accounting around it are done
569            Completed {
570                state: DynState,
571                outcome: ActiveOrInactiveState,
572            },
573            /// New job receiver disconnected, that can only mean termination
574            Disconnected,
575        }
576
577        let active_states = self.get_active_states().await;
578        trace!(target: LOG_CLIENT_REACTOR, "Starting active states: {:?}", active_states);
579        debug!(
580            target: LOG_CLIENT_REACTOR,
581            total = active_states.len(),
582            "Starting active state machines",
583        );
584        for (state, meta) in active_states {
585            let age = fedimint_core::time::now()
586                .duration_since(meta.created_at)
587                .unwrap_or_default();
588            if age > Duration::from_secs(7 * 24 * 3600) {
589                warn!(
590                    target: LOG_CLIENT_REACTOR,
591                    operation_id = %state.operation_id().fmt_short(),
592                    module_instance = %state.module_instance_id(),
593                    age_days = age.as_secs() / 86400,
594                    "Active state machine has been running for over a week, possibly stuck",
595                );
596            }
597            self.sm_update_tx
598                .send(state)
599                .expect("Must be able to send state machine to own opened channel");
600        }
601
602        // Keeps track of things already running, so we can deduplicate, just
603        // in case.
604        let mut currently_running_sms = HashSet::<DynState>::new();
605        // All things happening in parallel go into here
606        // NOTE: `FuturesUnordered` is a footgun: when it's not being polled
607        // (e.g. we picked an event and are awaiting on something to process it),
608        // nothing inside `futures` will be making progress, which in extreme cases
609        // could lead to hangs. For this reason we try really hard in the code here,
610        // to pick an event from `futures` and spawn a new task, avoiding any `await`,
611        // just so we can get back to `futures.next()` ASAP.
612        let mut futures: FuturesUnordered<BoxFuture<'_, ExecutorLoopEvent>> =
613            FuturesUnordered::new();
614
615        loop {
616            let event = tokio::select! {
617                new = sm_update_rx.recv() => {
618                    match new { Some(new) => {
619                        ExecutorLoopEvent::New {
620                            state: new,
621                        }
622                    } _ => {
623                        ExecutorLoopEvent::Disconnected
624                    }}
625                },
626
627                event = futures.next(), if !futures.is_empty() => event.expect("we only .next() if there are pending futures"),
628            };
629
630            // main reactor loop: wait for next thing that completed, react (possibly adding
631            // more things to `futures`)
632            match event {
633                ExecutorLoopEvent::New { state } => {
634                    if currently_running_sms.contains(&state) {
635                        warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received a state machine that is already running. Ignoring");
636                        continue;
637                    }
638                    currently_running_sms.insert(state.clone());
639                    let futures_len = futures.len();
640                    let global_context_gen = &global_context_gen;
641                    trace!(target: LOG_CLIENT_REACTOR, state = ?state, "Started new active state machine, details.");
642                    futures.push(Box::pin(async move {
643                        let Some(meta) = self.get_active_state(&state).await else {
644                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Couldn't look up received state machine. Ignoring.");
645                            return ExecutorLoopEvent::Invalid { state: state.clone() };
646                        };
647
648                        let transitions = self
649                            .get_transition_for(&state, meta, global_context_gen)
650                            .await;
651                        if transitions.is_empty() {
652                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received an active state that doesn't produce any transitions. Ignoring.");
653                            return ExecutorLoopEvent::Invalid { state: state.clone() };
654                        }
655                        let transitions_num = transitions.len();
656
657                        debug!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), total = futures_len + 1, transitions_num, "New active state machine.");
658
659                        let (first_completed_result, _index, _unused_transitions) =
660                            select_all(transitions).await;
661                        ExecutorLoopEvent::Triggered(first_completed_result)
662                    }));
663                }
664                ExecutorLoopEvent::Triggered(TransitionForActiveState {
665                    outcome,
666                    state,
667                    meta,
668                    transition_fn,
669                }) => {
670                    debug!(
671                        target: LOG_CLIENT_REACTOR,
672                        operation_id = %state.operation_id().fmt_short(),
673                        "Triggered state transition",
674                    );
675                    let span = tracing::debug_span!(
676                        target: LOG_CLIENT_REACTOR,
677                        "sm_transition",
678                        operation_id = %state.operation_id().fmt_short()
679                    );
680                    // Perform the transition as another future, so transitions can happen in
681                    // parallel.
682                    // Database write conflicts might be happening quite often here,
683                    // but transaction functions are supposed to be idempotent anyway,
684                    // so it seems like a good stress-test in the worst case.
685                    futures.push({
686                        let sm_update_tx = self.sm_update_tx.clone();
687                        let db = self.db.clone();
688                        let notifier = self.notifier.clone();
689                        let module_contexts = self.module_contexts.clone();
690                        let global_context_gen = global_context_gen.clone();
691                        Box::pin(
692                            async move {
693                                debug!(
694                                    target: LOG_CLIENT_REACTOR,
695                                    "Executing state transition",
696                                );
697                                trace!(
698                                    target: LOG_CLIENT_REACTOR,
699                                    ?state,
700                                    outcome = ?AbbreviateJson(&outcome),
701                                    "Executing state transition (details)",
702                                );
703
704                                let module_contexts = &module_contexts;
705                                let global_context_gen = &global_context_gen;
706
707                                let outcome = db
708                                    .autocommit::<'_, '_, _, _, Infallible>(
709                                        |dbtx, _| {
710                                            let state = state.clone();
711                                            let state_module_instance_id = state.module_instance_id();
712                                            let transition_fn = transition_fn.clone();
713                                            let transition_outcome = outcome.clone();
714                                            Box::pin(async move {
715                                                let new_state = transition_fn(
716                                                    &mut ClientSMDatabaseTransaction::new(
717                                                        &mut dbtx.to_ref(),
718                                                        state.module_instance_id(),
719                                                    ),
720                                                    transition_outcome.clone(),
721                                                    state.clone(),
722                                                )
723                                                .await;
724                                                dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
725                                                    state.clone(),
726                                                )))
727                                                .await;
728                                                dbtx.insert_entry(
729                                                    &InactiveStateKeyDb(InactiveStateKey::from_state(state.clone())),
730                                                    &meta.into_inactive(),
731                                                )
732                                                .await;
733
734                                                let context = &module_contexts
735                                                    .get(&state.module_instance_id())
736                                                    .expect("Unknown module");
737
738                                                let operation_id = state.operation_id();
739                                                let global_context = global_context_gen(
740                                                    state.module_instance_id(),
741                                                    operation_id,
742                                                );
743
744                                                let is_terminal = new_state.is_terminal(context, &global_context);
745
746                                                self.log_event_dbtx(dbtx,
747                                                    StateMachineUpdated{
748                                                        started: false,
749                                                        operation_id,
750                                                        module_id: state_module_instance_id,
751                                                        terminal: is_terminal,
752                                                    }
753                                                ).await;
754
755                                                if is_terminal {
756                                                    let k = InactiveStateKey::from_state(
757                                                        new_state.clone(),
758                                                    );
759                                                    let v = ActiveStateMeta::default().into_inactive();
760                                                    dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
761                                                    Ok(ActiveOrInactiveState::Inactive {
762                                                        dyn_state: new_state,
763                                                    })
764                                                } else {
765                                                    let k = ActiveStateKey::from_state(
766                                                        new_state.clone(),
767                                                    );
768                                                    let v = ActiveStateMeta::default();
769                                                    dbtx.insert_entry(&ActiveStateKeyDb(k), &v).await;
770                                                    Ok(ActiveOrInactiveState::Active {
771                                                        dyn_state: new_state,
772                                                        meta: v,
773                                                    })
774                                                }
775                                            })
776                                        },
777                                        None,
778                                    )
779                                    .await
780                                    .expect("autocommit should keep trying to commit (max_attempt: None) and body doesn't return errors");
781
782                                debug!(
783                                    target: LOG_CLIENT_REACTOR,
784                                    terminal = !outcome.is_active(),
785                                    ?outcome,
786                                    "State transition complete",
787                                );
788
789                                match &outcome {
790                                    ActiveOrInactiveState::Active { dyn_state, meta: _ } => {
791                                        sm_update_tx
792                                            .send(dyn_state.clone())
793                                            .expect("can't fail: we are the receiving end");
794                                        notifier.notify(dyn_state.clone());
795                                    }
796                                    ActiveOrInactiveState::Inactive { dyn_state } => {
797                                        notifier.notify(dyn_state.clone());
798                                    }
799                                }
800                                ExecutorLoopEvent::Completed { state, outcome }
801                            }
802                            .instrument(span),
803                        )
804                    });
805                }
806                ExecutorLoopEvent::Invalid { state } => {
807                    trace!(
808                        target: LOG_CLIENT_REACTOR,
809                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
810                        "State invalid"
811                    );
812                    assert!(
813                        currently_running_sms.remove(&state),
814                        "State must have been recorded"
815                    );
816                }
817
818                ExecutorLoopEvent::Completed { state, outcome } => {
819                    assert!(
820                        currently_running_sms.remove(&state),
821                        "State must have been recorded"
822                    );
823                    debug!(
824                        target: LOG_CLIENT_REACTOR,
825                        operation_id = %state.operation_id().fmt_short(),
826                        outcome_active = outcome.is_active(),
827                        total = futures.len(),
828                        "State transition complete"
829                    );
830                    trace!(
831                        target: LOG_CLIENT_REACTOR,
832                        ?outcome,
833                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
834                        "State transition complete"
835                    );
836                }
837                ExecutorLoopEvent::Disconnected => {
838                    break;
839                }
840            }
841        }
842
843        info!(target: LOG_CLIENT_REACTOR, "Terminated.");
844        Ok(())
845    }
846
847    async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
848        self.db
849            .begin_transaction_nc()
850            .await
851            .find_by_prefix(&ActiveStateKeyPrefix)
852            .await
853            // ignore states from modules that are not initialized yet
854            .filter(|(state, _)| {
855                future::ready(
856                    self.module_contexts
857                        .contains_key(&state.0.state.module_instance_id()),
858                )
859            })
860            .map(|(state, meta)| (state.0.state, meta))
861            .collect::<Vec<_>>()
862            .await
863    }
864
865    async fn get_active_state(&self, state: &DynState) -> Option<ActiveStateMeta> {
866        // ignore states from modules that are not initialized yet
867        if !self
868            .module_contexts
869            .contains_key(&state.module_instance_id())
870        {
871            return None;
872        }
873        self.db
874            .begin_transaction_nc()
875            .await
876            .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
877            .await
878    }
879
880    async fn get_inactive_states(&self) -> Vec<(DynState, InactiveStateMeta)> {
881        self.db
882            .begin_transaction_nc()
883            .await
884            .find_by_prefix(&InactiveStateKeyPrefix)
885            .await
886            // ignore states from modules that are not initialized yet
887            .filter(|(state, _)| {
888                future::ready(
889                    self.module_contexts
890                        .contains_key(&state.0.state.module_instance_id()),
891                )
892            })
893            .map(|(state, meta)| (state.0.state, meta))
894            .collect::<Vec<_>>()
895            .await
896    }
897
898    pub async fn log_event_dbtx<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
899    where
900        E: Event + Send,
901        Cap: Send,
902    {
903        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), None, event)
904            .await;
905    }
906}
907
908impl ExecutorInner {
909    /// See [`Executor::stop_executor`].
910    fn stop_executor(&self) -> Option<()> {
911        let mut state = self.state.write().expect("Locking can't fail");
912
913        state.stop()
914    }
915}
916
917impl Debug for ExecutorInner {
918    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
919        writeln!(f, "ExecutorInner {{}}")
920    }
921}
922
923impl ExecutorBuilder {
924    /// Allow executor being built to run state machines associated with the
925    /// supplied module
926    pub fn with_module<C>(&mut self, instance_id: ModuleInstanceId, context: C)
927    where
928        C: IntoDynInstance<DynType = DynContext>,
929    {
930        self.with_module_dyn(context.into_dyn(instance_id));
931    }
932
933    /// Allow executor being built to run state machines associated with the
934    /// supplied module
935    pub fn with_module_dyn(&mut self, context: DynContext) {
936        self.valid_module_ids.insert(context.module_instance_id());
937
938        if self
939            .module_contexts
940            .insert(context.module_instance_id(), context)
941            .is_some()
942        {
943            panic!("Tried to add two modules with the same instance id!");
944        }
945    }
946
947    /// Allow executor to build state machines associated with the module id,
948    /// for which the module itself might not be available yet (otherwise it
949    /// would be registered with `[Self::with_module_dyn]`).
950    pub fn with_valid_module_id(&mut self, module_id: ModuleInstanceId) {
951        self.valid_module_ids.insert(module_id);
952    }
953
954    /// Build [`Executor`] and spawn background task in `tasks` executing active
955    /// state machines. The supplied database `db` must support isolation, so
956    /// cannot be an isolated DB instance itself.
957    pub fn build(
958        self,
959        db: Database,
960        notifier: Notifier,
961        client_task_group: TaskGroup,
962        log_ordering_wakeup_tx: watch::Sender<()>,
963    ) -> Executor {
964        let (sm_update_tx, sm_update_rx) = tokio::sync::mpsc::unbounded_channel();
965
966        let inner = Arc::new(ExecutorInner {
967            db,
968            log_ordering_wakeup_tx,
969            state: std::sync::RwLock::new(ExecutorState::Unstarted { sm_update_rx }),
970            module_contexts: self.module_contexts,
971            valid_module_ids: self.valid_module_ids,
972            notifier,
973            sm_update_tx,
974            client_task_group,
975        });
976
977        debug!(
978            target: LOG_CLIENT_REACTOR,
979            instances = ?inner.module_contexts.keys().copied().collect::<Vec<_>>(),
980            "Initialized state machine executor with module instances"
981        );
982        Executor { inner }
983    }
984}
985#[derive(Debug)]
986pub struct ActiveOperationStateKeyPrefix {
987    pub operation_id: OperationId,
988}
989
990impl Encodable for ActiveOperationStateKeyPrefix {
991    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
992        self.operation_id.consensus_encode(writer)
993    }
994}
995
996impl ::fedimint_core::db::DatabaseLookup for ActiveOperationStateKeyPrefix {
997    type Record = ActiveStateKeyDb;
998}
999
1000#[derive(Debug)]
1001pub(crate) struct ActiveModuleOperationStateKeyPrefix {
1002    pub operation_id: OperationId,
1003    pub module_instance: ModuleInstanceId,
1004}
1005
1006impl Encodable for ActiveModuleOperationStateKeyPrefix {
1007    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1008        self.operation_id.consensus_encode(writer)?;
1009        self.module_instance.consensus_encode(writer)?;
1010        Ok(())
1011    }
1012}
1013
1014impl ::fedimint_core::db::DatabaseLookup for ActiveModuleOperationStateKeyPrefix {
1015    type Record = ActiveStateKeyDb;
1016}
1017
1018#[derive(Debug)]
1019pub struct ActiveStateKeyPrefix;
1020
1021impl Encodable for ActiveStateKeyPrefix {
1022    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
1023        Ok(())
1024    }
1025}
1026
1027#[derive(Encodable, Decodable, Debug)]
1028pub struct ActiveStateKeyDb(pub fedimint_client_module::sm::executor::ActiveStateKey);
1029
1030impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyDb {
1031    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
1032    const NOTIFY_ON_MODIFY: bool = true;
1033    type Key = Self;
1034    type Value = ActiveStateMeta;
1035}
1036
1037impl DatabaseKeyWithNotify for ActiveStateKeyDb {}
1038
1039impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefix {
1040    type Record = ActiveStateKeyDb;
1041}
1042
1043#[derive(Debug, Encodable, Decodable)]
1044pub struct ActiveStateKeyPrefixBytes;
1045
1046impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyBytes {
1047    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
1048    const NOTIFY_ON_MODIFY: bool = false;
1049    type Key = Self;
1050    type Value = ActiveStateMeta;
1051}
1052
1053impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefixBytes {
1054    type Record = ActiveStateKeyBytes;
1055}
1056
1057#[derive(Encodable, Decodable, Debug)]
1058pub struct InactiveStateKeyDb(pub fedimint_client_module::sm::executor::InactiveStateKey);
1059
1060#[derive(Debug)]
1061pub struct InactiveStateKeyBytes {
1062    pub operation_id: OperationId,
1063    pub module_instance_id: ModuleInstanceId,
1064    pub state: Vec<u8>,
1065}
1066
1067impl Encodable for InactiveStateKeyBytes {
1068    fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
1069        self.operation_id.consensus_encode(writer)?;
1070        writer.write_all(self.state.as_slice())?;
1071        Ok(())
1072    }
1073}
1074
1075impl Decodable for InactiveStateKeyBytes {
1076    fn consensus_decode_partial<R: std::io::Read>(
1077        reader: &mut R,
1078        modules: &ModuleDecoderRegistry,
1079    ) -> Result<Self, DecodeError> {
1080        let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1081        let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1082        let mut bytes = Vec::new();
1083        reader
1084            .read_to_end(&mut bytes)
1085            .map_err(DecodeError::from_err)?;
1086
1087        let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1088        instance_bytes.append(&mut bytes);
1089
1090        Ok(InactiveStateKeyBytes {
1091            operation_id,
1092            module_instance_id,
1093            state: instance_bytes,
1094        })
1095    }
1096}
1097
1098#[derive(Debug)]
1099pub struct InactiveOperationStateKeyPrefix {
1100    pub operation_id: OperationId,
1101}
1102
1103impl Encodable for InactiveOperationStateKeyPrefix {
1104    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1105        self.operation_id.consensus_encode(writer)
1106    }
1107}
1108
1109impl ::fedimint_core::db::DatabaseLookup for InactiveOperationStateKeyPrefix {
1110    type Record = InactiveStateKeyDb;
1111}
1112
1113#[derive(Debug)]
1114pub(crate) struct InactiveModuleOperationStateKeyPrefix {
1115    pub operation_id: OperationId,
1116    pub module_instance: ModuleInstanceId,
1117}
1118
1119impl Encodable for InactiveModuleOperationStateKeyPrefix {
1120    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1121        self.operation_id.consensus_encode(writer)?;
1122        self.module_instance.consensus_encode(writer)?;
1123        Ok(())
1124    }
1125}
1126
1127impl ::fedimint_core::db::DatabaseLookup for InactiveModuleOperationStateKeyPrefix {
1128    type Record = InactiveStateKeyDb;
1129}
1130
1131#[derive(Debug, Clone)]
1132pub struct InactiveStateKeyPrefix;
1133
1134impl Encodable for InactiveStateKeyPrefix {
1135    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
1136        Ok(())
1137    }
1138}
1139
1140#[derive(Debug, Encodable, Decodable)]
1141pub struct InactiveStateKeyPrefixBytes;
1142
1143impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyBytes {
1144    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
1145    const NOTIFY_ON_MODIFY: bool = false;
1146    type Key = Self;
1147    type Value = InactiveStateMeta;
1148}
1149
1150impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefixBytes {
1151    type Record = InactiveStateKeyBytes;
1152}
1153
1154#[derive(Debug)]
1155pub struct ActiveStateKeyBytes {
1156    pub operation_id: OperationId,
1157    pub module_instance_id: ModuleInstanceId,
1158    pub state: Vec<u8>,
1159}
1160
1161impl Encodable for ActiveStateKeyBytes {
1162    fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
1163        self.operation_id.consensus_encode(writer)?;
1164        writer.write_all(self.state.as_slice())?;
1165        Ok(())
1166    }
1167}
1168
1169impl Decodable for ActiveStateKeyBytes {
1170    fn consensus_decode_partial<R: std::io::Read>(
1171        reader: &mut R,
1172        modules: &ModuleDecoderRegistry,
1173    ) -> Result<Self, DecodeError> {
1174        let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1175        let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1176        let mut bytes = Vec::new();
1177        reader
1178            .read_to_end(&mut bytes)
1179            .map_err(DecodeError::from_err)?;
1180
1181        let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1182        instance_bytes.append(&mut bytes);
1183
1184        Ok(ActiveStateKeyBytes {
1185            operation_id,
1186            module_instance_id,
1187            state: instance_bytes,
1188        })
1189    }
1190}
1191impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyDb {
1192    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
1193    const NOTIFY_ON_MODIFY: bool = true;
1194    type Key = Self;
1195    type Value = InactiveStateMeta;
1196}
1197
1198impl DatabaseKeyWithNotify for InactiveStateKeyDb {}
1199
1200impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefix {
1201    type Record = InactiveStateKeyDb;
1202}
1203
1204#[derive(Debug)]
1205enum ActiveOrInactiveState {
1206    Active {
1207        dyn_state: DynState,
1208        #[allow(dead_code)] // currently not printed anywhere, but useful in the db
1209        meta: ActiveStateMeta,
1210    },
1211    Inactive {
1212        dyn_state: DynState,
1213    },
1214}
1215
1216impl ActiveOrInactiveState {
1217    fn is_active(&self) -> bool {
1218        match self {
1219            ActiveOrInactiveState::Active { .. } => true,
1220            ActiveOrInactiveState::Inactive { .. } => false,
1221        }
1222    }
1223}
1224
1225#[apply(async_trait_maybe_send!)]
1226impl IExecutor for Executor {
1227    async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
1228        Self::get_active_states(self).await
1229    }
1230
1231    async fn add_state_machines_dbtx(
1232        &self,
1233        dbtx: &mut DatabaseTransaction<'_>,
1234        states: Vec<DynState>,
1235    ) -> AddStateMachinesResult {
1236        Self::add_state_machines_dbtx(self, dbtx, states).await
1237    }
1238}
1239
1240#[cfg(test)]
1241mod tests;