Skip to main content

fedimint_client/sm/
executor.rs

1use std::collections::{BTreeMap, BTreeSet, HashSet};
2use std::convert::Infallible;
3use std::fmt::{Debug, Formatter};
4use std::io::{Error, Write};
5use std::mem;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::anyhow;
10use fedimint_client_module::sm::executor::{
11    ActiveStateKey, ContextGen, IExecutor, InactiveStateKey,
12};
13use fedimint_client_module::sm::{
14    ActiveStateMeta, ClientSMDatabaseTransaction, DynContext, DynState, InactiveStateMeta, State,
15    StateTransition, StateTransitionFunction,
16};
17use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, OperationId};
18use fedimint_core::db::{
19    AutocommitError, Database, DatabaseKeyWithNotify, DatabaseTransaction,
20    IDatabaseTransactionOpsCoreTyped,
21};
22use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
23use fedimint_core::fmt_utils::AbbreviateJson;
24use fedimint_core::module::registry::ModuleDecoderRegistry;
25use fedimint_core::task::TaskGroup;
26use fedimint_core::util::{BoxFuture, FmtCompactAnyhow as _};
27use fedimint_core::{apply, async_trait_maybe_send};
28use fedimint_eventlog::{DBTransactionEventLogExt as _, Event, EventKind, EventPersistence};
29use fedimint_logging::LOG_CLIENT_REACTOR;
30use futures::future::{self, select_all};
31use futures::stream::{FuturesUnordered, StreamExt};
32use serde::{Deserialize, Serialize};
33use tokio::select;
34use tokio::sync::{mpsc, oneshot, watch};
35use tracing::{Instrument, debug, error, info, trace, warn};
36
37use crate::sm::notifier::Notifier;
38use crate::{AddStateMachinesError, AddStateMachinesResult, DynGlobalClientContext};
39
/// Upper bound on commit attempts for a DB transaction; once exhausted the
/// transaction is aborted and an error is returned.
const MAX_DB_ATTEMPTS: Option<usize> = Some(100_usize);
42
/// Key prefixes under which the executor stores its database entries.
pub(crate) enum ExecutorDbPrefixes {
    /// Prefix of [`ActiveStateKey`] entries (states still being driven).
    ActiveStates = 0xA1,
    /// Prefix of [`InactiveStateKey`] entries (states that were left behind).
    InactiveStates = 0xA2,
}
50
/// Event logged by the executor whenever a state machine is added to it or
/// performs a state transition.
///
/// NOTE: field order is the serialized order of the event; do not reorder.
#[derive(Serialize, Deserialize)]
pub struct StateMachineUpdated {
    /// Operation the state machine belongs to.
    operation_id: OperationId,
    /// `true` when the state machine was just added to the executor, `false`
    /// when the event records a transition.
    started: bool,
    /// Whether the state reached by the transition is terminal; logged as
    /// `false` for newly added state machines.
    terminal: bool,
    /// Module instance that owns the state machine.
    module_id: ModuleInstanceId,
}
58
59impl Event for StateMachineUpdated {
60    const MODULE: Option<fedimint_core::core::ModuleKind> = None;
61    const KIND: EventKind = EventKind::from_static("sm-updated");
62    const PERSISTENCE: EventPersistence = EventPersistence::Trimable;
63}
64
65/// Executor that drives forward state machines under its management.
66///
67/// Each state transition is atomic and supposed to be idempotent such that a
68/// stop/crash of the executor at any point can be recovered from on restart.
69/// The executor is aware of the concept of Fedimint modules and can give state
70/// machines a different [execution context](crate::module::sm::Context)
71/// depending on the owning module, making it very flexible.
72#[derive(Clone, Debug)]
73pub struct Executor {
74    inner: Arc<ExecutorInner>,
75}
76
77struct ExecutorInner {
78    db: Database,
79    state: std::sync::RwLock<ExecutorState>,
80    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
81    valid_module_ids: BTreeSet<ModuleInstanceId>,
82    notifier: Notifier,
83    /// Any time executor should notice state machine update (e.g. because it
84    /// was created), it's must be sent through this channel for it to notice.
85    sm_update_tx: mpsc::UnboundedSender<DynState>,
86    client_task_group: TaskGroup,
87    log_ordering_wakeup_tx: watch::Sender<()>,
88}
89
90enum ExecutorState {
91    Unstarted {
92        sm_update_rx: mpsc::UnboundedReceiver<DynState>,
93    },
94    Running {
95        context_gen: ContextGen,
96        shutdown_sender: oneshot::Sender<()>,
97    },
98    Stopped,
99}
100
101impl ExecutorState {
102    /// Starts the executor, returning a receiver that will be signalled when
103    /// the executor is stopped and a receiver for state machine updates.
104    /// Returns `None` if the executor has already been started and/or stopped.
105    fn start(
106        &mut self,
107        context: ContextGen,
108    ) -> Option<(oneshot::Receiver<()>, mpsc::UnboundedReceiver<DynState>)> {
109        let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();
110
111        let previous_state = mem::replace(
112            self,
113            ExecutorState::Running {
114                context_gen: context,
115                shutdown_sender,
116            },
117        );
118
119        match previous_state {
120            ExecutorState::Unstarted { sm_update_rx } => Some((shutdown_receiver, sm_update_rx)),
121            _ => {
122                // Replace the previous state, undoing the `mem::replace` above.
123                *self = previous_state;
124
125                debug!(target: LOG_CLIENT_REACTOR, "Executor already started, ignoring start request");
126                None
127            }
128        }
129    }
130
131    /// Stops the executor, returning `Some(())` if the executor was running and
132    /// `None` if it was in any other state.
133    fn stop(&mut self) -> Option<()> {
134        let previous_state = mem::replace(self, ExecutorState::Stopped);
135
136        match previous_state {
137            ExecutorState::Running {
138                shutdown_sender, ..
139            } => {
140                if shutdown_sender.send(()).is_err() {
141                    warn!(target: LOG_CLIENT_REACTOR, "Failed to send shutdown signal to executor, already dead?");
142                }
143                Some(())
144            }
145            _ => {
146                // Replace the previous state, undoing the `mem::replace` above.
147                *self = previous_state;
148
149                debug!(target: LOG_CLIENT_REACTOR, "Executor not running, ignoring stop request");
150                None
151            }
152        }
153    }
154
155    fn gen_context(&self, state: &DynState) -> Option<DynGlobalClientContext> {
156        let ExecutorState::Running { context_gen, .. } = self else {
157            return None;
158        };
159        Some(context_gen(
160            state.module_instance_id(),
161            state.operation_id(),
162        ))
163    }
164}
165
166/// Builder to which module clients can be attached and used to build an
167/// [`Executor`] supporting these.
168#[derive(Debug, Default)]
169pub struct ExecutorBuilder {
170    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
171    valid_module_ids: BTreeSet<ModuleInstanceId>,
172}
173
174impl Executor {
175    /// Creates an [`ExecutorBuilder`]
176    pub fn builder() -> ExecutorBuilder {
177        ExecutorBuilder::default()
178    }
179
180    pub async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
181        self.inner.get_active_states().await
182    }
183
184    /// Adds a number of state machines to the executor atomically. They will be
185    /// driven to completion automatically in the background.
186    ///
187    /// **Attention**: do not use before background task is started!
188    // TODO: remove warning once finality is an inherent state attribute
189    pub async fn add_state_machines(&self, states: Vec<DynState>) -> anyhow::Result<()> {
190        self.inner
191            .db
192            .autocommit(
193                |dbtx, _| Box::pin(self.add_state_machines_dbtx(dbtx, states.clone())),
194                MAX_DB_ATTEMPTS,
195            )
196            .await
197            .map_err(|e| match e {
198                AutocommitError::CommitFailed {
199                    last_error,
200                    attempts,
201                } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
202                AutocommitError::ClosureError { error, .. } => anyhow!("{error:?}"),
203            })?;
204
205        // TODO: notify subscribers to state changes?
206
207        Ok(())
208    }
209
210    /// Adds a number of state machines to the executor atomically with other DB
211    /// changes is `dbtx`. See [`Executor::add_state_machines`] for more
212    /// details.
213    ///
214    /// ## Panics
215    /// If called before background task is started using
216    /// [`Executor::start_executor`]!
217    // TODO: remove warning once finality is an inherent state attribute
218    pub async fn add_state_machines_dbtx(
219        &self,
220        dbtx: &mut DatabaseTransaction<'_>,
221        states: Vec<DynState>,
222    ) -> AddStateMachinesResult {
223        for state in states {
224            if !self
225                .inner
226                .valid_module_ids
227                .contains(&state.module_instance_id())
228            {
229                return Err(AddStateMachinesError::Other(anyhow!("Unknown module")));
230            }
231
232            let is_active_state = dbtx
233                .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
234                .await
235                .is_some();
236            let is_inactive_state = dbtx
237                .get_value(&InactiveStateKeyDb(InactiveStateKey::from_state(
238                    state.clone(),
239                )))
240                .await
241                .is_some();
242
243            if is_active_state || is_inactive_state {
244                return Err(AddStateMachinesError::StateAlreadyExists);
245            }
246
247            // In case of recovery functions, the module itself is not yet initialized,
248            // so we can't check if the state is terminal. However the
249            // [`Self::get_transitions_for`] function will double check and
250            // deactivate any terminal states that would slip past this check.
251            if let Some(module_context) =
252                self.inner.module_contexts.get(&state.module_instance_id())
253            {
254                match self
255                    .inner
256                    .state
257                    .read()
258                    .expect("locking failed")
259                    .gen_context(&state)
260                {
261                    Some(context) => {
262                        if state.is_terminal(module_context, &context) {
263                            return Err(AddStateMachinesError::Other(anyhow!(
264                                "State is already terminal, adding it to the executor doesn't make sense."
265                            )));
266                        }
267                    }
268                    _ => {
269                        warn!(target: LOG_CLIENT_REACTOR, "Executor should be running at this point");
270                    }
271                }
272            }
273
274            dbtx.insert_new_entry(
275                &ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())),
276                &ActiveStateMeta::default(),
277            )
278            .await;
279
280            let operation_id = state.operation_id();
281            self.inner
282                .log_event_dbtx(
283                    dbtx,
284                    StateMachineUpdated {
285                        operation_id,
286                        started: true,
287                        terminal: false,
288                        module_id: state.module_instance_id(),
289                    },
290                )
291                .await;
292
293            let notify_sender = self.inner.notifier.sender();
294            let sm_updates_tx = self.inner.sm_update_tx.clone();
295            dbtx.on_commit(move || {
296                notify_sender.notify(state.clone());
297                let _ = sm_updates_tx.send(state);
298            });
299        }
300
301        Ok(())
302    }
303
304    /// **Mostly used for testing**
305    ///
306    /// Check if state exists in the database as part of an actively running
307    /// state machine.
308    pub async fn contains_active_state<S: State>(
309        &self,
310        instance: ModuleInstanceId,
311        state: S,
312    ) -> bool {
313        let state = DynState::from_typed(instance, state);
314        self.inner
315            .get_active_states()
316            .await
317            .into_iter()
318            .any(|(s, _)| s == state)
319    }
320
321    // TODO: unify querying fns
322    /// **Mostly used for testing**
323    ///
324    /// Check if state exists in the database as inactive. If the state is
325    /// terminal it means the corresponding state machine finished its
326    /// execution. If the state is non-terminal it means the state machine was
327    /// in that state at some point but moved on since then.
328    pub async fn contains_inactive_state<S: State>(
329        &self,
330        instance: ModuleInstanceId,
331        state: S,
332    ) -> bool {
333        let state = DynState::from_typed(instance, state);
334        self.inner
335            .get_inactive_states()
336            .await
337            .into_iter()
338            .any(|(s, _)| s == state)
339    }
340
341    pub async fn await_inactive_state(&self, state: DynState) -> InactiveStateMeta {
342        self.inner
343            .db
344            .wait_key_exists(&InactiveStateKeyDb(InactiveStateKey::from_state(state)))
345            .await
346    }
347
348    pub async fn await_active_state(&self, state: DynState) -> ActiveStateMeta {
349        self.inner
350            .db
351            .wait_key_exists(&ActiveStateKeyDb(ActiveStateKey::from_state(state)))
352            .await
353    }
354
355    /// Only meant for debug tooling
356    pub async fn get_operation_states(
357        &self,
358        operation_id: OperationId,
359    ) -> (
360        Vec<(DynState, ActiveStateMeta)>,
361        Vec<(DynState, InactiveStateMeta)>,
362    ) {
363        let mut dbtx = self.inner.db.begin_transaction_nc().await;
364        let active_states: Vec<_> = dbtx
365            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
366            .await
367            .map(|(active_key, active_meta)| (active_key.0.state, active_meta))
368            .collect()
369            .await;
370        let inactive_states: Vec<_> = dbtx
371            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
372            .await
373            .map(|(active_key, inactive_meta)| (active_key.0.state, inactive_meta))
374            .collect()
375            .await;
376
377        (active_states, inactive_states)
378    }
379
380    /// Starts the background thread that runs the state machines. This cannot
381    /// be done when building the executor since some global contexts in turn
382    /// may depend on the executor, forming a cyclic dependency.
383    ///
384    /// ## Panics
385    /// If called more than once.
386    pub fn start_executor(&self, context_gen: ContextGen, client_span: tracing::Span) {
387        let Some((shutdown_receiver, sm_update_rx)) = self
388            .inner
389            .state
390            .write()
391            .expect("locking can't fail")
392            .start(context_gen.clone())
393        else {
394            panic!("start_executor was called previously");
395        };
396
397        let task_runner_inner = self.inner.clone();
398        let _handle = self.inner.client_task_group.spawn_with_span(
399            client_span,
400            "sm-executor",
401            |task_handle| async move {
402                let executor_runner = task_runner_inner.run(context_gen, sm_update_rx);
403                let task_group_shutdown_rx = task_handle.make_shutdown_rx();
404                select! {
405                    () = task_group_shutdown_rx => {
406                        debug!(
407                            target: LOG_CLIENT_REACTOR,
408                            "Shutting down state machine executor runner due to task group shutdown signal"
409                        );
410                    },
411                    shutdown_happened_sender = shutdown_receiver => {
412                        match shutdown_happened_sender {
413                            Ok(()) => {
414                                debug!(
415                                    target: LOG_CLIENT_REACTOR,
416                                    "Shutting down state machine executor runner due to explicit shutdown signal"
417                                );
418                            },
419                            Err(_) => {
420                                warn!(
421                                    target: LOG_CLIENT_REACTOR,
422                                    "Shutting down state machine executor runner because the shutdown signal channel was closed (the executor object was dropped)"
423                                );
424                            }
425                        }
426                    },
427                    () = executor_runner => {
428                        error!(target: LOG_CLIENT_REACTOR, "State machine executor runner exited unexpectedly!");
429                    },
430                };
431            },
432        );
433    }
434
435    /// Stops the background task that runs the state machines.
436    ///
437    /// If a shutdown signal was sent it returns a [`oneshot::Receiver`] that
438    /// will be signalled when the main loop of the background task has
439    /// exited. This can be useful to block until the executor has stopped
440    /// to avoid errors due to the async runtime shutting down while the
441    /// task is still running.
442    ///
443    /// If no shutdown signal was sent it returns `None`. This can happen if
444    /// `stop_executor` is called multiple times.
445    ///
446    /// ## Panics
447    /// If called in parallel with [`start_executor`](Self::start_executor).
448    pub fn stop_executor(&self) -> Option<()> {
449        self.inner.stop_executor()
450    }
451
452    /// Returns a reference to the [`Notifier`] that can be used to subscribe to
453    /// state transitions
454    pub fn notifier(&self) -> &Notifier {
455        &self.inner.notifier
456    }
457}
458
459impl Drop for ExecutorInner {
460    fn drop(&mut self) {
461        self.stop_executor();
462    }
463}
464
/// A transition of an active state machine whose trigger has resolved, ready
/// to be applied.
///
/// Produced by the futures returned from `ExecutorInner::get_transition_for`.
struct TransitionForActiveState {
    /// Value the trigger resolved to; passed to `transition_fn`.
    outcome: serde_json::Value,
    /// The active state the transition starts from.
    state: DynState,
    /// Metadata of `state` as looked up from the active states.
    meta: ActiveStateMeta,
    /// Computes the next state from `outcome` and `state`.
    transition_fn: StateTransitionFunction<DynState>,
}
471
472impl ExecutorInner {
473    async fn run(
474        &self,
475        global_context_gen: ContextGen,
476        sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
477    ) {
478        debug!(target: LOG_CLIENT_REACTOR, "Starting state machine executor task");
479        if let Err(err) = self
480            .run_state_machines_executor_inner(global_context_gen, sm_update_rx)
481            .await
482        {
483            warn!(
484                target: LOG_CLIENT_REACTOR,
485                err = %err.fmt_compact_anyhow(),
486                "An unexpected error occurred during a state transition"
487            );
488        }
489    }
490
491    async fn get_transition_for(
492        &self,
493        state: &DynState,
494        meta: ActiveStateMeta,
495        global_context_gen: &ContextGen,
496    ) -> Vec<BoxFuture<'static, TransitionForActiveState>> {
497        let module_instance = state.module_instance_id();
498        let context = &self
499            .module_contexts
500            .get(&module_instance)
501            .expect("Unknown module");
502        let transitions = state
503            .transitions(
504                context,
505                &global_context_gen(module_instance, state.operation_id()),
506            )
507            .into_iter()
508            .map(|transition| {
509                let state = state.clone();
510                let f: BoxFuture<TransitionForActiveState> = Box::pin(async move {
511                    let StateTransition {
512                        trigger,
513                        transition,
514                    } = transition;
515                    TransitionForActiveState {
516                        outcome: trigger.await,
517                        state,
518                        transition_fn: transition,
519                        meta,
520                    }
521                });
522                f
523            })
524            .collect::<Vec<_>>();
525        if transitions.is_empty() {
526            // In certain cases a terminal (no transitions) state could get here due to
527            // module bug. Inactivate it to prevent accumulation of such states.
528            // See [`Self::add_state_machines_dbtx`].
529            warn!(
530                target: LOG_CLIENT_REACTOR,
531                module_id = module_instance, "A terminal state where only active states are expected. Please report this bug upstream."
532            );
533            self.db
534                .autocommit::<_, _, anyhow::Error>(
535                    |dbtx, _| {
536                        Box::pin(async {
537                            let k = InactiveStateKey::from_state(state.clone());
538                            let v = ActiveStateMeta::default().into_inactive();
539                            dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
540                                state.clone(),
541                            )))
542                            .await;
543                            dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
544                            Ok(())
545                        })
546                    },
547                    None,
548                )
549                .await
550                .expect("Autocommit here can't fail");
551        }
552
553        transitions
554    }
555
556    async fn run_state_machines_executor_inner(
557        &self,
558        global_context_gen: ContextGen,
559        mut sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
560    ) -> anyhow::Result<()> {
561        /// All futures in the executor resolve to this type, so the handling
562        /// code can tell them apart.
563        enum ExecutorLoopEvent {
564            /// Notification about `DynState` arrived and should be handled,
565            /// usually added to the list of pending futures.
566            New { state: DynState },
567            /// One of trigger futures of a state machine finished and
568            /// returned transition function to run
569            Triggered(TransitionForActiveState),
570            /// The state machine did not need to run, so it was canceled
571            Invalid { state: DynState },
572            /// Transition function and all the accounting around it are done
573            Completed {
574                state: DynState,
575                outcome: ActiveOrInactiveState,
576            },
577            /// New job receiver disconnected, that can only mean termination
578            Disconnected,
579        }
580
581        let active_states = self.get_active_states().await;
582        debug!(
583            target: LOG_CLIENT_REACTOR,
584            total = active_states.len(),
585            "Starting active state machines",
586        );
587        for (state, meta) in active_states {
588            trace!(target: LOG_CLIENT_REACTOR, ?state, ?meta, "Starting active state");
589
590            let age = fedimint_core::time::now()
591                .duration_since(meta.created_at)
592                .unwrap_or_default();
593            if age > Duration::from_secs(7 * 24 * 3600) {
594                warn!(
595                    target: LOG_CLIENT_REACTOR,
596                    operation_id = %state.operation_id().fmt_short(),
597                    module_instance = %state.module_instance_id(),
598                    age_days = age.as_secs() / 86400,
599                    "Active state machine has been running for over a week, possibly stuck",
600                );
601            }
602            self.sm_update_tx
603                .send(state)
604                .expect("Must be able to send state machine to own opened channel");
605        }
606
607        // Keeps track of things already running, so we can deduplicate, just
608        // in case.
609        let mut currently_running_sms = HashSet::<DynState>::new();
610        // All things happening in parallel go into here
611        // NOTE: `FuturesUnordered` is a footgun: when it's not being polled
612        // (e.g. we picked an event and are awaiting on something to process it),
613        // nothing inside `futures` will be making progress, which in extreme cases
614        // could lead to hangs. For this reason we try really hard in the code here,
615        // to pick an event from `futures` and spawn a new task, avoiding any `await`,
616        // just so we can get back to `futures.next()` ASAP.
617        let mut futures: FuturesUnordered<BoxFuture<'_, ExecutorLoopEvent>> =
618            FuturesUnordered::new();
619
620        loop {
621            let event = tokio::select! {
622                new = sm_update_rx.recv() => {
623                    match new { Some(new) => {
624                        ExecutorLoopEvent::New {
625                            state: new,
626                        }
627                    } _ => {
628                        ExecutorLoopEvent::Disconnected
629                    }}
630                },
631
632                event = futures.next(), if !futures.is_empty() => event.expect("we only .next() if there are pending futures"),
633            };
634
635            // main reactor loop: wait for next thing that completed, react (possibly adding
636            // more things to `futures`)
637            match event {
638                ExecutorLoopEvent::New { state } => {
639                    if currently_running_sms.contains(&state) {
640                        warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received a state machine that is already running. Ignoring");
641                        continue;
642                    }
643                    currently_running_sms.insert(state.clone());
644                    let futures_len = futures.len();
645                    let global_context_gen = &global_context_gen;
646                    trace!(target: LOG_CLIENT_REACTOR, state = ?state, "Started new active state machine, details.");
647                    futures.push(Box::pin(async move {
648                        let Some(meta) = self.get_active_state(&state).await else {
649                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Couldn't look up received state machine. Ignoring.");
650                            return ExecutorLoopEvent::Invalid { state: state.clone() };
651                        };
652
653                        let transitions = self
654                            .get_transition_for(&state, meta, global_context_gen)
655                            .await;
656                        if transitions.is_empty() {
657                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received an active state that doesn't produce any transitions. Ignoring.");
658                            return ExecutorLoopEvent::Invalid { state: state.clone() };
659                        }
660                        let transitions_num = transitions.len();
661
662                        debug!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), total = futures_len + 1, transitions_num, "New active state machine.");
663
664                        let (first_completed_result, _index, _unused_transitions) =
665                            select_all(transitions).await;
666                        ExecutorLoopEvent::Triggered(first_completed_result)
667                    }));
668                }
669                ExecutorLoopEvent::Triggered(TransitionForActiveState {
670                    outcome,
671                    state,
672                    meta,
673                    transition_fn,
674                }) => {
675                    debug!(
676                        target: LOG_CLIENT_REACTOR,
677                        operation_id = %state.operation_id().fmt_short(),
678                        "Triggered state transition",
679                    );
680                    let span = tracing::debug_span!(
681                        target: LOG_CLIENT_REACTOR,
682                        "sm_transition",
683                        operation_id = %state.operation_id().fmt_short()
684                    );
685                    // Perform the transition as another future, so transitions can happen in
686                    // parallel.
687                    // Database write conflicts might be happening quite often here,
688                    // but transaction functions are supposed to be idempotent anyway,
689                    // so it seems like a good stress-test in the worst case.
690                    futures.push({
691                        let sm_update_tx = self.sm_update_tx.clone();
692                        let db = self.db.clone();
693                        let notifier = self.notifier.clone();
694                        let module_contexts = self.module_contexts.clone();
695                        let global_context_gen = global_context_gen.clone();
696                        Box::pin(
697                            async move {
698                                debug!(
699                                    target: LOG_CLIENT_REACTOR,
700                                    "Executing state transition",
701                                );
702                                trace!(
703                                    target: LOG_CLIENT_REACTOR,
704                                    ?state,
705                                    outcome = ?AbbreviateJson(&outcome),
706                                    "Executing state transition (details)",
707                                );
708
709                                let module_contexts = &module_contexts;
710                                let global_context_gen = &global_context_gen;
711
712                                let outcome = db
713                                    .autocommit::<'_, '_, _, _, Infallible>(
714                                        |dbtx, _| {
715                                            let state = state.clone();
716                                            let state_module_instance_id = state.module_instance_id();
717                                            let transition_fn = transition_fn.clone();
718                                            let transition_outcome = outcome.clone();
719                                            Box::pin(async move {
720                                                let new_state = transition_fn(
721                                                    &mut ClientSMDatabaseTransaction::new(
722                                                        &mut dbtx.to_ref(),
723                                                        state.module_instance_id(),
724                                                    ),
725                                                    transition_outcome.clone(),
726                                                    state.clone(),
727                                                )
728                                                .await;
729                                                dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
730                                                    state.clone(),
731                                                )))
732                                                .await;
733                                                dbtx.insert_entry(
734                                                    &InactiveStateKeyDb(InactiveStateKey::from_state(state.clone())),
735                                                    &meta.into_inactive(),
736                                                )
737                                                .await;
738
739                                                let context = &module_contexts
740                                                    .get(&state.module_instance_id())
741                                                    .expect("Unknown module");
742
743                                                let operation_id = state.operation_id();
744                                                let global_context = global_context_gen(
745                                                    state.module_instance_id(),
746                                                    operation_id,
747                                                );
748
749                                                let is_terminal = new_state.is_terminal(context, &global_context);
750
751                                                self.log_event_dbtx(dbtx,
752                                                    StateMachineUpdated{
753                                                        started: false,
754                                                        operation_id,
755                                                        module_id: state_module_instance_id,
756                                                        terminal: is_terminal,
757                                                    }
758                                                ).await;
759
760                                                if is_terminal {
761                                                    let k = InactiveStateKey::from_state(
762                                                        new_state.clone(),
763                                                    );
764                                                    let v = ActiveStateMeta::default().into_inactive();
765                                                    dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
766                                                    Ok(ActiveOrInactiveState::Inactive {
767                                                        dyn_state: new_state,
768                                                    })
769                                                } else {
770                                                    let k = ActiveStateKey::from_state(
771                                                        new_state.clone(),
772                                                    );
773                                                    let v = ActiveStateMeta::default();
774                                                    dbtx.insert_entry(&ActiveStateKeyDb(k), &v).await;
775                                                    Ok(ActiveOrInactiveState::Active {
776                                                        dyn_state: new_state,
777                                                        meta: v,
778                                                    })
779                                                }
780                                            })
781                                        },
782                                        None,
783                                    )
784                                    .await
785                                    .expect("autocommit should keep trying to commit (max_attempt: None) and body doesn't return errors");
786
787                                debug!(
788                                    target: LOG_CLIENT_REACTOR,
789                                    terminal = !outcome.is_active(),
790                                    ?outcome,
791                                    "State transition complete",
792                                );
793
794                                match &outcome {
795                                    ActiveOrInactiveState::Active { dyn_state, meta: _ } => {
796                                        sm_update_tx
797                                            .send(dyn_state.clone())
798                                            .expect("can't fail: we are the receiving end");
799                                        notifier.notify(dyn_state.clone());
800                                    }
801                                    ActiveOrInactiveState::Inactive { dyn_state } => {
802                                        notifier.notify(dyn_state.clone());
803                                    }
804                                }
805                                ExecutorLoopEvent::Completed { state, outcome }
806                            }
807                            .instrument(span),
808                        )
809                    });
810                }
811                ExecutorLoopEvent::Invalid { state } => {
812                    trace!(
813                        target: LOG_CLIENT_REACTOR,
814                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
815                        "State invalid"
816                    );
817                    assert!(
818                        currently_running_sms.remove(&state),
819                        "State must have been recorded"
820                    );
821                }
822
823                ExecutorLoopEvent::Completed { state, outcome } => {
824                    assert!(
825                        currently_running_sms.remove(&state),
826                        "State must have been recorded"
827                    );
828                    debug!(
829                        target: LOG_CLIENT_REACTOR,
830                        operation_id = %state.operation_id().fmt_short(),
831                        outcome_active = outcome.is_active(),
832                        total = futures.len(),
833                        "State transition complete"
834                    );
835                    trace!(
836                        target: LOG_CLIENT_REACTOR,
837                        ?outcome,
838                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
839                        "State transition complete"
840                    );
841                }
842                ExecutorLoopEvent::Disconnected => {
843                    break;
844                }
845            }
846        }
847
848        info!(target: LOG_CLIENT_REACTOR, "Terminated.");
849        Ok(())
850    }
851
852    async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
853        self.db
854            .begin_transaction_nc()
855            .await
856            .find_by_prefix(&ActiveStateKeyPrefix)
857            .await
858            // ignore states from modules that are not initialized yet
859            .filter(|(state, _)| {
860                future::ready(
861                    self.module_contexts
862                        .contains_key(&state.0.state.module_instance_id()),
863                )
864            })
865            .map(|(state, meta)| (state.0.state, meta))
866            .collect::<Vec<_>>()
867            .await
868    }
869
870    async fn get_active_state(&self, state: &DynState) -> Option<ActiveStateMeta> {
871        // ignore states from modules that are not initialized yet
872        if !self
873            .module_contexts
874            .contains_key(&state.module_instance_id())
875        {
876            return None;
877        }
878        self.db
879            .begin_transaction_nc()
880            .await
881            .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
882            .await
883    }
884
885    async fn get_inactive_states(&self) -> Vec<(DynState, InactiveStateMeta)> {
886        self.db
887            .begin_transaction_nc()
888            .await
889            .find_by_prefix(&InactiveStateKeyPrefix)
890            .await
891            // ignore states from modules that are not initialized yet
892            .filter(|(state, _)| {
893                future::ready(
894                    self.module_contexts
895                        .contains_key(&state.0.state.module_instance_id()),
896                )
897            })
898            .map(|(state, meta)| (state.0.state, meta))
899            .collect::<Vec<_>>()
900            .await
901    }
902
903    pub async fn log_event_dbtx<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
904    where
905        E: Event + Send,
906        Cap: Send,
907    {
908        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), None, event)
909            .await;
910    }
911}
912
913impl ExecutorInner {
914    /// See [`Executor::stop_executor`].
915    fn stop_executor(&self) -> Option<()> {
916        let mut state = self.state.write().expect("Locking can't fail");
917
918        state.stop()
919    }
920}
921
922impl Debug for ExecutorInner {
923    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
924        writeln!(f, "ExecutorInner {{}}")
925    }
926}
927
928impl ExecutorBuilder {
929    /// Allow executor being built to run state machines associated with the
930    /// supplied module
931    pub fn with_module<C>(&mut self, instance_id: ModuleInstanceId, context: C)
932    where
933        C: IntoDynInstance<DynType = DynContext>,
934    {
935        self.with_module_dyn(context.into_dyn(instance_id));
936    }
937
938    /// Allow executor being built to run state machines associated with the
939    /// supplied module
940    pub fn with_module_dyn(&mut self, context: DynContext) {
941        self.valid_module_ids.insert(context.module_instance_id());
942
943        if self
944            .module_contexts
945            .insert(context.module_instance_id(), context)
946            .is_some()
947        {
948            panic!("Tried to add two modules with the same instance id!");
949        }
950    }
951
952    /// Allow executor to build state machines associated with the module id,
953    /// for which the module itself might not be available yet (otherwise it
954    /// would be registered with `[Self::with_module_dyn]`).
955    pub fn with_valid_module_id(&mut self, module_id: ModuleInstanceId) {
956        self.valid_module_ids.insert(module_id);
957    }
958
959    /// Build [`Executor`] and spawn background task in `tasks` executing active
960    /// state machines. The supplied database `db` must support isolation, so
961    /// cannot be an isolated DB instance itself.
962    pub fn build(
963        self,
964        db: Database,
965        notifier: Notifier,
966        client_task_group: TaskGroup,
967        log_ordering_wakeup_tx: watch::Sender<()>,
968    ) -> Executor {
969        let (sm_update_tx, sm_update_rx) = tokio::sync::mpsc::unbounded_channel();
970
971        let inner = Arc::new(ExecutorInner {
972            db,
973            log_ordering_wakeup_tx,
974            state: std::sync::RwLock::new(ExecutorState::Unstarted { sm_update_rx }),
975            module_contexts: self.module_contexts,
976            valid_module_ids: self.valid_module_ids,
977            notifier,
978            sm_update_tx,
979            client_task_group,
980        });
981
982        debug!(
983            target: LOG_CLIENT_REACTOR,
984            instances = ?inner.module_contexts.keys().copied().collect::<Vec<_>>(),
985            "Initialized state machine executor with module instances"
986        );
987        Executor { inner }
988    }
989}
990#[derive(Debug)]
991pub struct ActiveOperationStateKeyPrefix {
992    pub operation_id: OperationId,
993}
994
995impl Encodable for ActiveOperationStateKeyPrefix {
996    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
997        self.operation_id.consensus_encode(writer)
998    }
999}
1000
1001impl ::fedimint_core::db::DatabaseLookup for ActiveOperationStateKeyPrefix {
1002    type Record = ActiveStateKeyDb;
1003}
1004
1005#[derive(Debug)]
1006pub(crate) struct ActiveModuleOperationStateKeyPrefix {
1007    pub operation_id: OperationId,
1008    pub module_instance: ModuleInstanceId,
1009}
1010
1011impl Encodable for ActiveModuleOperationStateKeyPrefix {
1012    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1013        self.operation_id.consensus_encode(writer)?;
1014        self.module_instance.consensus_encode(writer)?;
1015        Ok(())
1016    }
1017}
1018
1019impl ::fedimint_core::db::DatabaseLookup for ActiveModuleOperationStateKeyPrefix {
1020    type Record = ActiveStateKeyDb;
1021}
1022
1023#[derive(Debug)]
1024pub struct ActiveStateKeyPrefix;
1025
1026impl Encodable for ActiveStateKeyPrefix {
1027    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
1028        Ok(())
1029    }
1030}
1031
1032#[derive(Encodable, Decodable, Debug)]
1033pub struct ActiveStateKeyDb(pub fedimint_client_module::sm::executor::ActiveStateKey);
1034
1035impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyDb {
1036    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
1037    const NOTIFY_ON_MODIFY: bool = true;
1038    type Key = Self;
1039    type Value = ActiveStateMeta;
1040}
1041
1042impl DatabaseKeyWithNotify for ActiveStateKeyDb {}
1043
1044impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefix {
1045    type Record = ActiveStateKeyDb;
1046}
1047
1048#[derive(Debug, Encodable, Decodable)]
1049pub struct ActiveStateKeyPrefixBytes;
1050
1051impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyBytes {
1052    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
1053    const NOTIFY_ON_MODIFY: bool = false;
1054    type Key = Self;
1055    type Value = ActiveStateMeta;
1056}
1057
1058impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefixBytes {
1059    type Record = ActiveStateKeyBytes;
1060}
1061
1062#[derive(Encodable, Decodable, Debug)]
1063pub struct InactiveStateKeyDb(pub fedimint_client_module::sm::executor::InactiveStateKey);
1064
1065#[derive(Debug)]
1066pub struct InactiveStateKeyBytes {
1067    pub operation_id: OperationId,
1068    pub module_instance_id: ModuleInstanceId,
1069    pub state: Vec<u8>,
1070}
1071
1072impl Encodable for InactiveStateKeyBytes {
1073    fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
1074        self.operation_id.consensus_encode(writer)?;
1075        writer.write_all(self.state.as_slice())?;
1076        Ok(())
1077    }
1078}
1079
1080impl Decodable for InactiveStateKeyBytes {
1081    fn consensus_decode_partial<R: std::io::Read>(
1082        reader: &mut R,
1083        modules: &ModuleDecoderRegistry,
1084    ) -> Result<Self, DecodeError> {
1085        let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1086        let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1087        let mut bytes = Vec::new();
1088        reader
1089            .read_to_end(&mut bytes)
1090            .map_err(DecodeError::from_err)?;
1091
1092        let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1093        instance_bytes.append(&mut bytes);
1094
1095        Ok(InactiveStateKeyBytes {
1096            operation_id,
1097            module_instance_id,
1098            state: instance_bytes,
1099        })
1100    }
1101}
1102
1103#[derive(Debug)]
1104pub struct InactiveOperationStateKeyPrefix {
1105    pub operation_id: OperationId,
1106}
1107
1108impl Encodable for InactiveOperationStateKeyPrefix {
1109    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1110        self.operation_id.consensus_encode(writer)
1111    }
1112}
1113
1114impl ::fedimint_core::db::DatabaseLookup for InactiveOperationStateKeyPrefix {
1115    type Record = InactiveStateKeyDb;
1116}
1117
1118#[derive(Debug)]
1119pub(crate) struct InactiveModuleOperationStateKeyPrefix {
1120    pub operation_id: OperationId,
1121    pub module_instance: ModuleInstanceId,
1122}
1123
1124impl Encodable for InactiveModuleOperationStateKeyPrefix {
1125    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1126        self.operation_id.consensus_encode(writer)?;
1127        self.module_instance.consensus_encode(writer)?;
1128        Ok(())
1129    }
1130}
1131
1132impl ::fedimint_core::db::DatabaseLookup for InactiveModuleOperationStateKeyPrefix {
1133    type Record = InactiveStateKeyDb;
1134}
1135
1136#[derive(Debug, Clone)]
1137pub struct InactiveStateKeyPrefix;
1138
1139impl Encodable for InactiveStateKeyPrefix {
1140    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
1141        Ok(())
1142    }
1143}
1144
1145#[derive(Debug, Encodable, Decodable)]
1146pub struct InactiveStateKeyPrefixBytes;
1147
1148impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyBytes {
1149    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
1150    const NOTIFY_ON_MODIFY: bool = false;
1151    type Key = Self;
1152    type Value = InactiveStateMeta;
1153}
1154
1155impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefixBytes {
1156    type Record = InactiveStateKeyBytes;
1157}
1158
1159#[derive(Debug)]
1160pub struct ActiveStateKeyBytes {
1161    pub operation_id: OperationId,
1162    pub module_instance_id: ModuleInstanceId,
1163    pub state: Vec<u8>,
1164}
1165
1166impl Encodable for ActiveStateKeyBytes {
1167    fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
1168        self.operation_id.consensus_encode(writer)?;
1169        writer.write_all(self.state.as_slice())?;
1170        Ok(())
1171    }
1172}
1173
1174impl Decodable for ActiveStateKeyBytes {
1175    fn consensus_decode_partial<R: std::io::Read>(
1176        reader: &mut R,
1177        modules: &ModuleDecoderRegistry,
1178    ) -> Result<Self, DecodeError> {
1179        let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1180        let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1181        let mut bytes = Vec::new();
1182        reader
1183            .read_to_end(&mut bytes)
1184            .map_err(DecodeError::from_err)?;
1185
1186        let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1187        instance_bytes.append(&mut bytes);
1188
1189        Ok(ActiveStateKeyBytes {
1190            operation_id,
1191            module_instance_id,
1192            state: instance_bytes,
1193        })
1194    }
1195}
1196impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyDb {
1197    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
1198    const NOTIFY_ON_MODIFY: bool = true;
1199    type Key = Self;
1200    type Value = InactiveStateMeta;
1201}
1202
1203impl DatabaseKeyWithNotify for InactiveStateKeyDb {}
1204
1205impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefix {
1206    type Record = InactiveStateKeyDb;
1207}
1208
1209#[derive(Debug)]
1210enum ActiveOrInactiveState {
1211    Active {
1212        dyn_state: DynState,
1213        #[allow(dead_code)] // currently not printed anywhere, but useful in the db
1214        meta: ActiveStateMeta,
1215    },
1216    Inactive {
1217        dyn_state: DynState,
1218    },
1219}
1220
1221impl ActiveOrInactiveState {
1222    fn is_active(&self) -> bool {
1223        match self {
1224            ActiveOrInactiveState::Active { .. } => true,
1225            ActiveOrInactiveState::Inactive { .. } => false,
1226        }
1227    }
1228}
1229
1230#[apply(async_trait_maybe_send!)]
1231impl IExecutor for Executor {
1232    async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
1233        Self::get_active_states(self).await
1234    }
1235
1236    async fn add_state_machines_dbtx(
1237        &self,
1238        dbtx: &mut DatabaseTransaction<'_>,
1239        states: Vec<DynState>,
1240    ) -> AddStateMachinesResult {
1241        Self::add_state_machines_dbtx(self, dbtx, states).await
1242    }
1243}
1244
// Unit tests for the state machine executor, compiled only in test builds.
#[cfg(test)]
mod tests;