1use std::collections::{BTreeMap, BTreeSet, HashSet};
2use std::convert::Infallible;
3use std::fmt::{Debug, Formatter};
4use std::io::{Error, Write};
5use std::mem;
6use std::sync::Arc;
7
8use anyhow::anyhow;
9use fedimint_client_module::sm::executor::{
10 ActiveStateKey, ContextGen, IExecutor, InactiveStateKey,
11};
12use fedimint_client_module::sm::{
13 ActiveStateMeta, ClientSMDatabaseTransaction, DynContext, DynState, InactiveStateMeta, State,
14 StateTransition, StateTransitionFunction,
15};
16use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, OperationId};
17use fedimint_core::db::{
18 AutocommitError, Database, DatabaseKeyWithNotify, DatabaseTransaction,
19 IDatabaseTransactionOpsCoreTyped,
20};
21use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
22use fedimint_core::fmt_utils::AbbreviateJson;
23use fedimint_core::module::registry::ModuleDecoderRegistry;
24use fedimint_core::task::TaskGroup;
25use fedimint_core::util::{BoxFuture, FmtCompactAnyhow as _};
26use fedimint_core::{apply, async_trait_maybe_send};
27use fedimint_eventlog::{DBTransactionEventLogExt as _, Event, EventKind, EventPersistence};
28use fedimint_logging::LOG_CLIENT_REACTOR;
29use futures::future::{self, select_all};
30use futures::stream::{FuturesUnordered, StreamExt};
31use serde::{Deserialize, Serialize};
32use tokio::select;
33use tokio::sync::{mpsc, oneshot, watch};
34use tracing::{Instrument, debug, error, info, trace, warn};
35
36use crate::sm::notifier::Notifier;
37use crate::{AddStateMachinesError, AddStateMachinesResult, DynGlobalClientContext};
38
/// Upper bound on the number of commit attempts handed to `Database::autocommit`
/// by [`Executor::add_state_machines`]; once exhausted the call returns an error.
const MAX_DB_ATTEMPTS: Option<usize> = Some(100);
41
/// Single-byte database key prefixes under which the executor stores state
/// machine records.
pub(crate) enum ExecutorDbPrefixes {
    /// Prefix of active state records (the `DB_PREFIX` of [`ActiveStateKeyDb`]).
    ActiveStates = 0xa1,
    /// Prefix of inactive state records (the `DB_PREFIX` of [`InactiveStateKeyDb`]).
    InactiveStates = 0xa2,
}
49
/// Event written to the event log whenever a state machine is added to the
/// executor or finishes a transition.
#[derive(Serialize, Deserialize)]
pub struct StateMachineUpdated {
    /// Operation the state machine belongs to.
    operation_id: OperationId,
    /// `true` when logged for a newly added state machine, `false` when logged
    /// after a transition.
    started: bool,
    /// Whether the state reached by the transition is terminal (always `false`
    /// for newly added state machines).
    terminal: bool,
    /// Instance id of the module that owns the state machine.
    module_id: ModuleInstanceId,
}
57
/// Registers [`StateMachineUpdated`] as the `sm-updated` event.
impl Event for StateMachineUpdated {
    // The event is not attributed to any particular module kind.
    const MODULE: Option<fedimint_core::core::ModuleKind> = None;
    const KIND: EventKind = EventKind::from_static("sm-updated");
    // NOTE(review): `Trimable` presumably allows the event log to discard old
    // entries of this kind — confirm against `EventPersistence` docs.
    const PERSISTENCE: EventPersistence = EventPersistence::Trimable;
}
63
/// Handle to the state machine executor.
///
/// Cloning is cheap: all clones share the same [`ExecutorInner`] through an
/// [`Arc`].
#[derive(Clone, Debug)]
pub struct Executor {
    /// Shared executor internals.
    inner: Arc<ExecutorInner>,
}
75
/// Shared internals behind every [`Executor`] handle.
struct ExecutorInner {
    /// Database holding the active and inactive state records.
    db: Database,
    /// Lifecycle state of the executor (unstarted / running / stopped).
    state: std::sync::RwLock<ExecutorState>,
    /// Contexts of the modules whose state machines this executor can run.
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Module instance ids accepted when adding state machines; registering a
    /// module context also registers its id here.
    valid_module_ids: BTreeSet<ModuleInstanceId>,
    /// Notifier informed about every new state.
    notifier: Notifier,
    /// Sending half of the channel that feeds new active states to the
    /// executor loop.
    sm_update_tx: mpsc::UnboundedSender<DynState>,
    /// Task group on which the executor task is spawned.
    client_task_group: TaskGroup,
    /// Sender handed to `log_event` whenever an event is logged.
    log_ordering_wakeup_tx: watch::Sender<()>,
}
88
/// Lifecycle of the executor background task.
enum ExecutorState {
    /// Built but not yet started; still owns the receiver the executor loop
    /// will consume once started.
    Unstarted {
        sm_update_rx: mpsc::UnboundedReceiver<DynState>,
    },
    /// The executor task has been started.
    Running {
        /// Generator for the global context of a given module instance and
        /// operation.
        context_gen: ContextGen,
        /// Used by [`ExecutorState::stop`] to signal the running task to shut
        /// down.
        shutdown_sender: oneshot::Sender<()>,
    },
    /// The executor was stopped; it cannot be started again.
    Stopped,
}
99
100impl ExecutorState {
101 fn start(
105 &mut self,
106 context: ContextGen,
107 ) -> Option<(oneshot::Receiver<()>, mpsc::UnboundedReceiver<DynState>)> {
108 let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();
109
110 let previous_state = mem::replace(
111 self,
112 ExecutorState::Running {
113 context_gen: context,
114 shutdown_sender,
115 },
116 );
117
118 match previous_state {
119 ExecutorState::Unstarted { sm_update_rx } => Some((shutdown_receiver, sm_update_rx)),
120 _ => {
121 *self = previous_state;
123
124 debug!(target: LOG_CLIENT_REACTOR, "Executor already started, ignoring start request");
125 None
126 }
127 }
128 }
129
130 fn stop(&mut self) -> Option<()> {
133 let previous_state = mem::replace(self, ExecutorState::Stopped);
134
135 match previous_state {
136 ExecutorState::Running {
137 shutdown_sender, ..
138 } => {
139 if shutdown_sender.send(()).is_err() {
140 warn!(target: LOG_CLIENT_REACTOR, "Failed to send shutdown signal to executor, already dead?");
141 }
142 Some(())
143 }
144 _ => {
145 *self = previous_state;
147
148 debug!(target: LOG_CLIENT_REACTOR, "Executor not running, ignoring stop request");
149 None
150 }
151 }
152 }
153
154 fn gen_context(&self, state: &DynState) -> Option<DynGlobalClientContext> {
155 let ExecutorState::Running { context_gen, .. } = self else {
156 return None;
157 };
158 Some(context_gen(
159 state.module_instance_id(),
160 state.operation_id(),
161 ))
162 }
163}
164
/// Collects module contexts and valid module ids before an [`Executor`] is
/// built with [`ExecutorBuilder::build`].
#[derive(Debug, Default)]
pub struct ExecutorBuilder {
    /// Contexts of the modules whose state machines the executor will run.
    module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
    /// Module instance ids for which state machines may be added.
    valid_module_ids: BTreeSet<ModuleInstanceId>,
}
172
173impl Executor {
174 pub fn builder() -> ExecutorBuilder {
176 ExecutorBuilder::default()
177 }
178
179 pub async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
180 self.inner.get_active_states().await
181 }
182
183 pub async fn add_state_machines(&self, states: Vec<DynState>) -> anyhow::Result<()> {
189 self.inner
190 .db
191 .autocommit(
192 |dbtx, _| Box::pin(self.add_state_machines_dbtx(dbtx, states.clone())),
193 MAX_DB_ATTEMPTS,
194 )
195 .await
196 .map_err(|e| match e {
197 AutocommitError::CommitFailed {
198 last_error,
199 attempts,
200 } => last_error.context(format!("Failed to commit after {attempts} attempts")),
201 AutocommitError::ClosureError { error, .. } => anyhow!("{error:?}"),
202 })?;
203
204 Ok(())
207 }
208
209 pub async fn add_state_machines_dbtx(
218 &self,
219 dbtx: &mut DatabaseTransaction<'_>,
220 states: Vec<DynState>,
221 ) -> AddStateMachinesResult {
222 for state in states {
223 if !self
224 .inner
225 .valid_module_ids
226 .contains(&state.module_instance_id())
227 {
228 return Err(AddStateMachinesError::Other(anyhow!("Unknown module")));
229 }
230
231 let is_active_state = dbtx
232 .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
233 .await
234 .is_some();
235 let is_inactive_state = dbtx
236 .get_value(&InactiveStateKeyDb(InactiveStateKey::from_state(
237 state.clone(),
238 )))
239 .await
240 .is_some();
241
242 if is_active_state || is_inactive_state {
243 return Err(AddStateMachinesError::StateAlreadyExists);
244 }
245
246 if let Some(module_context) =
251 self.inner.module_contexts.get(&state.module_instance_id())
252 {
253 match self
254 .inner
255 .state
256 .read()
257 .expect("locking failed")
258 .gen_context(&state)
259 {
260 Some(context) => {
261 if state.is_terminal(module_context, &context) {
262 return Err(AddStateMachinesError::Other(anyhow!(
263 "State is already terminal, adding it to the executor doesn't make sense."
264 )));
265 }
266 }
267 _ => {
268 warn!(target: LOG_CLIENT_REACTOR, "Executor should be running at this point");
269 }
270 }
271 }
272
273 dbtx.insert_new_entry(
274 &ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())),
275 &ActiveStateMeta::default(),
276 )
277 .await;
278
279 let operation_id = state.operation_id();
280 self.inner
281 .log_event_dbtx(
282 dbtx,
283 StateMachineUpdated {
284 operation_id,
285 started: true,
286 terminal: false,
287 module_id: state.module_instance_id(),
288 },
289 )
290 .await;
291
292 let notify_sender = self.inner.notifier.sender();
293 let sm_updates_tx = self.inner.sm_update_tx.clone();
294 dbtx.on_commit(move || {
295 notify_sender.notify(state.clone());
296 let _ = sm_updates_tx.send(state);
297 });
298 }
299
300 Ok(())
301 }
302
303 pub async fn contains_active_state<S: State>(
308 &self,
309 instance: ModuleInstanceId,
310 state: S,
311 ) -> bool {
312 let state = DynState::from_typed(instance, state);
313 self.inner
314 .get_active_states()
315 .await
316 .into_iter()
317 .any(|(s, _)| s == state)
318 }
319
320 pub async fn contains_inactive_state<S: State>(
328 &self,
329 instance: ModuleInstanceId,
330 state: S,
331 ) -> bool {
332 let state = DynState::from_typed(instance, state);
333 self.inner
334 .get_inactive_states()
335 .await
336 .into_iter()
337 .any(|(s, _)| s == state)
338 }
339
340 pub async fn await_inactive_state(&self, state: DynState) -> InactiveStateMeta {
341 self.inner
342 .db
343 .wait_key_exists(&InactiveStateKeyDb(InactiveStateKey::from_state(state)))
344 .await
345 }
346
347 pub async fn await_active_state(&self, state: DynState) -> ActiveStateMeta {
348 self.inner
349 .db
350 .wait_key_exists(&ActiveStateKeyDb(ActiveStateKey::from_state(state)))
351 .await
352 }
353
354 pub async fn get_operation_states(
356 &self,
357 operation_id: OperationId,
358 ) -> (
359 Vec<(DynState, ActiveStateMeta)>,
360 Vec<(DynState, InactiveStateMeta)>,
361 ) {
362 let mut dbtx = self.inner.db.begin_transaction_nc().await;
363 let active_states: Vec<_> = dbtx
364 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
365 .await
366 .map(|(active_key, active_meta)| (active_key.0.state, active_meta))
367 .collect()
368 .await;
369 let inactive_states: Vec<_> = dbtx
370 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
371 .await
372 .map(|(active_key, inactive_meta)| (active_key.0.state, inactive_meta))
373 .collect()
374 .await;
375
376 (active_states, inactive_states)
377 }
378
    /// Starts the executor background task on the client task group.
    ///
    /// `context_gen` is stored in the executor state and also handed to the
    /// executor loop. The spawned task ends when the task group shuts down,
    /// when [`Executor::stop_executor`] is called (or the shutdown sender is
    /// dropped), or when the executor loop itself returns.
    ///
    /// # Panics
    /// Panics if the executor is not in the unstarted state, i.e. it was
    /// already started or stopped before.
    pub fn start_executor(&self, context_gen: ContextGen) {
        let Some((shutdown_receiver, sm_update_rx)) = self
            .inner
            .state
            .write()
            .expect("locking can't fail")
            .start(context_gen.clone())
        else {
            panic!("start_executor was called previously");
        };

        let task_runner_inner = self.inner.clone();
        let _handle = self.inner.client_task_group.spawn("sm-executor", |task_handle| async move {
            let executor_runner = task_runner_inner.run(context_gen, sm_update_rx);
            let task_group_shutdown_rx = task_handle.make_shutdown_rx();
            // Whichever branch completes first ends the task; the other futures
            // (including the executor loop) are dropped at that point.
            select! {
                () = task_group_shutdown_rx => {
                    debug!(
                        target: LOG_CLIENT_REACTOR,
                        "Shutting down state machine executor runner due to task group shutdown signal"
                    );
                },
                shutdown_happened_sender = shutdown_receiver => {
                    match shutdown_happened_sender {
                        // An explicit stop request was sent.
                        Ok(()) => {
                            debug!(
                                target: LOG_CLIENT_REACTOR,
                                "Shutting down state machine executor runner due to explicit shutdown signal"
                            );
                        },
                        // The sender was dropped without sending.
                        Err(_) => {
                            warn!(
                                target: LOG_CLIENT_REACTOR,
                                "Shutting down state machine executor runner because the shutdown signal channel was closed (the executor object was dropped)"
                            );
                        }
                    }
                },
                () = executor_runner => {
                    error!(target: LOG_CLIENT_REACTOR, "State machine executor runner exited unexpectedly!");
                },
            };
        });
    }
