1use std::collections::{BTreeMap, BTreeSet, HashSet};
2use std::convert::Infallible;
3use std::fmt::{Debug, Formatter};
4use std::io::{Error, Write};
5use std::mem;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::anyhow;
10use fedimint_client_module::sm::executor::{
11 ActiveStateKey, ContextGen, IExecutor, InactiveStateKey,
12};
13use fedimint_client_module::sm::{
14 ActiveStateMeta, ClientSMDatabaseTransaction, DynContext, DynState, InactiveStateMeta, State,
15 StateTransition, StateTransitionFunction,
16};
17use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, OperationId};
18use fedimint_core::db::{
19 AutocommitError, Database, DatabaseKeyWithNotify, DatabaseTransaction,
20 IDatabaseTransactionOpsCoreTyped,
21};
22use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
23use fedimint_core::fmt_utils::AbbreviateJson;
24use fedimint_core::module::registry::ModuleDecoderRegistry;
25use fedimint_core::task::TaskGroup;
26use fedimint_core::util::{BoxFuture, FmtCompactAnyhow as _};
27use fedimint_core::{apply, async_trait_maybe_send};
28use fedimint_eventlog::{DBTransactionEventLogExt as _, Event, EventKind, EventPersistence};
29use fedimint_logging::LOG_CLIENT_REACTOR;
30use futures::future::{self, select_all};
31use futures::stream::{FuturesUnordered, StreamExt};
32use serde::{Deserialize, Serialize};
33use tokio::select;
34use tokio::sync::{mpsc, oneshot, watch};
35use tracing::{Instrument, debug, error, info, trace, warn};
36
37use crate::sm::notifier::Notifier;
38use crate::{AddStateMachinesError, AddStateMachinesResult, DynGlobalClientContext};
39
/// Upper bound on autocommit attempts used by [`Executor::add_state_machines`];
/// once it is exhausted the commit failure is reported to the caller as an
/// error.
const MAX_DB_ATTEMPTS: Option<usize> = Some(100);
42
/// Database key prefixes under which the executor stores its records.
///
/// The discriminants are cast to `u8` to form `DatabaseRecord::DB_PREFIX`, so
/// the representation is pinned to `u8`: a discriminant that does not fit into
/// a single byte is then rejected at compile time instead of being silently
/// truncated by the `as u8` cast.
#[repr(u8)]
pub(crate) enum ExecutorDbPrefixes {
    /// Prefix of the records of currently active states.
    ActiveStates = 0xa1,
    /// Prefix of the records of states that are no longer active.
    InactiveStates = 0xa2,
}
50
/// Event-log payload written by the executor whenever a state machine is
/// added or a state transition has been executed.
#[derive(Serialize, Deserialize)]
pub struct StateMachineUpdated {
    /// Operation the state machine belongs to.
    operation_id: OperationId,
    /// `true` when logged for a newly added state machine, `false` when logged
    /// for an executed transition.
    started: bool,
    /// Whether the state produced by the transition is terminal (always
    /// `false` for a newly added state machine).
    terminal: bool,
    /// Instance id of the module the state belongs to.
    module_id: ModuleInstanceId,
}
58
/// Declares [`StateMachineUpdated`] as an event that is not tied to a module
/// kind (`MODULE` is `None`), has the kind `"sm-updated"` and uses `Trimable`
/// persistence.
impl Event for StateMachineUpdated {
    const MODULE: Option<fedimint_core::core::ModuleKind> = None;
    const KIND: EventKind = EventKind::from_static("sm-updated");
    const PERSISTENCE: EventPersistence = EventPersistence::Trimable;
}
64
/// Handle to the state machine executor.
///
/// The actual state lives in a shared [`ExecutorInner`] behind an [`Arc`], so
/// clones of this handle all refer to the same executor.
#[derive(Clone, Debug)]
pub struct Executor {
    /// Shared executor state.
    inner: Arc<ExecutorInner>,
}
76
/// Shared state behind an [`Executor`] handle.
struct ExecutorInner {
    /// Database holding the active and inactive state records.
    db: Database,
    /// Lifecycle state of the executor (unstarted, running or stopped).
    state: std::sync::RwLock<ExecutorState>,
    /// Contexts of the modules whose state machines can be driven; states of
    /// modules without an entry here are filtered out when states are queried.
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Module instances for which new state machines are accepted.
    valid_module_ids: BTreeSet<ModuleInstanceId>,
    /// Notifier that is told about every new state.
    notifier: Notifier,
    /// Sending half of the channel through which active states are handed to
    /// the executor loop.
    sm_update_tx: mpsc::UnboundedSender<DynState>,
    /// Task group the executor task is spawned on.
    client_task_group: TaskGroup,
    /// Passed along to the event log whenever an event is logged.
    log_ordering_wakeup_tx: watch::Sender<()>,
}
89
/// Lifecycle of the executor.
enum ExecutorState {
    /// The executor has been built but not started yet.
    Unstarted {
        /// Receiving half of the state-update channel, parked here until it is
        /// handed to the executor task on start.
        sm_update_rx: mpsc::UnboundedReceiver<DynState>,
    },
    /// The executor task has been started.
    Running {
        /// Generator for the global context of a given module instance and
        /// operation.
        context_gen: ContextGen,
        /// Used to signal the executor task to shut down.
        shutdown_sender: oneshot::Sender<()>,
    },
    /// The executor was running and has been asked to stop.
    Stopped,
}
100
101impl ExecutorState {
102 fn start(
106 &mut self,
107 context: ContextGen,
108 ) -> Option<(oneshot::Receiver<()>, mpsc::UnboundedReceiver<DynState>)> {
109 let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();
110
111 let previous_state = mem::replace(
112 self,
113 ExecutorState::Running {
114 context_gen: context,
115 shutdown_sender,
116 },
117 );
118
119 match previous_state {
120 ExecutorState::Unstarted { sm_update_rx } => Some((shutdown_receiver, sm_update_rx)),
121 _ => {
122 *self = previous_state;
124
125 debug!(target: LOG_CLIENT_REACTOR, "Executor already started, ignoring start request");
126 None
127 }
128 }
129 }
130
131 fn stop(&mut self) -> Option<()> {
134 let previous_state = mem::replace(self, ExecutorState::Stopped);
135
136 match previous_state {
137 ExecutorState::Running {
138 shutdown_sender, ..
139 } => {
140 if shutdown_sender.send(()).is_err() {
141 warn!(target: LOG_CLIENT_REACTOR, "Failed to send shutdown signal to executor, already dead?");
142 }
143 Some(())
144 }
145 _ => {
146 *self = previous_state;
148
149 debug!(target: LOG_CLIENT_REACTOR, "Executor not running, ignoring stop request");
150 None
151 }
152 }
153 }
154
155 fn gen_context(&self, state: &DynState) -> Option<DynGlobalClientContext> {
156 let ExecutorState::Running { context_gen, .. } = self else {
157 return None;
158 };
159 Some(context_gen(
160 state.module_instance_id(),
161 state.operation_id(),
162 ))
163 }
164}
165
/// Collects the module contexts and accepted module ids needed to build an
/// [`Executor`].
#[derive(Debug, Default)]
pub struct ExecutorBuilder {
    /// Contexts registered so far, keyed by module instance id.
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Module instance ids for which adding state machines will be accepted.
    valid_module_ids: BTreeSet<ModuleInstanceId>,
}
173
174impl Executor {
175 pub fn builder() -> ExecutorBuilder {
177 ExecutorBuilder::default()
178 }
179
180 pub async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
181 self.inner.get_active_states().await
182 }
183
184 pub async fn add_state_machines(&self, states: Vec<DynState>) -> anyhow::Result<()> {
190 self.inner
191 .db
192 .autocommit(
193 |dbtx, _| Box::pin(self.add_state_machines_dbtx(dbtx, states.clone())),
194 MAX_DB_ATTEMPTS,
195 )
196 .await
197 .map_err(|e| match e {
198 AutocommitError::CommitFailed {
199 last_error,
200 attempts,
201 } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
202 AutocommitError::ClosureError { error, .. } => anyhow!("{error:?}"),
203 })?;
204
205 Ok(())
208 }
209
210 pub async fn add_state_machines_dbtx(
219 &self,
220 dbtx: &mut DatabaseTransaction<'_>,
221 states: Vec<DynState>,
222 ) -> AddStateMachinesResult {
223 for state in states {
224 if !self
225 .inner
226 .valid_module_ids
227 .contains(&state.module_instance_id())
228 {
229 return Err(AddStateMachinesError::Other(anyhow!("Unknown module")));
230 }
231
232 let is_active_state = dbtx
233 .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
234 .await
235 .is_some();
236 let is_inactive_state = dbtx
237 .get_value(&InactiveStateKeyDb(InactiveStateKey::from_state(
238 state.clone(),
239 )))
240 .await
241 .is_some();
242
243 if is_active_state || is_inactive_state {
244 return Err(AddStateMachinesError::StateAlreadyExists);
245 }
246
247 if let Some(module_context) =
252 self.inner.module_contexts.get(&state.module_instance_id())
253 {
254 match self
255 .inner
256 .state
257 .read()
258 .expect("locking failed")
259 .gen_context(&state)
260 {
261 Some(context) => {
262 if state.is_terminal(module_context, &context) {
263 return Err(AddStateMachinesError::Other(anyhow!(
264 "State is already terminal, adding it to the executor doesn't make sense."
265 )));
266 }
267 }
268 _ => {
269 warn!(target: LOG_CLIENT_REACTOR, "Executor should be running at this point");
270 }
271 }
272 }
273
274 dbtx.insert_new_entry(
275 &ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())),
276 &ActiveStateMeta::default(),
277 )
278 .await;
279
280 let operation_id = state.operation_id();
281 self.inner
282 .log_event_dbtx(
283 dbtx,
284 StateMachineUpdated {
285 operation_id,
286 started: true,
287 terminal: false,
288 module_id: state.module_instance_id(),
289 },
290 )
291 .await;
292
293 let notify_sender = self.inner.notifier.sender();
294 let sm_updates_tx = self.inner.sm_update_tx.clone();
295 dbtx.on_commit(move || {
296 notify_sender.notify(state.clone());
297 let _ = sm_updates_tx.send(state);
298 });
299 }
300
301 Ok(())
302 }
303
304 pub async fn contains_active_state<S: State>(
309 &self,
310 instance: ModuleInstanceId,
311 state: S,
312 ) -> bool {
313 let state = DynState::from_typed(instance, state);
314 self.inner
315 .get_active_states()
316 .await
317 .into_iter()
318 .any(|(s, _)| s == state)
319 }
320
321 pub async fn contains_inactive_state<S: State>(
329 &self,
330 instance: ModuleInstanceId,
331 state: S,
332 ) -> bool {
333 let state = DynState::from_typed(instance, state);
334 self.inner
335 .get_inactive_states()
336 .await
337 .into_iter()
338 .any(|(s, _)| s == state)
339 }
340
341 pub async fn await_inactive_state(&self, state: DynState) -> InactiveStateMeta {
342 self.inner
343 .db
344 .wait_key_exists(&InactiveStateKeyDb(InactiveStateKey::from_state(state)))
345 .await
346 }
347
348 pub async fn await_active_state(&self, state: DynState) -> ActiveStateMeta {
349 self.inner
350 .db
351 .wait_key_exists(&ActiveStateKeyDb(ActiveStateKey::from_state(state)))
352 .await
353 }
354
355 pub async fn get_operation_states(
357 &self,
358 operation_id: OperationId,
359 ) -> (
360 Vec<(DynState, ActiveStateMeta)>,
361 Vec<(DynState, InactiveStateMeta)>,
362 ) {
363 let mut dbtx = self.inner.db.begin_transaction_nc().await;
364 let active_states: Vec<_> = dbtx
365 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
366 .await
367 .map(|(active_key, active_meta)| (active_key.0.state, active_meta))
368 .collect()
369 .await;
370 let inactive_states: Vec<_> = dbtx
371 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
372 .await
373 .map(|(active_key, inactive_meta)| (active_key.0.state, inactive_meta))
374 .collect()
375 .await;
376
377 (active_states, inactive_states)
378 }
379
380 pub fn start_executor(&self, context_gen: ContextGen) {
387 let Some((shutdown_receiver, sm_update_rx)) = self
388 .inner
389 .state
390 .write()
391 .expect("locking can't fail")
392 .start(context_gen.clone())
393 else {
394 panic!("start_executor was called previously");
395 };
396
397 let task_runner_inner = self.inner.clone();
398 let _handle = self.inner.client_task_group.spawn("sm-executor", |task_handle| async move {
399 let executor_runner = task_runner_inner.run(context_gen, sm_update_rx);
400 let task_group_shutdown_rx = task_handle.make_shutdown_rx();
401 select! {
402 () = task_group_shutdown_rx => {
403 debug!(
404 target: LOG_CLIENT_REACTOR,
405 "Shutting down state machine executor runner due to task group shutdown signal"
406 );
407 },
408 shutdown_happened_sender = shutdown_receiver => {
409 match shutdown_happened_sender {
410 Ok(()) => {
411 debug!(
412 target: LOG_CLIENT_REACTOR,
413 "Shutting down state machine executor runner due to explicit shutdown signal"
414 );
415 },
416 Err(_) => {
417 warn!(
418 target: LOG_CLIENT_REACTOR,
419 "Shutting down state machine executor runner because the shutdown signal channel was closed (the executor object was dropped)"
420 );
421 }
422 }
423 },
424 () = executor_runner => {
425 error!(target: LOG_CLIENT_REACTOR, "State machine executor runner exited unexpectedly!");
426 },
427 };
428 });
429 }
430
431 pub fn stop_executor(&self) -> Option<()> {
445 self.inner.stop_executor()
446 }
447
448 pub fn notifier(&self) -> &Notifier {
451 &self.inner.notifier
452 }
453}
454
/// Asks the executor to stop when the shared state is dropped.
impl Drop for ExecutorInner {
    fn drop(&mut self) {
        // The result is irrelevant here: `None` merely means the executor was
        // not running.
        self.stop_executor();
    }
}
460
/// A transition of an active state whose trigger has completed and which is
/// therefore ready to be executed.
struct TransitionForActiveState {
    /// Value the trigger future resolved to.
    outcome: serde_json::Value,
    /// The active state the transition starts from.
    state: DynState,
    /// Metadata of that active state.
    meta: ActiveStateMeta,
    /// Function computing the new state from the outcome and the old state.
    transition_fn: StateTransitionFunction<DynState>,
}
467
impl ExecutorInner {
    /// Runs the executor loop until it returns; an error reported by the loop
    /// is logged as a warning instead of being propagated.
    async fn run(
        &self,
        global_context_gen: ContextGen,
        sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
    ) {
        debug!(target: LOG_CLIENT_REACTOR, "Starting state machine executor task");
        if let Err(err) = self
            .run_state_machines_executor_inner(global_context_gen, sm_update_rx)
            .await
        {
            warn!(
                target: LOG_CLIENT_REACTOR,
                err = %err.fmt_compact_anyhow(),
                "An unexpected error occurred during a state transition"
            );
        }
    }

