fedimint_client/sm/
executor.rs

1use std::collections::{BTreeMap, BTreeSet, HashSet};
2use std::convert::Infallible;
3use std::fmt::{Debug, Formatter};
4use std::io::{Error, Write};
5use std::mem;
6use std::sync::Arc;
7
8use anyhow::anyhow;
9use fedimint_client_module::sm::executor::{
10    ActiveStateKey, ContextGen, IExecutor, InactiveStateKey,
11};
12use fedimint_client_module::sm::{
13    ActiveStateMeta, ClientSMDatabaseTransaction, DynContext, DynState, InactiveStateMeta, State,
14    StateTransition, StateTransitionFunction,
15};
16use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, OperationId};
17use fedimint_core::db::{
18    AutocommitError, Database, DatabaseKeyWithNotify, DatabaseTransaction,
19    IDatabaseTransactionOpsCoreTyped,
20};
21use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
22use fedimint_core::fmt_utils::AbbreviateJson;
23use fedimint_core::module::registry::ModuleDecoderRegistry;
24use fedimint_core::task::TaskGroup;
25use fedimint_core::util::{BoxFuture, FmtCompactAnyhow as _};
26use fedimint_core::{apply, async_trait_maybe_send};
27use fedimint_logging::LOG_CLIENT_REACTOR;
28use futures::future::{self, select_all};
29use futures::stream::{FuturesUnordered, StreamExt};
30use tokio::select;
31use tokio::sync::{mpsc, oneshot};
32use tracing::{Instrument, debug, error, info, trace, warn};
33
34use super::notifier::Notifier;
35use crate::{AddStateMachinesError, AddStateMachinesResult, DynGlobalClientContext};
36
/// After how many attempts a DB transaction is aborted with an error.
///
/// Passed as the attempt limit to the database `autocommit` call in
/// [`Executor::add_state_machines`].
const MAX_DB_ATTEMPTS: Option<usize> = Some(100);
39
/// Prefixes for executor DB entries.
///
/// Each variant carries an explicit discriminant identifying one kind of
/// executor record.
pub(crate) enum ExecutorDbPrefixes {
    /// Entries for active states. See [`ActiveStateKey`]
    ActiveStates = 0xa1,
    /// Entries for inactive states. See [`InactiveStateKey`]
    InactiveStates = 0xa2,
}
47
/// Executor that drives forward state machines under its management.
///
/// Each state transition is atomic and supposed to be idempotent such that a
/// stop/crash of the executor at any point can be recovered from on restart.
/// The executor is aware of the concept of Fedimint modules and can give state
/// machines a different [execution context](crate::module::sm::Context)
/// depending on the owning module, making it very flexible.
///
/// This type is a thin handle: cloning it clones the [`Arc`] and therefore
/// shares the same underlying `ExecutorInner`.
#[derive(Clone, Debug)]
pub struct Executor {
    /// Shared executor internals (database handle, run state, module
    /// contexts, notifier and update channel).
    inner: Arc<ExecutorInner>,
}
59
/// Shared internals behind an [`Executor`] handle.
struct ExecutorInner {
    /// Database in which active and inactive state entries are stored.
    db: Database,
    /// Lifecycle state of the executor (unstarted, running or stopped),
    /// guarded by a synchronous read-write lock.
    state: std::sync::RwLock<ExecutorState>,
    /// Per-module execution contexts, looked up by the module instance id of
    /// the state being processed.
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Module instance ids for which state machines may be added.
    valid_module_ids: BTreeSet<ModuleInstanceId>,
    /// Used to notify subscribers about new states.
    notifier: Notifier,
    /// Any time the executor should notice a state machine update (e.g.
    /// because it was created), the state must be sent through this channel
    /// for the executor to notice it.
    sm_update_tx: mpsc::UnboundedSender<DynState>,
    /// Task group on which the background `sm-executor` task is spawned.
    client_task_group: TaskGroup,
}
71
/// Lifecycle of the executor's background task.
///
/// The only transitions performed by the methods on this type are
/// `Unstarted -> Running` (via `start`) and `Running -> Stopped` (via
/// `stop`).
enum ExecutorState {
    /// The executor has not been started yet.
    Unstarted {
        /// Receiving end of the state machine update channel, handed over to
        /// the caller of `start`.
        sm_update_rx: mpsc::UnboundedReceiver<DynState>,
    },
    /// The executor has been started and not stopped since.
    Running {
        /// Generator for the global context handed to state machines.
        context_gen: ContextGen,
        /// Sender used by `stop` to signal shutdown.
        shutdown_sender: oneshot::Sender<()>,
    },
    /// The executor was running and has been stopped.
    Stopped,
}
82
83impl ExecutorState {
84    /// Starts the executor, returning a receiver that will be signalled when
85    /// the executor is stopped and a receiver for state machine updates.
86    /// Returns `None` if the executor has already been started and/or stopped.
87    fn start(
88        &mut self,
89        context: ContextGen,
90    ) -> Option<(oneshot::Receiver<()>, mpsc::UnboundedReceiver<DynState>)> {
91        let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();
92
93        let previous_state = mem::replace(
94            self,
95            ExecutorState::Running {
96                context_gen: context,
97                shutdown_sender,
98            },
99        );
100
101        match previous_state {
102            ExecutorState::Unstarted { sm_update_rx } => Some((shutdown_receiver, sm_update_rx)),
103            _ => {
104                // Replace the previous state, undoing the `mem::replace` above.
105                *self = previous_state;
106
107                debug!(target: LOG_CLIENT_REACTOR, "Executor already started, ignoring start request");
108                None
109            }
110        }
111    }
112
113    /// Stops the executor, returning `Some(())` if the executor was running and
114    /// `None` if it was in any other state.
115    fn stop(&mut self) -> Option<()> {
116        let previous_state = mem::replace(self, ExecutorState::Stopped);
117
118        match previous_state {
119            ExecutorState::Running {
120                shutdown_sender, ..
121            } => {
122                if shutdown_sender.send(()).is_err() {
123                    warn!(target: LOG_CLIENT_REACTOR, "Failed to send shutdown signal to executor, already dead?");
124                }
125                Some(())
126            }
127            _ => {
128                // Replace the previous state, undoing the `mem::replace` above.
129                *self = previous_state;
130
131                debug!(target: LOG_CLIENT_REACTOR, "Executor not running, ignoring stop request");
132                None
133            }
134        }
135    }
136
137    fn gen_context(&self, state: &DynState) -> Option<DynGlobalClientContext> {
138        let ExecutorState::Running { context_gen, .. } = self else {
139            return None;
140        };
141        Some(context_gen(
142            state.module_instance_id(),
143            state.operation_id(),
144        ))
145    }
146}
147
/// Builder to which module clients can be attached and used to build an
/// [`Executor`] supporting these.
///
/// The derived [`Default`] yields a builder with no module contexts and no
/// valid module ids.
#[derive(Debug, Default)]
pub struct ExecutorBuilder {
    /// Execution contexts collected so far, keyed by module instance id.
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Module instance ids collected so far that the executor should accept.
    valid_module_ids: BTreeSet<ModuleInstanceId>,
}
155
156impl Executor {
157    /// Creates an [`ExecutorBuilder`]
158    pub fn builder() -> ExecutorBuilder {
159        ExecutorBuilder::default()
160    }
161
162    pub async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
163        self.inner.get_active_states().await
164    }
165
166    /// Adds a number of state machines to the executor atomically. They will be
167    /// driven to completion automatically in the background.
168    ///
169    /// **Attention**: do not use before background task is started!
170    // TODO: remove warning once finality is an inherent state attribute
171    pub async fn add_state_machines(&self, states: Vec<DynState>) -> anyhow::Result<()> {
172        self.inner
173            .db
174            .autocommit(
175                |dbtx, _| Box::pin(self.add_state_machines_dbtx(dbtx, states.clone())),
176                MAX_DB_ATTEMPTS,
177            )
178            .await
179            .map_err(|e| match e {
180                AutocommitError::CommitFailed {
181                    last_error,
182                    attempts,
183                } => last_error.context(format!("Failed to commit after {attempts} attempts")),
184                AutocommitError::ClosureError { error, .. } => anyhow!("{error:?}"),
185            })?;
186
187        // TODO: notify subscribers to state changes?
188
189        Ok(())
190    }
191
192    /// Adds a number of state machines to the executor atomically with other DB
193    /// changes is `dbtx`. See [`Executor::add_state_machines`] for more
194    /// details.
195    ///
196    /// ## Panics
197    /// If called before background task is started using
198    /// [`Executor::start_executor`]!
199    // TODO: remove warning once finality is an inherent state attribute
200    pub async fn add_state_machines_dbtx(
201        &self,
202        dbtx: &mut DatabaseTransaction<'_>,
203        states: Vec<DynState>,
204    ) -> AddStateMachinesResult {
205        for state in states {
206            if !self
207                .inner
208                .valid_module_ids
209                .contains(&state.module_instance_id())
210            {
211                return Err(AddStateMachinesError::Other(anyhow!("Unknown module")));
212            }
213
214            let is_active_state = dbtx
215                .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
216                .await
217                .is_some();
218            let is_inactive_state = dbtx
219                .get_value(&InactiveStateKeyDb(InactiveStateKey::from_state(
220                    state.clone(),
221                )))
222                .await
223                .is_some();
224
225            if is_active_state || is_inactive_state {
226                return Err(AddStateMachinesError::StateAlreadyExists);
227            }
228
229            // In case of recovery functions, the module itself is not yet initialized,
230            // so we can't check if the state is terminal. However the
231            // [`Self::get_transitions_for`] function will double check and
232            // deactivate any terminal states that would slip past this check.
233            if let Some(module_context) =
234                self.inner.module_contexts.get(&state.module_instance_id())
235            {
236                match self
237                    .inner
238                    .state
239                    .read()
240                    .expect("locking failed")
241                    .gen_context(&state)
242                {
243                    Some(context) => {
244                        if state.is_terminal(module_context, &context) {
245                            return Err(AddStateMachinesError::Other(anyhow!(
246                                "State is already terminal, adding it to the executor doesn't make sense."
247                            )));
248                        }
249                    }
250                    _ => {
251                        warn!(target: LOG_CLIENT_REACTOR, "Executor should be running at this point");
252                    }
253                }
254            }
255
256            dbtx.insert_new_entry(
257                &ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())),
258                &ActiveStateMeta::default(),
259            )
260            .await;
261
262            let notify_sender = self.inner.notifier.sender();
263            let sm_updates_tx = self.inner.sm_update_tx.clone();
264            dbtx.on_commit(move || {
265                notify_sender.notify(state.clone());
266                let _ = sm_updates_tx.send(state);
267            });
268        }
269
270        Ok(())
271    }
272
273    /// **Mostly used for testing**
274    ///
275    /// Check if state exists in the database as part of an actively running
276    /// state machine.
277    pub async fn contains_active_state<S: State>(
278        &self,
279        instance: ModuleInstanceId,
280        state: S,
281    ) -> bool {
282        let state = DynState::from_typed(instance, state);
283        self.inner
284            .get_active_states()
285            .await
286            .into_iter()
287            .any(|(s, _)| s == state)
288    }
289
290    // TODO: unify querying fns
291    /// **Mostly used for testing**
292    ///
293    /// Check if state exists in the database as inactive. If the state is
294    /// terminal it means the corresponding state machine finished its
295    /// execution. If the state is non-terminal it means the state machine was
296    /// in that state at some point but moved on since then.
297    pub async fn contains_inactive_state<S: State>(
298        &self,
299        instance: ModuleInstanceId,
300        state: S,
301    ) -> bool {
302        let state = DynState::from_typed(instance, state);
303        self.inner
304            .get_inactive_states()
305            .await
306            .into_iter()
307            .any(|(s, _)| s == state)
308    }
309
310    pub async fn await_inactive_state(&self, state: DynState) -> InactiveStateMeta {
311        self.inner
312            .db
313            .wait_key_exists(&InactiveStateKeyDb(InactiveStateKey::from_state(state)))
314            .await
315    }
316
317    pub async fn await_active_state(&self, state: DynState) -> ActiveStateMeta {
318        self.inner
319            .db
320            .wait_key_exists(&ActiveStateKeyDb(ActiveStateKey::from_state(state)))
321            .await
322    }
323
324    /// Only meant for debug tooling
325    pub async fn get_operation_states(
326        &self,
327        operation_id: OperationId,
328    ) -> (
329        Vec<(DynState, ActiveStateMeta)>,
330        Vec<(DynState, InactiveStateMeta)>,
331    ) {
332        let mut dbtx = self.inner.db.begin_transaction_nc().await;
333        let active_states: Vec<_> = dbtx
334            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
335            .await
336            .map(|(active_key, active_meta)| (active_key.0.state, active_meta))
337            .collect()
338            .await;
339        let inactive_states: Vec<_> = dbtx
340            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
341            .await
342            .map(|(active_key, inactive_meta)| (active_key.0.state, inactive_meta))
343            .collect()
344            .await;
345
346        (active_states, inactive_states)
347    }
348
349    /// Starts the background thread that runs the state machines. This cannot
350    /// be done when building the executor since some global contexts in turn
351    /// may depend on the executor, forming a cyclic dependency.
352    ///
353    /// ## Panics
354    /// If called more than once.
355    pub fn start_executor(&self, context_gen: ContextGen) {
356        let Some((shutdown_receiver, sm_update_rx)) = self
357            .inner
358            .state
359            .write()
360            .expect("locking can't fail")
361            .start(context_gen.clone())
362        else {
363            panic!("start_executor was called previously");
364        };
365
366        let task_runner_inner = self.inner.clone();
367        let _handle = self.inner.client_task_group.spawn("sm-executor", |task_handle| async move {
368            let executor_runner = task_runner_inner.run(context_gen, sm_update_rx);
369            let task_group_shutdown_rx = task_handle.make_shutdown_rx();
370            select! {
371                () = task_group_shutdown_rx => {
372                    debug!(
373                        target: LOG_CLIENT_REACTOR,
374                        "Shutting down state machine executor runner due to task group shutdown signal"
375                    );
376                },
377                shutdown_happened_sender = shutdown_receiver => {
378                    match shutdown_happened_sender {
379                        Ok(()) => {
380                            debug!(
381                                target: LOG_CLIENT_REACTOR,
382                                "Shutting down state machine executor runner due to explicit shutdown signal"
383                            );
384                        },
385                        Err(_) => {
386                            warn!(
387                                target: LOG_CLIENT_REACTOR,
388                                "Shutting down state machine executor runner because the shutdown signal channel was closed (the executor object was dropped)"
389                            );
390                        }
391                    }
392                },
393                () = executor_runner => {
394                    error!(target: LOG_CLIENT_REACTOR, "State machine executor runner exited unexpectedly!");
395                },
396            };
397        });
398    }
399
400    /// Stops the background task that runs the state machines.
401    ///
402    /// If a shutdown signal was sent it returns a [`oneshot::Receiver`] that
403    /// will be signalled when the main loop of the background task has
404    /// exited. This can be useful to block until the executor has stopped
405    /// to avoid errors due to the async runtime shutting down while the
406    /// task is still running.
407    ///
408    /// If no shutdown signal was sent it returns `None`. This can happen if
409    /// `stop_executor` is called multiple times.
410    ///
411    /// ## Panics
412    /// If called in parallel with [`start_executor`](Self::start_executor).
413    pub fn stop_executor(&self) -> Option<()> {
414        self.inner.stop_executor()
415    }
416
417    /// Returns a reference to the [`Notifier`] that can be used to subscribe to
418    /// state transitions
419    pub fn notifier(&self) -> &Notifier {
420        &self.inner.notifier
421    }
422}
423
424impl Drop for ExecutorInner {
425    fn drop(&mut self) {
426        self.stop_executor();
427    }
428}
429
/// Result of a state transition trigger completing: everything needed to
/// subsequently run the transition function for an active state.
struct TransitionForActiveState {
    /// Value the trigger future resolved to.
    outcome: serde_json::Value,
    /// The active state the transition belongs to.
    state: DynState,
    /// Metadata of the active state.
    meta: ActiveStateMeta,
    /// Transition function belonging to the trigger that completed.
    transition_fn: StateTransitionFunction<DynState>,
}
436
437impl ExecutorInner {
438    async fn run(
439        &self,
440        global_context_gen: ContextGen,
441        sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
442    ) {
443        debug!(target: LOG_CLIENT_REACTOR, "Starting state machine executor task");
444        if let Err(err) = self
445            .run_state_machines_executor_inner(global_context_gen, sm_update_rx)
446            .await
447        {
448            warn!(
449                target: LOG_CLIENT_REACTOR,
450                err = %err.fmt_compact_anyhow(),
451                "An unexpected error occurred during a state transition"
452            );
453        }
454    }
455
456    async fn get_transition_for(
457        &self,
458        state: &DynState,
459        meta: ActiveStateMeta,
460        global_context_gen: &ContextGen,
461    ) -> Vec<BoxFuture<'static, TransitionForActiveState>> {
462        let module_instance = state.module_instance_id();
463        let context = &self
464            .module_contexts
465            .get(&module_instance)
466            .expect("Unknown module");
467        let transitions = state
468            .transitions(
469                context,
470                &global_context_gen(module_instance, state.operation_id()),
471            )
472            .into_iter()
473            .map(|transition| {
474                let state = state.clone();
475                let f: BoxFuture<TransitionForActiveState> = Box::pin(async move {
476                    let StateTransition {
477                        trigger,
478                        transition,
479                    } = transition;
480                    TransitionForActiveState {
481                        outcome: trigger.await,
482                        state,
483                        transition_fn: transition,
484                        meta,
485                    }
486                });
487                f
488            })
489            .collect::<Vec<_>>();
490        if transitions.is_empty() {
491            // In certain cases a terminal (no transitions) state could get here due to
492            // module bug. Inactivate it to prevent accumulation of such states.
493            // See [`Self::add_state_machines_dbtx`].
494            warn!(
495                target: LOG_CLIENT_REACTOR,
496                module_id = module_instance, "A terminal state where only active states are expected. Please report this bug upstream."
497            );
498            self.db
499                .autocommit::<_, _, anyhow::Error>(
500                    |dbtx, _| {
501                        Box::pin(async {
502                            let k = InactiveStateKey::from_state(state.clone());
503                            let v = ActiveStateMeta::default().into_inactive();
504                            dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
505                                state.clone(),
506                            )))
507                            .await;
508                            dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
509                            Ok(())
510                        })
511                    },
512                    None,
513                )
514                .await
515                .expect("Autocommit here can't fail");
516        }
517
518        transitions
519    }
520
521    async fn run_state_machines_executor_inner(
522        &self,
523        global_context_gen: ContextGen,
524        mut sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
525    ) -> anyhow::Result<()> {
526        /// All futures in the executor resolve to this type, so the handling
527        /// code can tell them apart.
528        enum ExecutorLoopEvent {
529            /// Notification about `DynState` arrived and should be handled,
530            /// usually added to the list of pending futures.
531            New { state: DynState },
532            /// One of trigger futures of a state machine finished and
533            /// returned transition function to run
534            Triggered(TransitionForActiveState),
535            /// The state machine did not need to run, so it was canceled
536            Invalid { state: DynState },
537            /// Transition function and all the accounting around it are done
538            Completed {
539                state: DynState,
540                outcome: ActiveOrInactiveState,
541            },
542            /// New job receiver disconnected, that can only mean termination
543            Disconnected,
544        }
545
546        let active_states = self.get_active_states().await;
547        trace!(target: LOG_CLIENT_REACTOR, "Starting active states: {:?}", active_states);
548        for (state, _meta) in active_states {
549            self.sm_update_tx
550                .send(state)
551                .expect("Must be able to send state machine to own opened channel");
552        }
553
554        // Keeps track of things already running, so we can deduplicate, just
555        // in case.
556        let mut currently_running_sms = HashSet::<DynState>::new();
557        // All things happening in parallel go into here
558        // NOTE: `FuturesUnordered` is a footgun: when it's not being polled
559        // (e.g. we picked an event and are awaiting on something to process it),
560        // nothing inside `futures` will be making progress, which in extreme cases
561        // could lead to hangs. For this reason we try really hard in the code here,
562        // to pick an event from `futures` and spawn a new task, avoiding any `await`,
563        // just so we can get back to `futures.next()` ASAP.
564        let mut futures: FuturesUnordered<BoxFuture<'_, ExecutorLoopEvent>> =
565            FuturesUnordered::new();
566
567        loop {
568            let event = tokio::select! {
569                new = sm_update_rx.recv() => {
570                    match new { Some(new) => {
571                        ExecutorLoopEvent::New {
572                            state: new,
573                        }
574                    } _ => {
575                        ExecutorLoopEvent::Disconnected
576                    }}
577                },
578
579                event = futures.next(), if !futures.is_empty() => event.expect("we only .next() if there are pending futures"),
580            };
581
582            // main reactor loop: wait for next thing that completed, react (possibly adding
583            // more things to `futures`)
584            match event {
585                ExecutorLoopEvent::New { state } => {
586                    if currently_running_sms.contains(&state) {
587                        warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received a state machine that is already running. Ignoring");
588                        continue;
589                    }
590                    currently_running_sms.insert(state.clone());
591                    let futures_len = futures.len();
592                    let global_context_gen = &global_context_gen;
593                    trace!(target: LOG_CLIENT_REACTOR, state = ?state, "Started new active state machine, details.");
594                    futures.push(Box::pin(async move {
595                        let Some(meta) = self.get_active_state(&state).await else {
596                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Couldn't look up received state machine. Ignoring.");
597                            return ExecutorLoopEvent::Invalid { state: state.clone() };
598                        };
599
600                        let transitions = self
601                            .get_transition_for(&state, meta, global_context_gen)
602                            .await;
603                        if transitions.is_empty() {
604                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received an active state that doesn't produce any transitions. Ignoring.");
605                            return ExecutorLoopEvent::Invalid { state: state.clone() };
606                        }
607                        let transitions_num = transitions.len();
608
609                        debug!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), total = futures_len + 1, transitions_num, "New active state machine.");
610
611                        let (first_completed_result, _index, _unused_transitions) =
612                            select_all(transitions).await;
613                        ExecutorLoopEvent::Triggered(first_completed_result)
614                    }));
615                }
616                ExecutorLoopEvent::Triggered(TransitionForActiveState {
617                    outcome,
618                    state,
619                    meta,
620                    transition_fn,
621                }) => {
622                    debug!(
623                        target: LOG_CLIENT_REACTOR,
624                        operation_id = %state.operation_id().fmt_short(),
625                        "Triggered state transition",
626                    );
627                    let span = tracing::debug_span!(
628                        target: LOG_CLIENT_REACTOR,
629                        "sm_transition",
630                        operation_id = %state.operation_id().fmt_short()
631                    );
632                    // Perform the transition as another future, so transitions can happen in
633                    // parallel.
634                    // Database write conflicts might be happening quite often here,
635                    // but transaction functions are supposed to be idempotent anyway,
636                    // so it seems like a good stress-test in the worst case.
637                    futures.push({
638                        let sm_update_tx = self.sm_update_tx.clone();
639                        let db = self.db.clone();
640                        let notifier = self.notifier.clone();
641                        let module_contexts = self.module_contexts.clone();
642                        let global_context_gen = global_context_gen.clone();
643                        Box::pin(
644                            async move {
645                                debug!(
646                                    target: LOG_CLIENT_REACTOR,
647                                    "Executing state transition",
648                                );
649                                trace!(
650                                    target: LOG_CLIENT_REACTOR,
651                                    ?state,
652                                    outcome = ?AbbreviateJson(&outcome),
653                                    "Executing state transition (details)",
654                                );
655
656                                let module_contexts = &module_contexts;
657                                let global_context_gen = &global_context_gen;
658
659                                let outcome = db
660                                    .autocommit::<'_, '_, _, _, Infallible>(
661                                        |dbtx, _| {
662                                            let state = state.clone();
663                                            let transition_fn = transition_fn.clone();
664                                            let transition_outcome = outcome.clone();
665                                            Box::pin(async move {
666                                                let new_state = transition_fn(
667                                                    &mut ClientSMDatabaseTransaction::new(
668                                                        &mut dbtx.to_ref(),
669                                                        state.module_instance_id(),
670                                                    ),
671                                                    transition_outcome.clone(),
672                                                    state.clone(),
673                                                )
674                                                .await;
675                                                dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
676                                                    state.clone(),
677                                                )))
678                                                .await;
679                                                dbtx.insert_entry(
680                                                    &InactiveStateKeyDb(InactiveStateKey::from_state(state.clone())),
681                                                    &meta.into_inactive(),
682                                                )
683                                                .await;
684
685                                                let context = &module_contexts
686                                                    .get(&state.module_instance_id())
687                                                    .expect("Unknown module");
688
689                                                let operation_id = state.operation_id();
690                                                let global_context = global_context_gen(
691                                                    state.module_instance_id(),
692                                                    operation_id,
693                                                );
694
695                                                let is_terminal = new_state.is_terminal(context, &global_context);
696
697                                                if is_terminal {
698                                                    let k = InactiveStateKey::from_state(
699                                                        new_state.clone(),
700                                                    );
701                                                    let v = ActiveStateMeta::default().into_inactive();
702                                                    dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
703                                                    Ok(ActiveOrInactiveState::Inactive {
704                                                        dyn_state: new_state,
705                                                    })
706                                                } else {
707                                                    let k = ActiveStateKey::from_state(
708                                                        new_state.clone(),
709                                                    );
710                                                    let v = ActiveStateMeta::default();
711                                                    dbtx.insert_entry(&ActiveStateKeyDb(k), &v).await;
712                                                    Ok(ActiveOrInactiveState::Active {
713                                                        dyn_state: new_state,
714                                                        meta: v,
715                                                    })
716                                                }
717                                            })
718                                        },
719                                        None,
720                                    )
721                                    .await
722                                    .expect("autocommit should keep trying to commit (max_attempt: None) and body doesn't return errors");
723
724                                debug!(
725                                    target: LOG_CLIENT_REACTOR,
726                                    terminal = !outcome.is_active(),
727                                    ?outcome,
728                                    "State transition complete",
729                                );
730
731                                match &outcome {
732                                    ActiveOrInactiveState::Active { dyn_state, meta: _ } => {
733                                        sm_update_tx
734                                            .send(dyn_state.clone())
735                                            .expect("can't fail: we are the receiving end");
736                                        notifier.notify(dyn_state.clone());
737                                    }
738                                    ActiveOrInactiveState::Inactive { dyn_state } => {
739                                        notifier.notify(dyn_state.clone());
740                                    }
741                                }
742                                ExecutorLoopEvent::Completed { state, outcome }
743                            }
744                            .instrument(span),
745                        )
746                    });
747                }
748                ExecutorLoopEvent::Invalid { state } => {
749                    trace!(
750                        target: LOG_CLIENT_REACTOR,
751                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
752                        "State invalid"
753                    );
754                    assert!(
755                        currently_running_sms.remove(&state),
756                        "State must have been recorded"
757                    );
758                }
759
760                ExecutorLoopEvent::Completed { state, outcome } => {
761                    assert!(
762                        currently_running_sms.remove(&state),
763                        "State must have been recorded"
764                    );
765                    debug!(
766                        target: LOG_CLIENT_REACTOR,
767                        operation_id = %state.operation_id().fmt_short(),
768                        outcome_active = outcome.is_active(),
769                        total = futures.len(),
770                        "State transition complete"
771                    );
772                    trace!(
773                        target: LOG_CLIENT_REACTOR,
774                        ?outcome,
775                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
776                        "State transition complete"
777                    );
778                }
779                ExecutorLoopEvent::Disconnected => {
780                    break;
781                }
782            }
783        }
784
785        info!(target: LOG_CLIENT_REACTOR, "Terminated.");
786        Ok(())
787    }
788
789    async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
790        self.db
791            .begin_transaction_nc()
792            .await
793            .find_by_prefix(&ActiveStateKeyPrefix)
794            .await
795            // ignore states from modules that are not initialized yet
796            .filter(|(state, _)| {
797                future::ready(
798                    self.module_contexts
799                        .contains_key(&state.0.state.module_instance_id()),
800                )
801            })
802            .map(|(state, meta)| (state.0.state, meta))
803            .collect::<Vec<_>>()
804            .await
805    }
806
807    async fn get_active_state(&self, state: &DynState) -> Option<ActiveStateMeta> {
808        // ignore states from modules that are not initialized yet
809        if !self
810            .module_contexts
811            .contains_key(&state.module_instance_id())
812        {
813            return None;
814        }
815        self.db
816            .begin_transaction_nc()
817            .await
818            .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
819            .await
820    }
821
822    async fn get_inactive_states(&self) -> Vec<(DynState, InactiveStateMeta)> {
823        self.db
824            .begin_transaction_nc()
825            .await
826            .find_by_prefix(&InactiveStateKeyPrefix)
827            .await
828            // ignore states from modules that are not initialized yet
829            .filter(|(state, _)| {
830                future::ready(
831                    self.module_contexts
832                        .contains_key(&state.0.state.module_instance_id()),
833                )
834            })
835            .map(|(state, meta)| (state.0.state, meta))
836            .collect::<Vec<_>>()
837            .await
838    }
839}
840
841impl ExecutorInner {
842    /// See [`Executor::stop_executor`].
843    fn stop_executor(&self) -> Option<()> {
844        let mut state = self.state.write().expect("Locking can't fail");
845
846        state.stop()
847    }
848}
849
850impl Debug for ExecutorInner {
851    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
852        writeln!(f, "ExecutorInner {{}}")
853    }
854}
855
856impl ExecutorBuilder {
857    /// Allow executor being built to run state machines associated with the
858    /// supplied module
859    pub fn with_module<C>(&mut self, instance_id: ModuleInstanceId, context: C)
860    where
861        C: IntoDynInstance<DynType = DynContext>,
862    {
863        self.with_module_dyn(context.into_dyn(instance_id));
864    }
865
866    /// Allow executor being built to run state machines associated with the
867    /// supplied module
868    pub fn with_module_dyn(&mut self, context: DynContext) {
869        self.valid_module_ids.insert(context.module_instance_id());
870
871        if self
872            .module_contexts
873            .insert(context.module_instance_id(), context)
874            .is_some()
875        {
876            panic!("Tried to add two modules with the same instance id!");
877        }
878    }
879
880    /// Allow executor to build state machines associated with the module id,
881    /// for which the module itself might not be available yet (otherwise it
882    /// would be registered with `[Self::with_module_dyn]`).
883    pub fn with_valid_module_id(&mut self, module_id: ModuleInstanceId) {
884        self.valid_module_ids.insert(module_id);
885    }
886
887    /// Build [`Executor`] and spawn background task in `tasks` executing active
888    /// state machines. The supplied database `db` must support isolation, so
889    /// cannot be an isolated DB instance itself.
890    pub fn build(self, db: Database, notifier: Notifier, client_task_group: TaskGroup) -> Executor {
891        let (sm_update_tx, sm_update_rx) = tokio::sync::mpsc::unbounded_channel();
892
893        let inner = Arc::new(ExecutorInner {
894            db,
895            state: std::sync::RwLock::new(ExecutorState::Unstarted { sm_update_rx }),
896            module_contexts: self.module_contexts,
897            valid_module_ids: self.valid_module_ids,
898            notifier,
899            sm_update_tx,
900            client_task_group,
901        });
902
903        debug!(
904            target: LOG_CLIENT_REACTOR,
905            instances = ?inner.module_contexts.keys().copied().collect::<Vec<_>>(),
906            "Initialized state machine executor with module instances"
907        );
908        Executor { inner }
909    }
910}
911#[derive(Debug)]
912pub struct ActiveOperationStateKeyPrefix {
913    pub operation_id: OperationId,
914}
915
916impl Encodable for ActiveOperationStateKeyPrefix {
917    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
918        self.operation_id.consensus_encode(writer)
919    }
920}
921
922impl ::fedimint_core::db::DatabaseLookup for ActiveOperationStateKeyPrefix {
923    type Record = ActiveStateKeyDb;
924}
925
926#[derive(Debug)]
927pub(crate) struct ActiveModuleOperationStateKeyPrefix {
928    pub operation_id: OperationId,
929    pub module_instance: ModuleInstanceId,
930}
931
932impl Encodable for ActiveModuleOperationStateKeyPrefix {
933    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
934        self.operation_id.consensus_encode(writer)?;
935        self.module_instance.consensus_encode(writer)?;
936        Ok(())
937    }
938}
939
940impl ::fedimint_core::db::DatabaseLookup for ActiveModuleOperationStateKeyPrefix {
941    type Record = ActiveStateKeyDb;
942}
943
944#[derive(Debug)]
945pub struct ActiveStateKeyPrefix;
946
947impl Encodable for ActiveStateKeyPrefix {
948    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
949        Ok(())
950    }
951}
952
953#[derive(Encodable, Decodable, Debug)]
954pub struct ActiveStateKeyDb(pub fedimint_client_module::sm::executor::ActiveStateKey);
955
956impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyDb {
957    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
958    const NOTIFY_ON_MODIFY: bool = true;
959    type Key = Self;
960    type Value = ActiveStateMeta;
961}
962
963impl DatabaseKeyWithNotify for ActiveStateKeyDb {}
964
965impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefix {
966    type Record = ActiveStateKeyDb;
967}
968
969#[derive(Debug, Encodable, Decodable)]
970pub struct ActiveStateKeyPrefixBytes;
971
972impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyBytes {
973    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
974    const NOTIFY_ON_MODIFY: bool = false;
975    type Key = Self;
976    type Value = ActiveStateMeta;
977}
978
979impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefixBytes {
980    type Record = ActiveStateKeyBytes;
981}
982
983#[derive(Encodable, Decodable, Debug)]
984pub struct InactiveStateKeyDb(pub fedimint_client_module::sm::executor::InactiveStateKey);
985
986#[derive(Debug)]
987pub struct InactiveStateKeyBytes {
988    pub operation_id: OperationId,
989    pub module_instance_id: ModuleInstanceId,
990    pub state: Vec<u8>,
991}
992
993impl Encodable for InactiveStateKeyBytes {
994    fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
995        self.operation_id.consensus_encode(writer)?;
996        writer.write_all(self.state.as_slice())?;
997        Ok(())
998    }
999}
1000
1001impl Decodable for InactiveStateKeyBytes {
1002    fn consensus_decode_partial<R: std::io::Read>(
1003        reader: &mut R,
1004        modules: &ModuleDecoderRegistry,
1005    ) -> Result<Self, DecodeError> {
1006        let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1007        let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1008        let mut bytes = Vec::new();
1009        reader
1010            .read_to_end(&mut bytes)
1011            .map_err(DecodeError::from_err)?;
1012
1013        let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1014        instance_bytes.append(&mut bytes);
1015
1016        Ok(InactiveStateKeyBytes {
1017            operation_id,
1018            module_instance_id,
1019            state: instance_bytes,
1020        })
1021    }
1022}
1023
1024#[derive(Debug)]
1025pub struct InactiveOperationStateKeyPrefix {
1026    pub operation_id: OperationId,
1027}
1028
1029impl Encodable for InactiveOperationStateKeyPrefix {
1030    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1031        self.operation_id.consensus_encode(writer)
1032    }
1033}
1034
1035impl ::fedimint_core::db::DatabaseLookup for InactiveOperationStateKeyPrefix {
1036    type Record = InactiveStateKeyDb;
1037}
1038
1039#[derive(Debug)]
1040pub(crate) struct InactiveModuleOperationStateKeyPrefix {
1041    pub operation_id: OperationId,
1042    pub module_instance: ModuleInstanceId,
1043}
1044
1045impl Encodable for InactiveModuleOperationStateKeyPrefix {
1046    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1047        self.operation_id.consensus_encode(writer)?;
1048        self.module_instance.consensus_encode(writer)?;
1049        Ok(())
1050    }
1051}
1052
1053impl ::fedimint_core::db::DatabaseLookup for InactiveModuleOperationStateKeyPrefix {
1054    type Record = InactiveStateKeyDb;
1055}
1056
1057#[derive(Debug, Clone)]
1058pub struct InactiveStateKeyPrefix;
1059
1060impl Encodable for InactiveStateKeyPrefix {
1061    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
1062        Ok(())
1063    }
1064}
1065
1066#[derive(Debug, Encodable, Decodable)]
1067pub struct InactiveStateKeyPrefixBytes;
1068
1069impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyBytes {
1070    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
1071    const NOTIFY_ON_MODIFY: bool = false;
1072    type Key = Self;
1073    type Value = InactiveStateMeta;
1074}
1075
1076impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefixBytes {
1077    type Record = InactiveStateKeyBytes;
1078}
1079
1080#[derive(Debug)]
1081pub struct ActiveStateKeyBytes {
1082    pub operation_id: OperationId,
1083    pub module_instance_id: ModuleInstanceId,
1084    pub state: Vec<u8>,
1085}
1086
1087impl Encodable for ActiveStateKeyBytes {
1088    fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
1089        self.operation_id.consensus_encode(writer)?;
1090        writer.write_all(self.state.as_slice())?;
1091        Ok(())
1092    }
1093}
1094
1095impl Decodable for ActiveStateKeyBytes {
1096    fn consensus_decode_partial<R: std::io::Read>(
1097        reader: &mut R,
1098        modules: &ModuleDecoderRegistry,
1099    ) -> Result<Self, DecodeError> {
1100        let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1101        let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1102        let mut bytes = Vec::new();
1103        reader
1104            .read_to_end(&mut bytes)
1105            .map_err(DecodeError::from_err)?;
1106
1107        let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1108        instance_bytes.append(&mut bytes);
1109
1110        Ok(ActiveStateKeyBytes {
1111            operation_id,
1112            module_instance_id,
1113            state: instance_bytes,
1114        })
1115    }
1116}
1117impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyDb {
1118    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
1119    const NOTIFY_ON_MODIFY: bool = true;
1120    type Key = Self;
1121    type Value = InactiveStateMeta;
1122}
1123
1124impl DatabaseKeyWithNotify for InactiveStateKeyDb {}
1125
1126impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefix {
1127    type Record = InactiveStateKeyDb;
1128}
1129
1130#[derive(Debug)]
1131enum ActiveOrInactiveState {
1132    Active {
1133        dyn_state: DynState,
1134        #[allow(dead_code)] // currently not printed anywhere, but useful in the db
1135        meta: ActiveStateMeta,
1136    },
1137    Inactive {
1138        dyn_state: DynState,
1139    },
1140}
1141
1142impl ActiveOrInactiveState {
1143    fn is_active(&self) -> bool {
1144        match self {
1145            ActiveOrInactiveState::Active { .. } => true,
1146            ActiveOrInactiveState::Inactive { .. } => false,
1147        }
1148    }
1149}
1150
1151#[apply(async_trait_maybe_send!)]
1152impl IExecutor for Executor {
1153    async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
1154        Self::get_active_states(self).await
1155    }
1156
1157    async fn add_state_machines_dbtx(
1158        &self,
1159        dbtx: &mut DatabaseTransaction<'_>,
1160        states: Vec<DynState>,
1161    ) -> AddStateMachinesResult {
1162        Self::add_state_machines_dbtx(self, dbtx, states).await
1163    }
1164}
1165
1166#[cfg(test)]
1167mod tests;