429
430 pub fn stop_executor(&self) -> Option<()> {
444 self.inner.stop_executor()
445 }
446
447 pub fn notifier(&self) -> &Notifier {
450 &self.inner.notifier
451 }
452}
453
454impl Drop for ExecutorInner {
455 fn drop(&mut self) {
456 self.stop_executor();
457 }
458}
459
/// A transition of an active state whose trigger has completed and which is
/// ready to be executed.
struct TransitionForActiveState {
    /// Value produced by the transition's trigger future.
    outcome: serde_json::Value,
    /// The active state the transition starts from.
    state: DynState,
    /// Metadata of the active state.
    meta: ActiveStateMeta,
    /// Function computing the next state from `outcome` and `state`.
    transition_fn: StateTransitionFunction<DynState>,
}
466
467impl ExecutorInner {
468 async fn run(
469 &self,
470 global_context_gen: ContextGen,
471 sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
472 ) {
473 debug!(target: LOG_CLIENT_REACTOR, "Starting state machine executor task");
474 if let Err(err) = self
475 .run_state_machines_executor_inner(global_context_gen, sm_update_rx)
476 .await
477 {
478 warn!(
479 target: LOG_CLIENT_REACTOR,
480 err = %err.fmt_compact_anyhow(),
481 "An unexpected error occurred during a state transition"
482 );
483 }
484 }
485
    /// Builds one future per transition offered by `state`.
    ///
    /// Each returned future waits for its transition's trigger and resolves to
    /// a [`TransitionForActiveState`] carrying the trigger outcome, the state,
    /// `meta` and the transition function.
    ///
    /// If the state offers no transitions at all, a warning is logged and the
    /// state is moved from the active to the inactive records in the database;
    /// the returned vector is empty in that case.
    ///
    /// # Panics
    /// Panics if there is no module context for the state's module instance.
    async fn get_transition_for(
        &self,
        state: &DynState,
        meta: ActiveStateMeta,
        global_context_gen: &ContextGen,
    ) -> Vec<BoxFuture<'static, TransitionForActiveState>> {
        let module_instance = state.module_instance_id();
        let context = &self
            .module_contexts
            .get(&module_instance)
            .expect("Unknown module");
        let transitions = state
            .transitions(
                context,
                &global_context_gen(module_instance, state.operation_id()),
            )
            .into_iter()
            .map(|transition| {
                let state = state.clone();
                let f: BoxFuture<TransitionForActiveState> = Box::pin(async move {
                    let StateTransition {
                        trigger,
                        transition,
                    } = transition;
                    TransitionForActiveState {
                        // Resolves once the trigger condition of this transition is met.
                        outcome: trigger.await,
                        state,
                        transition_fn: transition,
                        meta,
                    }
                });
                f
            })
            .collect::<Vec<_>>();
        if transitions.is_empty() {
            // A state without transitions can never make progress; move it to the
            // inactive records so it does not stay around as an active state.
            warn!(
                target: LOG_CLIENT_REACTOR,
                module_id = module_instance, "A terminal state where only active states are expected. Please report this bug upstream."
            );
            self.db
                .autocommit::<_, _, anyhow::Error>(
                    |dbtx, _| {
                        Box::pin(async {
                            let k = InactiveStateKey::from_state(state.clone());
                            // NOTE(review): fresh default metadata is stored here rather
                            // than the `meta` passed in — confirm this is intended.
                            let v = ActiveStateMeta::default().into_inactive();
                            dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
                                state.clone(),
                            )))
                            .await;
                            dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
                            Ok(())
                        })
                    },
                    // No attempt limit: keep retrying the commit.
                    None,
                )
                .await
                .expect("Autocommit here can't fail");
        }

        transitions
    }
