1use std::collections::{BTreeMap, BTreeSet, HashSet};
2use std::convert::Infallible;
3use std::fmt::{Debug, Formatter};
4use std::io::{Error, Write};
5use std::mem;
6use std::sync::Arc;
7
8use anyhow::anyhow;
9use fedimint_client_module::sm::executor::{
10 ActiveStateKey, ContextGen, IExecutor, InactiveStateKey,
11};
12use fedimint_client_module::sm::{
13 ActiveStateMeta, ClientSMDatabaseTransaction, DynContext, DynState, InactiveStateMeta, State,
14 StateTransition, StateTransitionFunction,
15};
16use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, OperationId};
17use fedimint_core::db::{
18 AutocommitError, Database, DatabaseKeyWithNotify, DatabaseTransaction,
19 IDatabaseTransactionOpsCoreTyped,
20};
21use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
22use fedimint_core::fmt_utils::AbbreviateJson;
23use fedimint_core::module::registry::ModuleDecoderRegistry;
24use fedimint_core::task::TaskGroup;
25use fedimint_core::util::{BoxFuture, FmtCompactAnyhow as _};
26use fedimint_core::{apply, async_trait_maybe_send};
27use fedimint_logging::LOG_CLIENT_REACTOR;
28use futures::future::{self, select_all};
29use futures::stream::{FuturesUnordered, StreamExt};
30use tokio::select;
31use tokio::sync::{mpsc, oneshot};
32use tracing::{Instrument, debug, error, info, trace, warn};
33
34use super::notifier::Notifier;
35use crate::{AddStateMachinesError, AddStateMachinesResult, DynGlobalClientContext};
36
/// Retry limit handed to `Database::autocommit` by
/// [`Executor::add_state_machines`].
const MAX_DB_ATTEMPTS: Option<usize> = Some(100);
39
/// Database key prefixes used by the executor.
///
/// The discriminants are cast to `u8` and used as `DB_PREFIX` by the record
/// types defined in this file.
pub(crate) enum ExecutorDbPrefixes {
    /// Prefix of [`ActiveStateKeyDb`] / [`ActiveStateKeyBytes`] entries.
    ActiveStates = 0xa1,
    /// Prefix of [`InactiveStateKeyDb`] / [`InactiveStateKeyBytes`] entries.
    InactiveStates = 0xa2,
}
47
/// Cheaply clonable handle to the state machine executor.
///
/// All state lives in the shared [`ExecutorInner`]; cloning the handle only
/// clones the `Arc`.
#[derive(Clone, Debug)]
pub struct Executor {
    inner: Arc<ExecutorInner>,
}
59
/// Shared state behind every [`Executor`] handle.
struct ExecutorInner {
    /// Database holding the active and inactive state entries.
    db: Database,
    /// Lifecycle state (unstarted / running / stopped) of the executor.
    state: std::sync::RwLock<ExecutorState>,
    /// Per-module-instance contexts used to compute transitions and to
    /// decide whether a state is terminal.
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Module instance ids accepted by `add_state_machines_dbtx`. The builder
    /// adds every id in `module_contexts` here and may add further ids via
    /// `with_valid_module_id`.
    valid_module_ids: BTreeSet<ModuleInstanceId>,
    /// Notified about newly added states and about the results of
    /// transitions.
    notifier: Notifier,
    /// Sending half of the channel that feeds states into the executor loop.
    sm_update_tx: mpsc::UnboundedSender<DynState>,
    /// Task group on which the executor task is spawned.
    client_task_group: TaskGroup,
}
71
/// Lifecycle of the executor.
enum ExecutorState {
    /// Built but not yet started; parks the update receiver until `start`
    /// hands it to the executor task.
    Unstarted {
        sm_update_rx: mpsc::UnboundedReceiver<DynState>,
    },
    /// The executor task has been started.
    Running {
        /// Produces the global client context for a given module instance and
        /// operation.
        context_gen: ContextGen,
        /// Fired by `stop` to ask the executor task to shut down.
        shutdown_sender: oneshot::Sender<()>,
    },
    /// `stop` was called on a running executor.
    Stopped,
}
82
83impl ExecutorState {
84 fn start(
88 &mut self,
89 context: ContextGen,
90 ) -> Option<(oneshot::Receiver<()>, mpsc::UnboundedReceiver<DynState>)> {
91 let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();
92
93 let previous_state = mem::replace(
94 self,
95 ExecutorState::Running {
96 context_gen: context,
97 shutdown_sender,
98 },
99 );
100
101 match previous_state {
102 ExecutorState::Unstarted { sm_update_rx } => Some((shutdown_receiver, sm_update_rx)),
103 _ => {
104 *self = previous_state;
106
107 debug!(target: LOG_CLIENT_REACTOR, "Executor already started, ignoring start request");
108 None
109 }
110 }
111 }
112
113 fn stop(&mut self) -> Option<()> {
116 let previous_state = mem::replace(self, ExecutorState::Stopped);
117
118 match previous_state {
119 ExecutorState::Running {
120 shutdown_sender, ..
121 } => {
122 if shutdown_sender.send(()).is_err() {
123 warn!(target: LOG_CLIENT_REACTOR, "Failed to send shutdown signal to executor, already dead?");
124 }
125 Some(())
126 }
127 _ => {
128 *self = previous_state;
130
131 debug!(target: LOG_CLIENT_REACTOR, "Executor not running, ignoring stop request");
132 None
133 }
134 }
135 }
136
137 fn gen_context(&self, state: &DynState) -> Option<DynGlobalClientContext> {
138 let ExecutorState::Running { context_gen, .. } = self else {
139 return None;
140 };
141 Some(context_gen(
142 state.module_instance_id(),
143 state.operation_id(),
144 ))
145 }
146}
147
/// Collects module contexts and valid module ids before an [`Executor`] is
/// built.
#[derive(Debug, Default)]
pub struct ExecutorBuilder {
    /// Contexts registered via `with_module` / `with_module_dyn`.
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Ids registered together with a context, or on their own via
    /// `with_valid_module_id`.
    valid_module_ids: BTreeSet<ModuleInstanceId>,
}
155
156impl Executor {
157 pub fn builder() -> ExecutorBuilder {
159 ExecutorBuilder::default()
160 }
161
162 pub async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
163 self.inner.get_active_states().await
164 }
165
166 pub async fn add_state_machines(&self, states: Vec<DynState>) -> anyhow::Result<()> {
172 self.inner
173 .db
174 .autocommit(
175 |dbtx, _| Box::pin(self.add_state_machines_dbtx(dbtx, states.clone())),
176 MAX_DB_ATTEMPTS,
177 )
178 .await
179 .map_err(|e| match e {
180 AutocommitError::CommitFailed {
181 last_error,
182 attempts,
183 } => last_error.context(format!("Failed to commit after {attempts} attempts")),
184 AutocommitError::ClosureError { error, .. } => anyhow!("{error:?}"),
185 })?;
186
187 Ok(())
190 }
191
192 pub async fn add_state_machines_dbtx(
201 &self,
202 dbtx: &mut DatabaseTransaction<'_>,
203 states: Vec<DynState>,
204 ) -> AddStateMachinesResult {
205 for state in states {
206 if !self
207 .inner
208 .valid_module_ids
209 .contains(&state.module_instance_id())
210 {
211 return Err(AddStateMachinesError::Other(anyhow!("Unknown module")));
212 }
213
214 let is_active_state = dbtx
215 .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
216 .await
217 .is_some();
218 let is_inactive_state = dbtx
219 .get_value(&InactiveStateKeyDb(InactiveStateKey::from_state(
220 state.clone(),
221 )))
222 .await
223 .is_some();
224
225 if is_active_state || is_inactive_state {
226 return Err(AddStateMachinesError::StateAlreadyExists);
227 }
228
229 if let Some(module_context) =
234 self.inner.module_contexts.get(&state.module_instance_id())
235 {
236 match self
237 .inner
238 .state
239 .read()
240 .expect("locking failed")
241 .gen_context(&state)
242 {
243 Some(context) => {
244 if state.is_terminal(module_context, &context) {
245 return Err(AddStateMachinesError::Other(anyhow!(
246 "State is already terminal, adding it to the executor doesn't make sense."
247 )));
248 }
249 }
250 _ => {
251 warn!(target: LOG_CLIENT_REACTOR, "Executor should be running at this point");
252 }
253 }
254 }
255
256 dbtx.insert_new_entry(
257 &ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())),
258 &ActiveStateMeta::default(),
259 )
260 .await;
261
262 let notify_sender = self.inner.notifier.sender();
263 let sm_updates_tx = self.inner.sm_update_tx.clone();
264 dbtx.on_commit(move || {
265 notify_sender.notify(state.clone());
266 let _ = sm_updates_tx.send(state);
267 });
268 }
269
270 Ok(())
271 }
272
273 pub async fn contains_active_state<S: State>(
278 &self,
279 instance: ModuleInstanceId,
280 state: S,
281 ) -> bool {
282 let state = DynState::from_typed(instance, state);
283 self.inner
284 .get_active_states()
285 .await
286 .into_iter()
287 .any(|(s, _)| s == state)
288 }
289
290 pub async fn contains_inactive_state<S: State>(
298 &self,
299 instance: ModuleInstanceId,
300 state: S,
301 ) -> bool {
302 let state = DynState::from_typed(instance, state);
303 self.inner
304 .get_inactive_states()
305 .await
306 .into_iter()
307 .any(|(s, _)| s == state)
308 }
309
310 pub async fn await_inactive_state(&self, state: DynState) -> InactiveStateMeta {
311 self.inner
312 .db
313 .wait_key_exists(&InactiveStateKeyDb(InactiveStateKey::from_state(state)))
314 .await
315 }
316
317 pub async fn await_active_state(&self, state: DynState) -> ActiveStateMeta {
318 self.inner
319 .db
320 .wait_key_exists(&ActiveStateKeyDb(ActiveStateKey::from_state(state)))
321 .await
322 }
323
324 pub async fn get_operation_states(
326 &self,
327 operation_id: OperationId,
328 ) -> (
329 Vec<(DynState, ActiveStateMeta)>,
330 Vec<(DynState, InactiveStateMeta)>,
331 ) {
332 let mut dbtx = self.inner.db.begin_transaction_nc().await;
333 let active_states: Vec<_> = dbtx
334 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
335 .await
336 .map(|(active_key, active_meta)| (active_key.0.state, active_meta))
337 .collect()
338 .await;
339 let inactive_states: Vec<_> = dbtx
340 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
341 .await
342 .map(|(active_key, inactive_meta)| (active_key.0.state, inactive_meta))
343 .collect()
344 .await;
345
346 (active_states, inactive_states)
347 }
348
349 pub fn start_executor(&self, context_gen: ContextGen) {
356 let Some((shutdown_receiver, sm_update_rx)) = self
357 .inner
358 .state
359 .write()
360 .expect("locking can't fail")
361 .start(context_gen.clone())
362 else {
363 panic!("start_executor was called previously");
364 };
365
366 let task_runner_inner = self.inner.clone();
367 let _handle = self.inner.client_task_group.spawn("sm-executor", |task_handle| async move {
368 let executor_runner = task_runner_inner.run(context_gen, sm_update_rx);
369 let task_group_shutdown_rx = task_handle.make_shutdown_rx();
370 select! {
371 () = task_group_shutdown_rx => {
372 debug!(
373 target: LOG_CLIENT_REACTOR,
374 "Shutting down state machine executor runner due to task group shutdown signal"
375 );
376 },
377 shutdown_happened_sender = shutdown_receiver => {
378 match shutdown_happened_sender {
379 Ok(()) => {
380 debug!(
381 target: LOG_CLIENT_REACTOR,
382 "Shutting down state machine executor runner due to explicit shutdown signal"
383 );
384 },
385 Err(_) => {
386 warn!(
387 target: LOG_CLIENT_REACTOR,
388 "Shutting down state machine executor runner because the shutdown signal channel was closed (the executor object was dropped)"
389 );
390 }
391 }
392 },
393 () = executor_runner => {
394 error!(target: LOG_CLIENT_REACTOR, "State machine executor runner exited unexpectedly!");
395 },
396 };
397 });
398 }
399
400 pub fn stop_executor(&self) -> Option<()> {
414 self.inner.stop_executor()
415 }
416
417 pub fn notifier(&self) -> &Notifier {
420 &self.inner.notifier
421 }
422}
423
424impl Drop for ExecutorInner {
425 fn drop(&mut self) {
426 self.stop_executor();
427 }
428}
429
/// A transition of an active state whose trigger has fired, together with
/// everything needed to apply it.
struct TransitionForActiveState {
    /// Value produced by the transition's trigger future.
    outcome: serde_json::Value,
    /// The active state the transition belongs to.
    state: DynState,
    /// Metadata of that active state as read from the database.
    meta: ActiveStateMeta,
    /// Function that computes the successor state.
    transition_fn: StateTransitionFunction<DynState>,
}
436
437impl ExecutorInner {
438 async fn run(
439 &self,
440 global_context_gen: ContextGen,
441 sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
442 ) {
443 debug!(target: LOG_CLIENT_REACTOR, "Starting state machine executor task");
444 if let Err(err) = self
445 .run_state_machines_executor_inner(global_context_gen, sm_update_rx)
446 .await
447 {
448 warn!(
449 target: LOG_CLIENT_REACTOR,
450 err = %err.fmt_compact_anyhow(),
451 "An unexpected error occurred during a state transition"
452 );
453 }
454 }
455
456 async fn get_transition_for(
457 &self,
458 state: &DynState,
459 meta: ActiveStateMeta,
460 global_context_gen: &ContextGen,
461 ) -> Vec<BoxFuture<'static, TransitionForActiveState>> {
462 let module_instance = state.module_instance_id();
463 let context = &self
464 .module_contexts
465 .get(&module_instance)
466 .expect("Unknown module");
467 let transitions = state
468 .transitions(
469 context,
470 &global_context_gen(module_instance, state.operation_id()),
471 )
472 .into_iter()
473 .map(|transition| {
474 let state = state.clone();
475 let f: BoxFuture<TransitionForActiveState> = Box::pin(async move {
476 let StateTransition {
477 trigger,
478 transition,
479 } = transition;
480 TransitionForActiveState {
481 outcome: trigger.await,
482 state,
483 transition_fn: transition,
484 meta,
485 }
486 });
487 f
488 })
489 .collect::<Vec<_>>();
490 if transitions.is_empty() {
491 warn!(
495 target: LOG_CLIENT_REACTOR,
496 module_id = module_instance, "A terminal state where only active states are expected. Please report this bug upstream."
497 );
498 self.db
499 .autocommit::<_, _, anyhow::Error>(
500 |dbtx, _| {
501 Box::pin(async {
502 let k = InactiveStateKey::from_state(state.clone());
503 let v = ActiveStateMeta::default().into_inactive();
504 dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
505 state.clone(),
506 )))
507 .await;
508 dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
509 Ok(())
510 })
511 },
512 None,
513 )
514 .await
515 .expect("Autocommit here can't fail");
516 }
517
518 transitions
519 }
520
    /// The executor's main event loop.
    ///
    /// Re-queues every active state found in the database, then repeatedly
    /// waits for either a state arriving on `sm_update_rx` or one of the
    /// in-flight futures completing, and handles the resulting
    /// `ExecutorLoopEvent`. Returns `Ok(())` once `sm_update_rx` reports that
    /// the channel is closed.
    async fn run_state_machines_executor_inner(
        &self,
        global_context_gen: ContextGen,
        mut sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
    ) -> anyhow::Result<()> {
        /// Events driving the loop below.
        enum ExecutorLoopEvent {
            /// A state arrived on the update channel.
            New { state: DynState },
            /// The trigger of one of a state's transitions fired.
            Triggered(TransitionForActiveState),
            /// A received state had no active entry or offered no
            /// transitions.
            Invalid { state: DynState },
            /// A transition was applied and committed.
            Completed {
                state: DynState,
                outcome: ActiveOrInactiveState,
            },
            /// The update channel was closed.
            Disconnected,
        }

        // Feed the persisted active states through the same channel that
        // newly added states use, so both take the same path below.
        let active_states = self.get_active_states().await;
        trace!(target: LOG_CLIENT_REACTOR, "Starting active states: {:?}", active_states);
        for (state, _meta) in active_states {
            self.sm_update_tx
                .send(state)
                .expect("Must be able to send state machine to own opened channel");
        }

        // States that currently have a future in `futures`; used to ignore
        // duplicates arriving on the channel.
        let mut currently_running_sms = HashSet::<DynState>::new();
        // Every in-flight unit of work: waiting for a trigger, or applying a
        // transition.
        let mut futures: FuturesUnordered<BoxFuture<'_, ExecutorLoopEvent>> =
            FuturesUnordered::new();

        loop {
            let event = tokio::select! {
                new = sm_update_rx.recv() => {
                    match new { Some(new) => {
                        ExecutorLoopEvent::New {
                            state: new,
                        }
                    } _ => {
                        ExecutorLoopEvent::Disconnected
                    }}
                },

                // Guarded so an empty `FuturesUnordered` is never polled.
                event = futures.next(), if !futures.is_empty() => event.expect("we only .next() if there are pending futures"),
            };

            match event {
                ExecutorLoopEvent::New { state } => {
                    if currently_running_sms.contains(&state) {
                        warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received a state machine that is already running. Ignoring");
                        continue;
                    }
                    currently_running_sms.insert(state.clone());
                    let futures_len = futures.len();
                    let global_context_gen = &global_context_gen;
                    trace!(target: LOG_CLIENT_REACTOR, state = ?state, "Started new active state machine, details.");
                    futures.push(Box::pin(async move {
                        // The state must still have an active entry (and a
                        // known module) to be driven.
                        let Some(meta) = self.get_active_state(&state).await else {
                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Couldn't look up received state machine. Ignoring.");
                            return ExecutorLoopEvent::Invalid { state: state.clone() };
                        };

                        let transitions = self
                            .get_transition_for(&state, meta, global_context_gen)
                            .await;
                        if transitions.is_empty() {
                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received an active state that doesn't produce any transitions. Ignoring.");
                            return ExecutorLoopEvent::Invalid { state: state.clone() };
                        }
                        let transitions_num = transitions.len();

                        debug!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), total = futures_len + 1, transitions_num, "New active state machine.");

                        // Only the first trigger to fire is acted upon; the
                        // remaining transition futures are dropped when this
                        // future finishes.
                        let (first_completed_result, _index, _unused_transitions) =
                            select_all(transitions).await;
                        ExecutorLoopEvent::Triggered(first_completed_result)
                    }));
                }
                ExecutorLoopEvent::Triggered(TransitionForActiveState {
                    outcome,
                    state,
                    meta,
                    transition_fn,
                }) => {
                    debug!(
                        target: LOG_CLIENT_REACTOR,
                        operation_id = %state.operation_id().fmt_short(),
                        "Triggered state transition",
                    );
                    let span = tracing::debug_span!(
                        target: LOG_CLIENT_REACTOR,
                        "sm_transition",
                        operation_id = %state.operation_id().fmt_short()
                    );
                    futures.push({
                        // Owned copies so the pushed future does not borrow
                        // from `self`.
                        let sm_update_tx = self.sm_update_tx.clone();
                        let db = self.db.clone();
                        let notifier = self.notifier.clone();
                        let module_contexts = self.module_contexts.clone();
                        let global_context_gen = global_context_gen.clone();
                        Box::pin(
                            async move {
                                debug!(
                                    target: LOG_CLIENT_REACTOR,
                                    "Executing state transition",
                                );
                                trace!(
                                    target: LOG_CLIENT_REACTOR,
                                    ?state,
                                    outcome = ?AbbreviateJson(&outcome),
                                    "Executing state transition (details)",
                                );

                                let module_contexts = &module_contexts;
                                let global_context_gen = &global_context_gen;

                                // Apply the transition and update both tables
                                // in one transaction. The closure may be run
                                // more than once (no attempt limit), hence
                                // the clones at its top.
                                let outcome = db
                                    .autocommit::<'_, '_, _, _, Infallible>(
                                        |dbtx, _| {
                                            let state = state.clone();
                                            let transition_fn = transition_fn.clone();
                                            let transition_outcome = outcome.clone();
                                            Box::pin(async move {
                                                let new_state = transition_fn(
                                                    &mut ClientSMDatabaseTransaction::new(
                                                        &mut dbtx.to_ref(),
                                                        state.module_instance_id(),
                                                    ),
                                                    transition_outcome.clone(),
                                                    state.clone(),
                                                )
                                                .await;
                                                // The old state leaves the
                                                // active table and is
                                                // recorded as inactive.
                                                dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
                                                    state.clone(),
                                                )))
                                                .await;
                                                dbtx.insert_entry(
                                                    &InactiveStateKeyDb(InactiveStateKey::from_state(state.clone())),
                                                    &meta.into_inactive(),
                                                )
                                                .await;

                                                let context = &module_contexts
                                                    .get(&state.module_instance_id())
                                                    .expect("Unknown module");

                                                let operation_id = state.operation_id();
                                                let global_context = global_context_gen(
                                                    state.module_instance_id(),
                                                    operation_id,
                                                );

                                                let is_terminal = new_state.is_terminal(context, &global_context);

                                                // A terminal successor goes
                                                // straight to the inactive
                                                // table; any other successor
                                                // becomes a new active state.
                                                if is_terminal {
                                                    let k = InactiveStateKey::from_state(
                                                        new_state.clone(),
                                                    );
                                                    let v = ActiveStateMeta::default().into_inactive();
                                                    dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
                                                    Ok(ActiveOrInactiveState::Inactive {
                                                        dyn_state: new_state,
                                                    })
                                                } else {
                                                    let k = ActiveStateKey::from_state(
                                                        new_state.clone(),
                                                    );
                                                    let v = ActiveStateMeta::default();
                                                    dbtx.insert_entry(&ActiveStateKeyDb(k), &v).await;
                                                    Ok(ActiveOrInactiveState::Active {
                                                        dyn_state: new_state,
                                                        meta: v,
                                                    })
                                                }
                                            })
                                        },
                                        None,
                                    )
                                    .await
                                    .expect("autocommit should keep trying to commit (max_attempt: None) and body doesn't return errors");

                                debug!(
                                    target: LOG_CLIENT_REACTOR,
                                    terminal = !outcome.is_active(),
                                    ?outcome,
                                    "State transition complete",
                                );

                                // After the commit: an active successor is
                                // queued for execution; observers are
                                // notified in both cases.
                                match &outcome {
                                    ActiveOrInactiveState::Active { dyn_state, meta: _ } => {
                                        sm_update_tx
                                            .send(dyn_state.clone())
                                            .expect("can't fail: we are the receiving end");
                                        notifier.notify(dyn_state.clone());
                                    }
                                    ActiveOrInactiveState::Inactive { dyn_state } => {
                                        notifier.notify(dyn_state.clone());
                                    }
                                }
                                ExecutorLoopEvent::Completed { state, outcome }
                            }
                            .instrument(span),
                        )
                    });
                }
                ExecutorLoopEvent::Invalid { state } => {
                    trace!(
                        target: LOG_CLIENT_REACTOR,
                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
                        "State invalid"
                    );
                    // The state was inserted when its `New` event was
                    // handled.
                    assert!(
                        currently_running_sms.remove(&state),
                        "State must have been recorded"
                    );
                }

                ExecutorLoopEvent::Completed { state, outcome } => {
                    // `state` is the pre-transition state recorded on `New`.
                    assert!(
                        currently_running_sms.remove(&state),
                        "State must have been recorded"
                    );
                    debug!(
                        target: LOG_CLIENT_REACTOR,
                        operation_id = %state.operation_id().fmt_short(),
                        outcome_active = outcome.is_active(),
                        total = futures.len(),
                        "State transition complete"
                    );
                    trace!(
                        target: LOG_CLIENT_REACTOR,
                        ?outcome,
                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
                        "State transition complete"
                    );
                }
                ExecutorLoopEvent::Disconnected => {
                    break;
                }
            }
        }

        info!(target: LOG_CLIENT_REACTOR, "Terminated.");
        Ok(())
    }