    /// Builds one future per transition offered by `state`.
    ///
    /// Each future awaits the trigger of its transition and resolves to a
    /// [`TransitionForActiveState`] carrying the trigger outcome, the state,
    /// its `meta` and the transition function.
    ///
    /// If `state` offers no transitions at all, a warning is logged, the state
    /// is moved from the active to the inactive records in the database and an
    /// empty vector is returned.
    ///
    /// # Panics
    ///
    /// Panics if no context is registered for the module instance of `state`.
    async fn get_transition_for(
        &self,
        state: &DynState,
        meta: ActiveStateMeta,
        global_context_gen: &ContextGen,
    ) -> Vec<BoxFuture<'static, TransitionForActiveState>> {
        let module_instance = state.module_instance_id();
        let context = &self
            .module_contexts
            .get(&module_instance)
            .expect("Unknown module");
        let transitions = state
            .transitions(
                context,
                &global_context_gen(module_instance, state.operation_id()),
            )
            .into_iter()
            .map(|transition| {
                let state = state.clone();
                let f: BoxFuture<TransitionForActiveState> = Box::pin(async move {
                    let StateTransition {
                        trigger,
                        transition,
                    } = transition;
                    TransitionForActiveState {
                        outcome: trigger.await,
                        state,
                        transition_fn: transition,
                        meta,
                    }
                });
                f
            })
            .collect::<Vec<_>>();
        if transitions.is_empty() {
            warn!(
                target: LOG_CLIENT_REACTOR,
                module_id = module_instance, "A terminal state where only active states are expected. Please report this bug upstream."
            );
            // Retire the state: drop its active record and store an inactive
            // one. Note that the inactive metadata is derived from a fresh
            // default `ActiveStateMeta`, not from the `meta` passed in.
            self.db
                .autocommit::<_, _, anyhow::Error>(
                    |dbtx, _| {
                        Box::pin(async {
                            let k = InactiveStateKey::from_state(state.clone());
                            let v = ActiveStateMeta::default().into_inactive();
                            dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
                                state.clone(),
                            )))
                            .await;
                            dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
                            Ok(())
                        })
                    },
                    None,
                )
                .await
                .expect("Autocommit here can't fail");
        }

        transitions
    }

    /// The executor loop.
    ///
    /// On start all persisted active states are fed into the state-update
    /// channel. The loop then reacts to two kinds of events: states arriving
    /// on `sm_update_rx`, and completions of the futures it pushed itself
    /// (waiting for a trigger, executing a transition). It returns `Ok(())`
    /// once `sm_update_rx` reports that the channel is closed.
    async fn run_state_machines_executor_inner(
        &self,
        global_context_gen: ContextGen,
        mut sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
    ) -> anyhow::Result<()> {
        /// Events processed by the executor loop.
        enum ExecutorLoopEvent {
            /// A state arrived on the state-update channel.
            New { state: DynState },
            /// The trigger of one of a state's transitions completed.
            Triggered(TransitionForActiveState),
            /// A received state could not be looked up or offers no
            /// transitions.
            Invalid { state: DynState },
            /// A transition was executed and committed.
            Completed {
                /// The state the transition started from.
                state: DynState,
                /// The state the transition produced.
                outcome: ActiveOrInactiveState,
            },
            /// The state-update channel was closed.
            Disconnected,
        }

        let active_states = self.get_active_states().await;
        trace!(target: LOG_CLIENT_REACTOR, "Starting active states: {:?}", active_states);
        debug!(
            target: LOG_CLIENT_REACTOR,
            total = active_states.len(),
            "Starting active state machines",
        );
        for (state, meta) in active_states {
            // A `created_at` lying in the future yields an age of zero.
            let age = fedimint_core::time::now()
                .duration_since(meta.created_at)
                .unwrap_or_default();
            if age > Duration::from_secs(7 * 24 * 3600) {
                warn!(
                    target: LOG_CLIENT_REACTOR,
                    operation_id = %state.operation_id().fmt_short(),
                    module_instance = %state.module_instance_id(),
                    age_days = age.as_secs() / 86400,
                    "Active state machine has been running for over a week, possibly stuck",
                );
            }
            // Persisted states take the same route as newly added ones: they
            // are picked up below as `New` events.
            self.sm_update_tx
                .send(state)
                .expect("Must be able to send state machine to own opened channel");
        }

        // States that currently have a future in `futures`; used to ignore
        // duplicates arriving on the channel.
        let mut currently_running_sms = HashSet::<DynState>::new();
        // Pending work: per running state either "wait for the first trigger"
        // or "execute the triggered transition".
        let mut futures: FuturesUnordered<BoxFuture<'_, ExecutorLoopEvent>> =
            FuturesUnordered::new();

        loop {
            let event = tokio::select! {
                new = sm_update_rx.recv() => {
                    match new { Some(new) => {
                        ExecutorLoopEvent::New {
                            state: new,
                        }
                    } _ => {
                        ExecutorLoopEvent::Disconnected
                    }}
                },

                // Only polled while there is pending work, so `next()` cannot
                // yield `None` here.
                event = futures.next(), if !futures.is_empty() => event.expect("we only .next() if there are pending futures"),
            };

            match event {
                ExecutorLoopEvent::New { state } => {
                    if currently_running_sms.contains(&state) {
                        warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received a state machine that is already running. Ignoring");
                        continue;
                    }
                    currently_running_sms.insert(state.clone());
                    let futures_len = futures.len();
                    let global_context_gen = &global_context_gen;
                    trace!(target: LOG_CLIENT_REACTOR, state = ?state, "Started new active state machine, details.");
                    futures.push(Box::pin(async move {
                        // The state must have an active record (and a module
                        // context) to be driven.
                        let Some(meta) = self.get_active_state(&state).await else {
                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Couldn't look up received state machine. Ignoring.");
                            return ExecutorLoopEvent::Invalid { state: state.clone() };
                        };

                        let transitions = self
                            .get_transition_for(&state, meta, global_context_gen)
                            .await;
                        if transitions.is_empty() {
                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received an active state that doesn't produce any transitions. Ignoring.");
                            return ExecutorLoopEvent::Invalid { state: state.clone() };
                        }
                        let transitions_num = transitions.len();

                        debug!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), total = futures_len + 1, transitions_num, "New active state machine.");

                        // The first trigger to complete decides which
                        // transition is taken; the remaining futures are
                        // dropped.
                        let (first_completed_result, _index, _unused_transitions) =
                            select_all(transitions).await;
                        ExecutorLoopEvent::Triggered(first_completed_result)
                    }));
                }
                ExecutorLoopEvent::Triggered(TransitionForActiveState {
                    outcome,
                    state,
                    meta,
                    transition_fn,
                }) => {
                    debug!(
                        target: LOG_CLIENT_REACTOR,
                        operation_id = %state.operation_id().fmt_short(),
                        "Triggered state transition",
                    );
                    let span = tracing::debug_span!(
                        target: LOG_CLIENT_REACTOR,
                        "sm_transition",
                        operation_id = %state.operation_id().fmt_short()
                    );
                    futures.push({
                        let sm_update_tx = self.sm_update_tx.clone();
                        let db = self.db.clone();
                        let notifier = self.notifier.clone();
                        let module_contexts = self.module_contexts.clone();
                        let global_context_gen = global_context_gen.clone();
                        Box::pin(
                            async move {
                                debug!(
                                    target: LOG_CLIENT_REACTOR,
                                    "Executing state transition",
                                );
                                trace!(
                                    target: LOG_CLIENT_REACTOR,
                                    ?state,
                                    outcome = ?AbbreviateJson(&outcome),
                                    "Executing state transition (details)",
                                );

                                let module_contexts = &module_contexts;
                                let global_context_gen = &global_context_gen;

                                // Execute the transition and all bookkeeping in
                                // one transaction, with no limit on commit
                                // attempts. The closure clones its inputs up
                                // front — presumably because autocommit may
                                // call it again when a commit fails.
                                let outcome = db
                                    .autocommit::<'_, '_, _, _, Infallible>(
                                        |dbtx, _| {
                                            let state = state.clone();
                                            let state_module_instance_id = state.module_instance_id();
                                            let transition_fn = transition_fn.clone();
                                            let transition_outcome = outcome.clone();
                                            Box::pin(async move {
                                                let new_state = transition_fn(
                                                    &mut ClientSMDatabaseTransaction::new(
                                                        &mut dbtx.to_ref(),
                                                        state.module_instance_id(),
                                                    ),
                                                    transition_outcome.clone(),
                                                    state.clone(),
                                                )
                                                .await;
                                                // The old state moves from the
                                                // active to the inactive
                                                // records, keeping its metadata.
                                                dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
                                                    state.clone(),
                                                )))
                                                .await;
                                                dbtx.insert_entry(
                                                    &InactiveStateKeyDb(InactiveStateKey::from_state(state.clone())),
                                                    &meta.into_inactive(),
                                                )
                                                .await;

                                                let context = &module_contexts
                                                    .get(&state.module_instance_id())
                                                    .expect("Unknown module");

                                                let operation_id = state.operation_id();
                                                let global_context = global_context_gen(
                                                    state.module_instance_id(),
                                                    operation_id,
                                                );

                                                let is_terminal = new_state.is_terminal(context, &global_context);

                                                self.log_event_dbtx(dbtx,
                                                    StateMachineUpdated{
                                                        started: false,
                                                        operation_id,
                                                        module_id: state_module_instance_id,
                                                        terminal: is_terminal,
                                                    }
                                                ).await;

                                                // A terminal new state is stored
                                                // as inactive right away; any
                                                // other new state becomes active
                                                // with fresh default metadata.
                                                if is_terminal {
                                                    let k = InactiveStateKey::from_state(
                                                        new_state.clone(),
                                                    );
                                                    let v = ActiveStateMeta::default().into_inactive();
                                                    dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
                                                    Ok(ActiveOrInactiveState::Inactive {
                                                        dyn_state: new_state,
                                                    })
                                                } else {
                                                    let k = ActiveStateKey::from_state(
                                                        new_state.clone(),
                                                    );
                                                    let v = ActiveStateMeta::default();
                                                    dbtx.insert_entry(&ActiveStateKeyDb(k), &v).await;
                                                    Ok(ActiveOrInactiveState::Active {
                                                        dyn_state: new_state,
                                                        meta: v,
                                                    })
                                                }
                                            })
                                        },
                                        None,
                                    )
                                    .await
                                    .expect("autocommit should keep trying to commit (max_attempt: None) and body doesn't return errors");

                                debug!(
                                    target: LOG_CLIENT_REACTOR,
                                    terminal = !outcome.is_active(),
                                    ?outcome,
                                    "State transition complete",
                                );

                                // After the commit: the notifier is told about
                                // the new state in both cases; an active new
                                // state is also fed back into the loop.
                                match &outcome {
                                    ActiveOrInactiveState::Active { dyn_state, meta: _ } => {
                                        sm_update_tx
                                            .send(dyn_state.clone())
                                            .expect("can't fail: we are the receiving end");
                                        notifier.notify(dyn_state.clone());
                                    }
                                    ActiveOrInactiveState::Inactive { dyn_state } => {
                                        notifier.notify(dyn_state.clone());
                                    }
                                }
                                ExecutorLoopEvent::Completed { state, outcome }
                            }
                            .instrument(span),
                        )
                    });
                }
                ExecutorLoopEvent::Invalid { state } => {
                    trace!(
                        target: LOG_CLIENT_REACTOR,
                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
                        "State invalid"
                    );
                    // The state is no longer being driven.
                    assert!(
                        currently_running_sms.remove(&state),
                        "State must have been recorded"
                    );
                }

                ExecutorLoopEvent::Completed { state, outcome } => {
                    // The old state is done; a new active state (if any) comes
                    // back through the channel as a separate `New` event.
                    assert!(
                        currently_running_sms.remove(&state),
                        "State must have been recorded"
                    );
                    debug!(
                        target: LOG_CLIENT_REACTOR,
                        operation_id = %state.operation_id().fmt_short(),
                        outcome_active = outcome.is_active(),
                        total = futures.len(),
                        "State transition complete"
                    );
                    trace!(
                        target: LOG_CLIENT_REACTOR,
                        ?outcome,
                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
                        "State transition complete"
                    );
                }
                ExecutorLoopEvent::Disconnected => {
                    break;
                }
            }
        }

        info!(target: LOG_CLIENT_REACTOR, "Terminated.");
        Ok(())
    }

    /// Reads all active-state records and returns the states (with metadata)
    /// whose module instance has a registered context.
    async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&ActiveStateKeyPrefix)
            .await
            .filter(|(state, _)| {
                future::ready(
                    self.module_contexts
                        .contains_key(&state.0.state.module_instance_id()),
                )
            })
            .map(|(state, meta)| (state.0.state, meta))
            .collect::<Vec<_>>()
            .await
    }

    /// Looks up the active-state metadata of `state`.
    ///
    /// Returns `None` if the module instance of `state` has no registered
    /// context or if there is no active record for it.
    async fn get_active_state(&self, state: &DynState) -> Option<ActiveStateMeta> {
        if !self
            .module_contexts
            .contains_key(&state.module_instance_id())
        {
            return None;
        }
        self.db
            .begin_transaction_nc()
            .await
            .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
            .await
    }

    /// Reads all inactive-state records and returns the states (with
    /// metadata) whose module instance has a registered context.
    async fn get_inactive_states(&self) -> Vec<(DynState, InactiveStateMeta)> {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&InactiveStateKeyPrefix)
            .await
            .filter(|(state, _)| {
                future::ready(
                    self.module_contexts
                        .contains_key(&state.0.state.module_instance_id()),
                )
            })
            .map(|(state, meta)| (state.0.state, meta))
            .collect::<Vec<_>>()
            .await
    }

    /// Logs `event` to the event log as part of `dbtx`, passing along the
    /// executor's log-ordering wakeup sender and no module.
    pub async fn log_event_dbtx<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
    where
        E: Event + Send,
        Cap: Send,
    {
        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), None, event)
            .await;
    }
}
907
908impl ExecutorInner {
909 fn stop_executor(&self) -> Option<()> {
911 let mut state = self.state.write().expect("Locking can't fail");
912
913 state.stop()
914 }
915}
916
917impl Debug for ExecutorInner {
918 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
919 writeln!(f, "ExecutorInner {{}}")
920 }
921}
922
923impl ExecutorBuilder {
924 pub fn with_module<C>(&mut self, instance_id: ModuleInstanceId, context: C)
927 where
928 C: IntoDynInstance<DynType = DynContext>,
929 {
930 self.with_module_dyn(context.into_dyn(instance_id));
931 }
932
933 pub fn with_module_dyn(&mut self, context: DynContext) {
936 self.valid_module_ids.insert(context.module_instance_id());
937
938 if self
939 .module_contexts
940 .insert(context.module_instance_id(), context)
941 .is_some()
942 {
943 panic!("Tried to add two modules with the same instance id!");
944 }
945 }
946
947 pub fn with_valid_module_id(&mut self, module_id: ModuleInstanceId) {
951 self.valid_module_ids.insert(module_id);
952 }
953
954 pub fn build(
958 self,
959 db: Database,
960 notifier: Notifier,
961 client_task_group: TaskGroup,
962 log_ordering_wakeup_tx: watch::Sender<()>,
963 ) -> Executor {
964 let (sm_update_tx, sm_update_rx) = tokio::sync::mpsc::unbounded_channel();
965
966 let inner = Arc::new(ExecutorInner {
967 db,
968 log_ordering_wakeup_tx,
969 state: std::sync::RwLock::new(ExecutorState::Unstarted { sm_update_rx }),
970 module_contexts: self.module_contexts,
971 valid_module_ids: self.valid_module_ids,
972 notifier,
973 sm_update_tx,
974 client_task_group,
975 });
976
977 debug!(
978 target: LOG_CLIENT_REACTOR,
979 instances = ?inner.module_contexts.keys().copied().collect::<Vec<_>>(),
980 "Initialized state machine executor with module instances"
981 );
982 Executor { inner }
983 }
984}
/// Lookup prefix selecting the active-state records of one operation.
#[derive(Debug)]
pub struct ActiveOperationStateKeyPrefix {
    /// Operation whose active states are selected.
    pub operation_id: OperationId,
}
989
/// The prefix encodes as the operation id alone.
impl Encodable for ActiveOperationStateKeyPrefix {
    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
        self.operation_id.consensus_encode(writer)
    }
}
995
/// Lookups with this prefix yield [`ActiveStateKeyDb`] records.
impl ::fedimint_core::db::DatabaseLookup for ActiveOperationStateKeyPrefix {
    type Record = ActiveStateKeyDb;
}
999
/// Lookup prefix selecting the active-state records of one operation that
/// belong to one module instance.
#[derive(Debug)]
pub(crate) struct ActiveModuleOperationStateKeyPrefix {
    /// Operation whose active states are selected.
    pub operation_id: OperationId,
    /// Module instance the selected states belong to.
    pub module_instance: ModuleInstanceId,
}
1005
1006impl Encodable for ActiveModuleOperationStateKeyPrefix {
1007 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1008 self.operation_id.consensus_encode(writer)?;
1009 self.module_instance.consensus_encode(writer)?;
1010 Ok(())
1011 }
1012}
1013
/// Lookups with this prefix yield [`ActiveStateKeyDb`] records.
impl ::fedimint_core::db::DatabaseLookup for ActiveModuleOperationStateKeyPrefix {
    type Record = ActiveStateKeyDb;
}
1017
/// Lookup prefix selecting all active-state records.
#[derive(Debug)]
pub struct ActiveStateKeyPrefix;
1020
/// The prefix encodes to no bytes at all.
impl Encodable for ActiveStateKeyPrefix {
    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
        Ok(())
    }
}
1026
/// Database key of an active state: a newtype around the module-facing
/// `ActiveStateKey`, on which the database traits are implemented.
#[derive(Encodable, Decodable, Debug)]
pub struct ActiveStateKeyDb(pub fedimint_client_module::sm::executor::ActiveStateKey);
1029
/// Active states are stored under the `ActiveStates` prefix, map to
/// [`ActiveStateMeta`] and have `NOTIFY_ON_MODIFY` enabled.
impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyDb {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = true;
    type Key = Self;
    type Value = ActiveStateMeta;
}
1036
// Marker impl accompanying `NOTIFY_ON_MODIFY = true` on the record.
// NOTE(review): presumably what permits `wait_key_exists` on this key — confirm.
impl DatabaseKeyWithNotify for ActiveStateKeyDb {}
1038
/// Lookups with this prefix yield [`ActiveStateKeyDb`] records.
impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefix {
    type Record = ActiveStateKeyDb;
}
1042
/// Lookup prefix selecting all active-state records in their raw-bytes form
/// ([`ActiveStateKeyBytes`]).
#[derive(Debug, Encodable, Decodable)]
pub struct ActiveStateKeyPrefixBytes;
1045
/// Raw-bytes view of the active-state records: same `ActiveStates` prefix and
/// value type as [`ActiveStateKeyDb`], but with `NOTIFY_ON_MODIFY` disabled.
impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyBytes {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = false;
    type Key = Self;
    type Value = ActiveStateMeta;
}
1052
/// Lookups with this prefix yield [`ActiveStateKeyBytes`] records.
impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefixBytes {
    type Record = ActiveStateKeyBytes;
}
1056
/// Database key of an inactive state: a newtype around the module-facing
/// `InactiveStateKey`, on which the database traits are implemented.
#[derive(Encodable, Decodable, Debug)]
pub struct InactiveStateKeyDb(pub fedimint_client_module::sm::executor::InactiveStateKey);
1059
/// Inactive-state key whose state is kept as raw bytes instead of being
/// decoded into a `DynState`.
#[derive(Debug)]
pub struct InactiveStateKeyBytes {
    /// Operation the state belongs to.
    pub operation_id: OperationId,
    /// Module instance the state belongs to.
    pub module_instance_id: ModuleInstanceId,
    /// Raw state bytes; when produced by decoding they start with the encoded
    /// module instance id, followed by the rest of the key.
    pub state: Vec<u8>,
}
1066
1067impl Encodable for InactiveStateKeyBytes {
1068 fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
1069 self.operation_id.consensus_encode(writer)?;
1070 writer.write_all(self.state.as_slice())?;
1071 Ok(())
1072 }
1073}
1074
1075impl Decodable for InactiveStateKeyBytes {
1076 fn consensus_decode_partial<R: std::io::Read>(
1077 reader: &mut R,
1078 modules: &ModuleDecoderRegistry,
1079 ) -> Result<Self, DecodeError> {
1080 let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1081 let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1082 let mut bytes = Vec::new();
1083 reader
1084 .read_to_end(&mut bytes)
1085 .map_err(DecodeError::from_err)?;
1086
1087 let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1088 instance_bytes.append(&mut bytes);
1089
1090 Ok(InactiveStateKeyBytes {
1091 operation_id,
1092 module_instance_id,
1093 state: instance_bytes,
1094 })
1095 }
1096}
1097
/// Lookup prefix selecting the inactive-state records of one operation.
#[derive(Debug)]
pub struct InactiveOperationStateKeyPrefix {
    /// Operation whose inactive states are selected.
    pub operation_id: OperationId,
}
1102
/// The prefix encodes as the operation id alone.
impl Encodable for InactiveOperationStateKeyPrefix {
    fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
        self.operation_id.consensus_encode(writer)
    }
}
1108
/// Lookups with this prefix yield [`InactiveStateKeyDb`] records.
impl ::fedimint_core::db::DatabaseLookup for InactiveOperationStateKeyPrefix {
    type Record = InactiveStateKeyDb;
}
1112
/// Lookup prefix selecting the inactive-state records of one operation that
/// belong to one module instance.
#[derive(Debug)]
pub(crate) struct InactiveModuleOperationStateKeyPrefix {
    /// Operation whose inactive states are selected.
    pub operation_id: OperationId,
    /// Module instance the selected states belong to.
    pub module_instance: ModuleInstanceId,
}
1118
1119impl Encodable for InactiveModuleOperationStateKeyPrefix {
1120 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1121 self.operation_id.consensus_encode(writer)?;
1122 self.module_instance.consensus_encode(writer)?;
1123 Ok(())
1124 }
1125}
1126
/// Lookups with this prefix yield [`InactiveStateKeyDb`] records.
impl ::fedimint_core::db::DatabaseLookup for InactiveModuleOperationStateKeyPrefix {
    type Record = InactiveStateKeyDb;
}
1130
/// Lookup prefix selecting all inactive-state records.
#[derive(Debug, Clone)]
pub struct InactiveStateKeyPrefix;
1133
/// The prefix encodes to no bytes at all.
impl Encodable for InactiveStateKeyPrefix {
    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
        Ok(())
    }
}
1139
/// Lookup prefix selecting all inactive-state records in their raw-bytes form
/// ([`InactiveStateKeyBytes`]).
#[derive(Debug, Encodable, Decodable)]
pub struct InactiveStateKeyPrefixBytes;
1142
/// Raw-bytes view of the inactive-state records: same `InactiveStates` prefix
/// and value type as [`InactiveStateKeyDb`], but with `NOTIFY_ON_MODIFY`
/// disabled.
impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyBytes {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = false;
    type Key = Self;
    type Value = InactiveStateMeta;
}
1149
/// Lookups with this prefix yield [`InactiveStateKeyBytes`] records.
impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefixBytes {
    type Record = InactiveStateKeyBytes;
}
1153
/// Active-state key whose state is kept as raw bytes instead of being decoded
/// into a `DynState`.
#[derive(Debug)]
pub struct ActiveStateKeyBytes {
    /// Operation the state belongs to.
    pub operation_id: OperationId,
    /// Module instance the state belongs to.
    pub module_instance_id: ModuleInstanceId,
    /// Raw state bytes; when produced by decoding they start with the encoded
    /// module instance id, followed by the rest of the key.
    pub state: Vec<u8>,
}
1160
1161impl Encodable for ActiveStateKeyBytes {
1162 fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
1163 self.operation_id.consensus_encode(writer)?;
1164 writer.write_all(self.state.as_slice())?;
1165 Ok(())
1166 }
1167}
1168
1169impl Decodable for ActiveStateKeyBytes {
1170 fn consensus_decode_partial<R: std::io::Read>(
1171 reader: &mut R,
1172 modules: &ModuleDecoderRegistry,
1173 ) -> Result<Self, DecodeError> {
1174 let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1175 let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1176 let mut bytes = Vec::new();
1177 reader
1178 .read_to_end(&mut bytes)
1179 .map_err(DecodeError::from_err)?;
1180
1181 let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1182 instance_bytes.append(&mut bytes);
1183
1184 Ok(ActiveStateKeyBytes {
1185 operation_id,
1186 module_instance_id,
1187 state: instance_bytes,
1188 })
1189 }
1190}
/// Inactive states are stored under the `InactiveStates` prefix, map to
/// [`InactiveStateMeta`] and have `NOTIFY_ON_MODIFY` enabled.
impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyDb {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = true;
    type Key = Self;
    type Value = InactiveStateMeta;
}
1197
// Marker impl accompanying `NOTIFY_ON_MODIFY = true` on the record.
// NOTE(review): presumably what permits `wait_key_exists` on this key — confirm.
impl DatabaseKeyWithNotify for InactiveStateKeyDb {}
1199
/// Lookups with this prefix yield [`InactiveStateKeyDb`] records.
impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefix {
    type Record = InactiveStateKeyDb;
}
1203
/// Result of executing a state transition: the new state, tagged by whether
/// it was stored as active or as inactive.
#[derive(Debug)]
enum ActiveOrInactiveState {
    /// The new state is not terminal and was stored as active.
    Active {
        /// The new state.
        dyn_state: DynState,
        /// Metadata stored with the new active state. Within this file it is
        /// only bound in an ignoring pattern and reaches output through the
        /// derived `Debug`, hence the `dead_code` allowance.
        #[allow(dead_code)]
        meta: ActiveStateMeta,
    },
    /// The new state is terminal and was stored as inactive.
    Inactive {
        /// The new state.
        dyn_state: DynState,
    },
}
1215
1216impl ActiveOrInactiveState {
1217 fn is_active(&self) -> bool {
1218 match self {
1219 ActiveOrInactiveState::Active { .. } => true,
1220 ActiveOrInactiveState::Inactive { .. } => false,
1221 }
1222 }
1223}
1224
/// Exposes the executor through the `IExecutor` trait by forwarding to the
/// inherent methods of the same names.
#[apply(async_trait_maybe_send!)]
impl IExecutor for Executor {
    /// Forwards to the inherent [`Executor::get_active_states`].
    async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
        // `Self::` picks the inherent method, not this trait method.
        Self::get_active_states(self).await
    }

    /// Forwards to the inherent [`Executor::add_state_machines_dbtx`].
    async fn add_state_machines_dbtx(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        states: Vec<DynState>,
    ) -> AddStateMachinesResult {
        Self::add_state_machines_dbtx(self, dbtx, states).await
    }
}
1239
1240#[cfg(test)]
1241mod tests;