550
    /// The executor event loop.
    ///
    /// On start, every active state found in the database is queued on the
    /// update channel. The loop then reacts to two sources of events: new
    /// states arriving on `sm_update_rx` and completed futures from the set of
    /// in-flight state machine futures. Each state goes through
    /// `New` → `Triggered` → `Completed` (or `New` → `Invalid`).
    ///
    /// Returns `Ok(())` once the update channel is closed.
    ///
    /// # Panics
    /// Panics if a state finishes that was not recorded as running, if a
    /// module context is missing for a state, or if sending to the update
    /// channel fails.
    async fn run_state_machines_executor_inner(
        &self,
        global_context_gen: ContextGen,
        mut sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
    ) -> anyhow::Result<()> {
        /// Events processed by the executor loop.
        enum ExecutorLoopEvent {
            /// A state arrived on the update channel.
            New { state: DynState },
            /// The trigger of one of a state's transitions completed.
            Triggered(TransitionForActiveState),
            /// The state could not be looked up as active or produced no
            /// transitions.
            Invalid { state: DynState },
            /// A transition was executed and committed.
            Completed {
                /// The state the transition started from.
                state: DynState,
                /// The state the transition led to.
                outcome: ActiveOrInactiveState,
            },
            /// The update channel was closed.
            Disconnected,
        }

        // Re-queue everything that was still active when the executor last stopped.
        let active_states = self.get_active_states().await;
        trace!(target: LOG_CLIENT_REACTOR, "Starting active states: {:?}", active_states);
        for (state, _meta) in active_states {
            self.sm_update_tx
                .send(state)
                .expect("Must be able to send state machine to own opened channel");
        }

        // States that currently have a future in `futures`; used to ignore duplicates.
        let mut currently_running_sms = HashSet::<DynState>::new();
        // In-flight futures, each resolving to the next event for one state.
        let mut futures: FuturesUnordered<BoxFuture<'_, ExecutorLoopEvent>> =
            FuturesUnordered::new();

        loop {
            let event = tokio::select! {
                new = sm_update_rx.recv() => {
                    match new { Some(new) => {
                        ExecutorLoopEvent::New {
                            state: new,
                        }
                    } _ => {
                        ExecutorLoopEvent::Disconnected
                    }}
                },

                // Only poll the set when it is non-empty: an empty set yields `None`.
                event = futures.next(), if !futures.is_empty() => event.expect("we only .next() if there are pending futures"),
            };

            match event {
                ExecutorLoopEvent::New { state } => {
                    if currently_running_sms.contains(&state) {
                        warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received a state machine that is already running. Ignoring");
                        continue;
                    }
                    currently_running_sms.insert(state.clone());
                    let futures_len = futures.len();
                    let global_context_gen = &global_context_gen;
                    trace!(target: LOG_CLIENT_REACTOR, state = ?state, "Started new active state machine, details.");
                    futures.push(Box::pin(async move {
                        // The state must still be stored as active (and belong to a known module).
                        let Some(meta) = self.get_active_state(&state).await else {
                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Couldn't look up received state machine. Ignoring.");
                            return ExecutorLoopEvent::Invalid { state: state.clone() };
                        };

                        let transitions = self
                            .get_transition_for(&state, meta, global_context_gen)
                            .await;
                        if transitions.is_empty() {
                            warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received an active state that doesn't produce any transitions. Ignoring.");
                            return ExecutorLoopEvent::Invalid { state: state.clone() };
                        }
                        let transitions_num = transitions.len();

                        debug!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), total = futures_len + 1, transitions_num, "New active state machine.");

                        // The first trigger to complete wins; the remaining transitions are dropped.
                        let (first_completed_result, _index, _unused_transitions) =
                            select_all(transitions).await;
                        ExecutorLoopEvent::Triggered(first_completed_result)
                    }));
                }
                ExecutorLoopEvent::Triggered(TransitionForActiveState {
                    outcome,
                    state,
                    meta,
                    transition_fn,
                }) => {
                    debug!(
                        target: LOG_CLIENT_REACTOR,
                        operation_id = %state.operation_id().fmt_short(),
                        "Triggered state transition",
                    );
                    let span = tracing::debug_span!(
                        target: LOG_CLIENT_REACTOR,
                        "sm_transition",
                        operation_id = %state.operation_id().fmt_short()
                    );
                    futures.push({
                        // Owned copies for the `async move` block below.
                        let sm_update_tx = self.sm_update_tx.clone();
                        let db = self.db.clone();
                        let notifier = self.notifier.clone();
                        let module_contexts = self.module_contexts.clone();
                        let global_context_gen = global_context_gen.clone();
                        Box::pin(
                            async move {
                                debug!(
                                    target: LOG_CLIENT_REACTOR,
                                    "Executing state transition",
                                );
                                trace!(
                                    target: LOG_CLIENT_REACTOR,
                                    ?state,
                                    outcome = ?AbbreviateJson(&outcome),
                                    "Executing state transition (details)",
                                );

                                let module_contexts = &module_contexts;
                                let global_context_gen = &global_context_gen;

                                // Run the transition and update the state records in one
                                // transaction; the closure may be re-run on commit retries,
                                // hence the clones.
                                let outcome = db
                                    .autocommit::<'_, '_, _, _, Infallible>(
                                        |dbtx, _| {
                                            let state = state.clone();
                                            let state_module_instance_id = state.module_instance_id();
                                            let transition_fn = transition_fn.clone();
                                            let transition_outcome = outcome.clone();
                                            Box::pin(async move {
                                                let new_state = transition_fn(
                                                    &mut ClientSMDatabaseTransaction::new(
                                                        &mut dbtx.to_ref(),
                                                        state.module_instance_id(),
                                                    ),
                                                    transition_outcome.clone(),
                                                    state.clone(),
                                                )
                                                .await;
                                                // The old state moves from active to inactive.
                                                dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
                                                    state.clone(),
                                                )))
                                                .await;
                                                dbtx.insert_entry(
                                                    &InactiveStateKeyDb(InactiveStateKey::from_state(state.clone())),
                                                    &meta.into_inactive(),
                                                )
                                                .await;

                                                let context = &module_contexts
                                                    .get(&state.module_instance_id())
                                                    .expect("Unknown module");

                                                let operation_id = state.operation_id();
                                                let global_context = global_context_gen(
                                                    state.module_instance_id(),
                                                    operation_id,
                                                );

                                                let is_terminal = new_state.is_terminal(context, &global_context);

                                                self.log_event_dbtx(dbtx,
                                                    StateMachineUpdated{
                                                        started: false,
                                                        operation_id,
                                                        module_id: state_module_instance_id,
                                                        terminal: is_terminal,
                                                    }
                                                ).await;

                                                // A terminal new state is stored as inactive right away,
                                                // any other new state becomes the next active state.
                                                if is_terminal {
                                                    let k = InactiveStateKey::from_state(
                                                        new_state.clone(),
                                                    );
                                                    let v = ActiveStateMeta::default().into_inactive();
                                                    dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
                                                    Ok(ActiveOrInactiveState::Inactive {
                                                        dyn_state: new_state,
                                                    })
                                                } else {
                                                    let k = ActiveStateKey::from_state(
                                                        new_state.clone(),
                                                    );
                                                    let v = ActiveStateMeta::default();
                                                    dbtx.insert_entry(&ActiveStateKeyDb(k), &v).await;
                                                    Ok(ActiveOrInactiveState::Active {
                                                        dyn_state: new_state,
                                                        meta: v,
                                                    })
                                                }
                                            })
                                        },
                                        // No attempt limit: keep retrying the commit.
                                        None,
                                    )
                                    .await
                                    .expect("autocommit should keep trying to commit (max_attempt: None) and body doesn't return errors");

                                debug!(
                                    target: LOG_CLIENT_REACTOR,
                                    terminal = !outcome.is_active(),
                                    ?outcome,
                                    "State transition complete",
                                );

                                // After the commit: an active new state is queued for execution,
                                // and subscribers are notified in both cases.
                                match &outcome {
                                    ActiveOrInactiveState::Active { dyn_state, meta: _ } => {
                                        sm_update_tx
                                            .send(dyn_state.clone())
                                            .expect("can't fail: we are the receiving end");
                                        notifier.notify(dyn_state.clone());
                                    }
                                    ActiveOrInactiveState::Inactive { dyn_state } => {
                                        notifier.notify(dyn_state.clone());
                                    }
                                }
                                ExecutorLoopEvent::Completed { state, outcome }
                            }
                            .instrument(span),
                        )
                    });
                }
                ExecutorLoopEvent::Invalid { state } => {
                    trace!(
                        target: LOG_CLIENT_REACTOR,
                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
                        "State invalid"
                    );
                    assert!(
                        currently_running_sms.remove(&state),
                        "State must have been recorded"
                    );
                }

                ExecutorLoopEvent::Completed { state, outcome } => {
                    assert!(
                        currently_running_sms.remove(&state),
                        "State must have been recorded"
                    );
                    debug!(
                        target: LOG_CLIENT_REACTOR,
                        operation_id = %state.operation_id().fmt_short(),
                        outcome_active = outcome.is_active(),
                        total = futures.len(),
                        "State transition complete"
                    );
                    trace!(
                        target: LOG_CLIENT_REACTOR,
                        ?outcome,
                        operation_id = %state.operation_id().fmt_short(), total = futures.len(),
                        "State transition complete"
                    );
                }
                ExecutorLoopEvent::Disconnected => {
                    break;
                }
            }
        }

        info!(target: LOG_CLIENT_REACTOR, "Terminated.");
        Ok(())
    }