788
789 async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
790 self.db
791 .begin_transaction_nc()
792 .await
793 .find_by_prefix(&ActiveStateKeyPrefix)
794 .await
795 .filter(|(state, _)| {
797 future::ready(
798 self.module_contexts
799 .contains_key(&state.0.state.module_instance_id()),
800 )
801 })
802 .map(|(state, meta)| (state.0.state, meta))
803 .collect::<Vec<_>>()
804 .await
805 }
806
807 async fn get_active_state(&self, state: &DynState) -> Option<ActiveStateMeta> {
808 if !self
810 .module_contexts
811 .contains_key(&state.module_instance_id())
812 {
813 return None;
814 }
815 self.db
816 .begin_transaction_nc()
817 .await
818 .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
819 .await
820 }
821
822 async fn get_inactive_states(&self) -> Vec<(DynState, InactiveStateMeta)> {
823 self.db
824 .begin_transaction_nc()
825 .await
826 .find_by_prefix(&InactiveStateKeyPrefix)
827 .await
828 .filter(|(state, _)| {
830 future::ready(
831 self.module_contexts
832 .contains_key(&state.0.state.module_instance_id()),
833 )
834 })
835 .map(|(state, meta)| (state.0.state, meta))
836 .collect::<Vec<_>>()
837 .await
838 }
839}
840
841impl ExecutorInner {
842 fn stop_executor(&self) -> Option<()> {
844 let mut state = self.state.write().expect("Locking can't fail");
845
846 state.stop()
847 }
848}
849
850impl Debug for ExecutorInner {
851 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
852 writeln!(f, "ExecutorInner {{}}")
853 }
854}
855
856impl ExecutorBuilder {
857 pub fn with_module<C>(&mut self, instance_id: ModuleInstanceId, context: C)
860 where
861 C: IntoDynInstance<DynType = DynContext>,
862 {
863 self.with_module_dyn(context.into_dyn(instance_id));
864 }
865
866 pub fn with_module_dyn(&mut self, context: DynContext) {
869 self.valid_module_ids.insert(context.module_instance_id());
870
871 if self
872 .module_contexts
873 .insert(context.module_instance_id(), context)
874 .is_some()
875 {
876 panic!("Tried to add two modules with the same instance id!");
877 }
878 }
879
880 pub fn with_valid_module_id(&mut self, module_id: ModuleInstanceId) {
884 self.valid_module_ids.insert(module_id);
885 }
886
887 pub fn build(self, db: Database, notifier: Notifier, client_task_group: TaskGroup) -> Executor {
891 let (sm_update_tx, sm_update_rx) = tokio::sync::mpsc::unbounded_channel();
892
893 let inner = Arc::new(ExecutorInner {
894 db,
895 state: std::sync::RwLock::new(ExecutorState::Unstarted { sm_update_rx }),
896 module_contexts: self.module_contexts,
897 valid_module_ids: self.valid_module_ids,
898 notifier,
899 sm_update_tx,
900 client_task_group,
901 });
902
903 debug!(
904 target: LOG_CLIENT_REACTOR,
905 instances = ?inner.module_contexts.keys().copied().collect::<Vec<_>>(),
906 "Initialized state machine executor with module instances"
907 );
908 Executor { inner }
909 }
910}
/// Lookup prefix selecting all active state entries of one operation.
#[derive(Debug)]
pub struct ActiveOperationStateKeyPrefix {
    pub operation_id: OperationId,
}
915
916impl Encodable for ActiveOperationStateKeyPrefix {
917 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
918 self.operation_id.consensus_encode(writer)
919 }
920}
921
// Lookups with this prefix yield `ActiveStateKeyDb` records.
impl ::fedimint_core::db::DatabaseLookup for ActiveOperationStateKeyPrefix {
    type Record = ActiveStateKeyDb;
}
925
/// Lookup prefix selecting the active state entries of one module instance
/// within one operation.
#[derive(Debug)]
pub(crate) struct ActiveModuleOperationStateKeyPrefix {
    pub operation_id: OperationId,
    pub module_instance: ModuleInstanceId,
}
931
932impl Encodable for ActiveModuleOperationStateKeyPrefix {
933 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
934 self.operation_id.consensus_encode(writer)?;
935 self.module_instance.consensus_encode(writer)?;
936 Ok(())
937 }
938}
939
// Lookups with this prefix yield `ActiveStateKeyDb` records.
impl ::fedimint_core::db::DatabaseLookup for ActiveModuleOperationStateKeyPrefix {
    type Record = ActiveStateKeyDb;
}
943
/// Lookup prefix selecting every active state entry.
#[derive(Debug)]
pub struct ActiveStateKeyPrefix;
946
impl Encodable for ActiveStateKeyPrefix {
    /// Writes nothing: this prefix contributes no bytes of its own.
    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
        Ok(())
    }
}
952
/// Database key wrapper around `ActiveStateKey`, stored under the
/// `ActiveStates` prefix.
#[derive(Encodable, Decodable, Debug)]
pub struct ActiveStateKeyDb(pub fedimint_client_module::sm::executor::ActiveStateKey);
955
// Active state entries map `ActiveStateKeyDb` -> `ActiveStateMeta` under the
// `ActiveStates` prefix, with modification notifications enabled.
impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyDb {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = true;
    type Key = Self;
    type Value = ActiveStateMeta;
}
962
// Marker impl. NOTE(review): presumably what allows `wait_key_exists` in
// `Executor::await_active_state` to be used with this key — confirm against
// fedimint_core.
impl DatabaseKeyWithNotify for ActiveStateKeyDb {}
964
// Lookups with this prefix yield `ActiveStateKeyDb` records.
impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefix {
    type Record = ActiveStateKeyDb;
}
968
/// Lookup prefix selecting every active state entry, yielding keys in their
/// raw [`ActiveStateKeyBytes`] form.
#[derive(Debug, Encodable, Decodable)]
pub struct ActiveStateKeyPrefixBytes;
971
// Raw-bytes view of the active state table: same prefix and value type as
// `ActiveStateKeyDb`, but without modification notifications.
impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyBytes {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = false;
    type Key = Self;
    type Value = ActiveStateMeta;
}
978
// Lookups with this prefix yield `ActiveStateKeyBytes` records.
impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefixBytes {
    type Record = ActiveStateKeyBytes;
}
982
/// Database key wrapper around `InactiveStateKey`, stored under the
/// `InactiveStates` prefix.
#[derive(Encodable, Decodable, Debug)]
pub struct InactiveStateKeyDb(pub fedimint_client_module::sm::executor::InactiveStateKey);
985
/// Inactive state key with the state kept as undecoded bytes.
#[derive(Debug)]
pub struct InactiveStateKeyBytes {
    pub operation_id: OperationId,
    pub module_instance_id: ModuleInstanceId,
    /// Everything after the operation id. As produced by the decoder this
    /// starts with the encoded module instance id, followed by the remaining
    /// key bytes.
    pub state: Vec<u8>,
}
992
993impl Encodable for InactiveStateKeyBytes {
994 fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
995 self.operation_id.consensus_encode(writer)?;
996 writer.write_all(self.state.as_slice())?;
997 Ok(())
998 }
999}
1000
1001impl Decodable for InactiveStateKeyBytes {
1002 fn consensus_decode_partial<R: std::io::Read>(
1003 reader: &mut R,
1004 modules: &ModuleDecoderRegistry,
1005 ) -> Result<Self, DecodeError> {
1006 let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1007 let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1008 let mut bytes = Vec::new();
1009 reader
1010 .read_to_end(&mut bytes)
1011 .map_err(DecodeError::from_err)?;
1012
1013 let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1014 instance_bytes.append(&mut bytes);
1015
1016 Ok(InactiveStateKeyBytes {
1017 operation_id,
1018 module_instance_id,
1019 state: instance_bytes,
1020 })
1021 }
1022}
1023
/// Lookup prefix selecting all inactive state entries of one operation.
#[derive(Debug)]
pub struct InactiveOperationStateKeyPrefix {
    pub operation_id: OperationId,
}
1028
1029impl Encodable for InactiveOperationStateKeyPrefix {
1030 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1031 self.operation_id.consensus_encode(writer)
1032 }
1033}
1034
// Lookups with this prefix yield `InactiveStateKeyDb` records.
impl ::fedimint_core::db::DatabaseLookup for InactiveOperationStateKeyPrefix {
    type Record = InactiveStateKeyDb;
}
1038
/// Lookup prefix selecting the inactive state entries of one module instance
/// within one operation.
#[derive(Debug)]
pub(crate) struct InactiveModuleOperationStateKeyPrefix {
    pub operation_id: OperationId,
    pub module_instance: ModuleInstanceId,
}
1044
1045impl Encodable for InactiveModuleOperationStateKeyPrefix {
1046 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1047 self.operation_id.consensus_encode(writer)?;
1048 self.module_instance.consensus_encode(writer)?;
1049 Ok(())
1050 }
1051}
1052
// Lookups with this prefix yield `InactiveStateKeyDb` records.
impl ::fedimint_core::db::DatabaseLookup for InactiveModuleOperationStateKeyPrefix {
    type Record = InactiveStateKeyDb;
}
1056
/// Lookup prefix selecting every inactive state entry.
#[derive(Debug, Clone)]
pub struct InactiveStateKeyPrefix;
1059
impl Encodable for InactiveStateKeyPrefix {
    /// Writes nothing: this prefix contributes no bytes of its own.
    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
        Ok(())
    }
}
1065
/// Lookup prefix selecting every inactive state entry, yielding keys in
/// their raw [`InactiveStateKeyBytes`] form.
#[derive(Debug, Encodable, Decodable)]
pub struct InactiveStateKeyPrefixBytes;
1068
// Raw-bytes view of the inactive state table: same prefix and value type as
// `InactiveStateKeyDb`, but without modification notifications.
impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyBytes {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = false;
    type Key = Self;
    type Value = InactiveStateMeta;
}
1075
// Lookups with this prefix yield `InactiveStateKeyBytes` records.
impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefixBytes {
    type Record = InactiveStateKeyBytes;
}
1079
/// Active state key with the state kept as undecoded bytes.
#[derive(Debug)]
pub struct ActiveStateKeyBytes {
    pub operation_id: OperationId,
    pub module_instance_id: ModuleInstanceId,
    /// Everything after the operation id. As produced by the decoder this
    /// starts with the encoded module instance id, followed by the remaining
    /// key bytes.
    pub state: Vec<u8>,
}
1086
1087impl Encodable for ActiveStateKeyBytes {
1088 fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
1089 self.operation_id.consensus_encode(writer)?;
1090 writer.write_all(self.state.as_slice())?;
1091 Ok(())
1092 }
1093}
1094
1095impl Decodable for ActiveStateKeyBytes {
1096 fn consensus_decode_partial<R: std::io::Read>(
1097 reader: &mut R,
1098 modules: &ModuleDecoderRegistry,
1099 ) -> Result<Self, DecodeError> {
1100 let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1101 let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1102 let mut bytes = Vec::new();
1103 reader
1104 .read_to_end(&mut bytes)
1105 .map_err(DecodeError::from_err)?;
1106
1107 let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1108 instance_bytes.append(&mut bytes);
1109
1110 Ok(ActiveStateKeyBytes {
1111 operation_id,
1112 module_instance_id,
1113 state: instance_bytes,
1114 })
1115 }
1116}
// Inactive state entries map `InactiveStateKeyDb` -> `InactiveStateMeta`
// under the `InactiveStates` prefix, with modification notifications enabled.
impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyDb {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = true;
    type Key = Self;
    type Value = InactiveStateMeta;
}
1123
// Marker impl. NOTE(review): presumably what allows `wait_key_exists` in
// `Executor::await_inactive_state` to be used with this key — confirm against
// fedimint_core.
impl DatabaseKeyWithNotify for InactiveStateKeyDb {}
1125
// Lookups with this prefix yield `InactiveStateKeyDb` records.
impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefix {
    type Record = InactiveStateKeyDb;
}
1129
/// Result of applying a transition: the successor state, tagged by whether
/// it was stored as active or as inactive (terminal).
#[derive(Debug)]
enum ActiveOrInactiveState {
    /// The successor is not terminal and was stored as a new active state.
    Active {
        dyn_state: DynState,
        // Only ever matched with `meta: _` in this file, hence the lint
        // allowance.
        #[allow(dead_code)]
        meta: ActiveStateMeta,
    },
    /// The successor is terminal and was stored as inactive.
    Inactive {
        dyn_state: DynState,
    },
}
1141
1142impl ActiveOrInactiveState {
1143 fn is_active(&self) -> bool {
1144 match self {
1145 ActiveOrInactiveState::Active { .. } => true,
1146 ActiveOrInactiveState::Inactive { .. } => false,
1147 }
1148 }
1149}
1150
1151#[apply(async_trait_maybe_send!)]
1152impl IExecutor for Executor {
1153 async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
1154 Self::get_active_states(self).await
1155 }
1156
1157 async fn add_state_machines_dbtx(
1158 &self,
1159 dbtx: &mut DatabaseTransaction<'_>,
1160 states: Vec<DynState>,
1161 ) -> AddStateMachinesResult {
1162 Self::add_state_machines_dbtx(self, dbtx, states).await
1163 }
1164}
1165
// Unit tests live in a separate `tests` module that is compiled only for
// test builds.
#[cfg(test)]
mod tests;