828
829 async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
830 self.db
831 .begin_transaction_nc()
832 .await
833 .find_by_prefix(&ActiveStateKeyPrefix)
834 .await
835 .filter(|(state, _)| {
837 future::ready(
838 self.module_contexts
839 .contains_key(&state.0.state.module_instance_id()),
840 )
841 })
842 .map(|(state, meta)| (state.0.state, meta))
843 .collect::<Vec<_>>()
844 .await
845 }
846
847 async fn get_active_state(&self, state: &DynState) -> Option<ActiveStateMeta> {
848 if !self
850 .module_contexts
851 .contains_key(&state.module_instance_id())
852 {
853 return None;
854 }
855 self.db
856 .begin_transaction_nc()
857 .await
858 .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
859 .await
860 }
861
862 async fn get_inactive_states(&self) -> Vec<(DynState, InactiveStateMeta)> {
863 self.db
864 .begin_transaction_nc()
865 .await
866 .find_by_prefix(&InactiveStateKeyPrefix)
867 .await
868 .filter(|(state, _)| {
870 future::ready(
871 self.module_contexts
872 .contains_key(&state.0.state.module_instance_id()),
873 )
874 })
875 .map(|(state, meta)| (state.0.state, meta))
876 .collect::<Vec<_>>()
877 .await
878 }
879
880 pub async fn log_event_dbtx<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
881 where
882 E: Event + Send,
883 Cap: Send,
884 {
885 dbtx.log_event(self.log_ordering_wakeup_tx.clone(), None, event)
886 .await;
887 }
888}
889
890impl ExecutorInner {
891 fn stop_executor(&self) -> Option<()> {
893 let mut state = self.state.write().expect("Locking can't fail");
894
895 state.stop()
896 }
897}
898
899impl Debug for ExecutorInner {
900 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
901 writeln!(f, "ExecutorInner {{}}")
902 }
903}
904
905impl ExecutorBuilder {
906 pub fn with_module<C>(&mut self, instance_id: ModuleInstanceId, context: C)
909 where
910 C: IntoDynInstance<DynType = DynContext>,
911 {
912 self.with_module_dyn(context.into_dyn(instance_id));
913 }
914
915 pub fn with_module_dyn(&mut self, context: DynContext) {
918 self.valid_module_ids.insert(context.module_instance_id());
919
920 if self
921 .module_contexts
922 .insert(context.module_instance_id(), context)
923 .is_some()
924 {
925 panic!("Tried to add two modules with the same instance id!");
926 }
927 }
928
929 pub fn with_valid_module_id(&mut self, module_id: ModuleInstanceId) {
933 self.valid_module_ids.insert(module_id);
934 }
935
936 pub fn build(
940 self,
941 db: Database,
942 notifier: Notifier,
943 client_task_group: TaskGroup,
944 log_ordering_wakeup_tx: watch::Sender<()>,
945 ) -> Executor {
946 let (sm_update_tx, sm_update_rx) = tokio::sync::mpsc::unbounded_channel();
947
948 let inner = Arc::new(ExecutorInner {
949 db,
950 log_ordering_wakeup_tx,
951 state: std::sync::RwLock::new(ExecutorState::Unstarted { sm_update_rx }),
952 module_contexts: self.module_contexts,
953 valid_module_ids: self.valid_module_ids,
954 notifier,
955 sm_update_tx,
956 client_task_group,
957 });
958
959 debug!(
960 target: LOG_CLIENT_REACTOR,
961 instances = ?inner.module_contexts.keys().copied().collect::<Vec<_>>(),
962 "Initialized state machine executor with module instances"
963 );
964 Executor { inner }
965 }
966}
/// Lookup prefix selecting all active states of one operation.
#[derive(Debug)]
pub struct ActiveOperationStateKeyPrefix {
    /// Operation whose active states are selected.
    pub operation_id: OperationId,
}
971
972impl Encodable for ActiveOperationStateKeyPrefix {
973 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
974 self.operation_id.consensus_encode(writer)
975 }
976}
977
/// Lookups by this prefix yield [`ActiveStateKeyDb`] records.
impl ::fedimint_core::db::DatabaseLookup for ActiveOperationStateKeyPrefix {
    type Record = ActiveStateKeyDb;
}
981
/// Lookup prefix selecting the active states of one module instance within
/// one operation.
#[derive(Debug)]
pub(crate) struct ActiveModuleOperationStateKeyPrefix {
    /// Operation whose active states are selected.
    pub operation_id: OperationId,
    /// Module instance the states must belong to.
    pub module_instance: ModuleInstanceId,
}
987
988impl Encodable for ActiveModuleOperationStateKeyPrefix {
989 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
990 self.operation_id.consensus_encode(writer)?;
991 self.module_instance.consensus_encode(writer)?;
992 Ok(())
993 }
994}
995
/// Lookups by this prefix yield [`ActiveStateKeyDb`] records.
impl ::fedimint_core::db::DatabaseLookup for ActiveModuleOperationStateKeyPrefix {
    type Record = ActiveStateKeyDb;
}
999
/// Lookup prefix selecting every active state.
#[derive(Debug)]
pub struct ActiveStateKeyPrefix;
1002
/// Encodes to zero bytes.
impl Encodable for ActiveStateKeyPrefix {
    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
        // Nothing is written: the prefix carries no data of its own.
        Ok(())
    }
}
1008
/// Database key of an active state: a newtype around the client module's
/// `ActiveStateKey`.
#[derive(Encodable, Decodable, Debug)]
pub struct ActiveStateKeyDb(pub fedimint_client_module::sm::executor::ActiveStateKey);
1011
/// Active states are stored under the `ActiveStates` prefix and map to
/// [`ActiveStateMeta`].
impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyDb {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    // Notifications are enabled for this key type; `Executor::await_active_state`
    // waits on it through `wait_key_exists`.
    const NOTIFY_ON_MODIFY: bool = true;
    type Key = Self;
    type Value = ActiveStateMeta;
}
1018
// Marker impl; presumably what allows this key to be used with
// `Database::wait_key_exists` — confirm against `DatabaseKeyWithNotify` docs.
impl DatabaseKeyWithNotify for ActiveStateKeyDb {}
1020
/// Lookups by this prefix yield [`ActiveStateKeyDb`] records.
impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefix {
    type Record = ActiveStateKeyDb;
}
1024
/// Lookup prefix for active states whose lookups yield [`ActiveStateKeyBytes`]
/// records, i.e. keys with the state kept as raw bytes.
#[derive(Debug, Encodable, Decodable)]
pub struct ActiveStateKeyPrefixBytes;
1027
/// Raw-bytes view of the active state records: same prefix and value type as
/// [`ActiveStateKeyDb`], but without modification notifications.
impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyBytes {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = false;
    type Key = Self;
    type Value = ActiveStateMeta;
}
1034
/// Lookups by this prefix yield [`ActiveStateKeyBytes`] records.
impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefixBytes {
    type Record = ActiveStateKeyBytes;
}
1038
/// Database key of an inactive state: a newtype around the client module's
/// `InactiveStateKey`.
#[derive(Encodable, Decodable, Debug)]
pub struct InactiveStateKeyDb(pub fedimint_client_module::sm::executor::InactiveStateKey);
1041
/// Inactive state key with the state kept as undecoded bytes.
#[derive(Debug)]
pub struct InactiveStateKeyBytes {
    /// Operation the state belongs to.
    pub operation_id: OperationId,
    /// Module instance the state belongs to.
    pub module_instance_id: ModuleInstanceId,
    /// Raw key bytes following the operation id. As produced by decoding, this
    /// starts with the encoded module instance id, followed by the remaining
    /// bytes of the key.
    pub state: Vec<u8>,
}
1048
1049impl Encodable for InactiveStateKeyBytes {
1050 fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
1051 self.operation_id.consensus_encode(writer)?;
1052 writer.write_all(self.state.as_slice())?;
1053 Ok(())
1054 }
1055}
1056
1057impl Decodable for InactiveStateKeyBytes {
1058 fn consensus_decode_partial<R: std::io::Read>(
1059 reader: &mut R,
1060 modules: &ModuleDecoderRegistry,
1061 ) -> Result<Self, DecodeError> {
1062 let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1063 let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1064 let mut bytes = Vec::new();
1065 reader
1066 .read_to_end(&mut bytes)
1067 .map_err(DecodeError::from_err)?;
1068
1069 let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1070 instance_bytes.append(&mut bytes);
1071
1072 Ok(InactiveStateKeyBytes {
1073 operation_id,
1074 module_instance_id,
1075 state: instance_bytes,
1076 })
1077 }
1078}
1079
/// Lookup prefix selecting all inactive states of one operation.
#[derive(Debug)]
pub struct InactiveOperationStateKeyPrefix {
    /// Operation whose inactive states are selected.
    pub operation_id: OperationId,
}
1084
1085impl Encodable for InactiveOperationStateKeyPrefix {
1086 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1087 self.operation_id.consensus_encode(writer)
1088 }
1089}
1090
/// Lookups by this prefix yield [`InactiveStateKeyDb`] records.
impl ::fedimint_core::db::DatabaseLookup for InactiveOperationStateKeyPrefix {
    type Record = InactiveStateKeyDb;
}
1094
/// Lookup prefix selecting the inactive states of one module instance within
/// one operation.
#[derive(Debug)]
pub(crate) struct InactiveModuleOperationStateKeyPrefix {
    /// Operation whose inactive states are selected.
    pub operation_id: OperationId,
    /// Module instance the states must belong to.
    pub module_instance: ModuleInstanceId,
}
1100
1101impl Encodable for InactiveModuleOperationStateKeyPrefix {
1102 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1103 self.operation_id.consensus_encode(writer)?;
1104 self.module_instance.consensus_encode(writer)?;
1105 Ok(())
1106 }
1107}
1108
/// Lookups by this prefix yield [`InactiveStateKeyDb`] records.
impl ::fedimint_core::db::DatabaseLookup for InactiveModuleOperationStateKeyPrefix {
    type Record = InactiveStateKeyDb;
}
1112
/// Lookup prefix selecting every inactive state.
#[derive(Debug, Clone)]
pub struct InactiveStateKeyPrefix;
1115
/// Encodes to zero bytes.
impl Encodable for InactiveStateKeyPrefix {
    fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
        // Nothing is written: the prefix carries no data of its own.
        Ok(())
    }
}
1121
/// Lookup prefix for inactive states whose lookups yield
/// [`InactiveStateKeyBytes`] records, i.e. keys with the state kept as raw
/// bytes.
#[derive(Debug, Encodable, Decodable)]
pub struct InactiveStateKeyPrefixBytes;
1124
/// Raw-bytes view of the inactive state records: same prefix and value type as
/// [`InactiveStateKeyDb`], but without modification notifications.
impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyBytes {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
    const NOTIFY_ON_MODIFY: bool = false;
    type Key = Self;
    type Value = InactiveStateMeta;
}
1131
/// Lookups by this prefix yield [`InactiveStateKeyBytes`] records.
impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefixBytes {
    type Record = InactiveStateKeyBytes;
}
1135
/// Active state key with the state kept as undecoded bytes.
#[derive(Debug)]
pub struct ActiveStateKeyBytes {
    /// Operation the state belongs to.
    pub operation_id: OperationId,
    /// Module instance the state belongs to.
    pub module_instance_id: ModuleInstanceId,
    /// Raw key bytes following the operation id. As produced by decoding, this
    /// starts with the encoded module instance id, followed by the remaining
    /// bytes of the key.
    pub state: Vec<u8>,
}
1142
1143impl Encodable for ActiveStateKeyBytes {
1144 fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
1145 self.operation_id.consensus_encode(writer)?;
1146 writer.write_all(self.state.as_slice())?;
1147 Ok(())
1148 }
1149}
1150
1151impl Decodable for ActiveStateKeyBytes {
1152 fn consensus_decode_partial<R: std::io::Read>(
1153 reader: &mut R,
1154 modules: &ModuleDecoderRegistry,
1155 ) -> Result<Self, DecodeError> {
1156 let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1157 let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1158 let mut bytes = Vec::new();
1159 reader
1160 .read_to_end(&mut bytes)
1161 .map_err(DecodeError::from_err)?;
1162
1163 let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1164 instance_bytes.append(&mut bytes);
1165
1166 Ok(ActiveStateKeyBytes {
1167 operation_id,
1168 module_instance_id,
1169 state: instance_bytes,
1170 })
1171 }
1172}
/// Inactive states are stored under the `InactiveStates` prefix and map to
/// [`InactiveStateMeta`].
impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyDb {
    const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
    // Notifications are enabled for this key type; `Executor::await_inactive_state`
    // waits on it through `wait_key_exists`.
    const NOTIFY_ON_MODIFY: bool = true;
    type Key = Self;
    type Value = InactiveStateMeta;
}
1179
// Marker impl; presumably what allows this key to be used with
// `Database::wait_key_exists` — confirm against `DatabaseKeyWithNotify` docs.
impl DatabaseKeyWithNotify for InactiveStateKeyDb {}
1181
/// Lookups by this prefix yield [`InactiveStateKeyDb`] records.
impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefix {
    type Record = InactiveStateKeyDb;
}
1185
/// Result of executing a state transition: the new state, tagged by whether
/// it was stored as active or as inactive (terminal).
#[derive(Debug)]
enum ActiveOrInactiveState {
    /// The new state is not terminal and was stored as an active state.
    Active {
        dyn_state: DynState,
        // Never read directly, hence the `allow`; it still shows up in `Debug` output.
        #[allow(dead_code)] meta: ActiveStateMeta,
    },
    /// The new state is terminal and was stored as an inactive state.
    Inactive {
        dyn_state: DynState,
    },
}
1197
1198impl ActiveOrInactiveState {
1199 fn is_active(&self) -> bool {
1200 match self {
1201 ActiveOrInactiveState::Active { .. } => true,
1202 ActiveOrInactiveState::Inactive { .. } => false,
1203 }
1204 }
1205}
1206
/// Exposes the executor through the `IExecutor` trait by delegating to the
/// inherent methods of the same name.
#[apply(async_trait_maybe_send!)]
impl IExecutor for Executor {
    async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
        // `Self::…` path syntax picks the inherent method, not this trait method.
        Self::get_active_states(self).await
    }

    async fn add_state_machines_dbtx(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        states: Vec<DynState>,
    ) -> AddStateMachinesResult {
        // `Self::…` path syntax picks the inherent method, not this trait method.
        Self::add_state_machines_dbtx(self, dbtx, states).await
    }
}
1221
1222#[cfg(test)]
1223mod tests;