1use std::collections::{BTreeMap, BTreeSet, HashSet};
2use std::convert::Infallible;
3use std::fmt::{Debug, Formatter};
4use std::io::{Error, Write};
5use std::mem;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::anyhow;
10use fedimint_client_module::sm::executor::{
11 ActiveStateKey, ContextGen, IExecutor, InactiveStateKey,
12};
13use fedimint_client_module::sm::{
14 ActiveStateMeta, ClientSMDatabaseTransaction, DynContext, DynState, InactiveStateMeta, State,
15 StateTransition, StateTransitionFunction,
16};
17use fedimint_core::core::{IntoDynInstance, ModuleInstanceId, OperationId};
18use fedimint_core::db::{
19 AutocommitError, Database, DatabaseKeyWithNotify, DatabaseTransaction,
20 IDatabaseTransactionOpsCoreTyped,
21};
22use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
23use fedimint_core::fmt_utils::AbbreviateJson;
24use fedimint_core::module::registry::ModuleDecoderRegistry;
25use fedimint_core::task::TaskGroup;
26use fedimint_core::util::{BoxFuture, FmtCompactAnyhow as _};
27use fedimint_core::{apply, async_trait_maybe_send};
28use fedimint_eventlog::{DBTransactionEventLogExt as _, Event, EventKind, EventPersistence};
29use fedimint_logging::LOG_CLIENT_REACTOR;
30use futures::future::{self, select_all};
31use futures::stream::{FuturesUnordered, StreamExt};
32use serde::{Deserialize, Serialize};
33use tokio::select;
34use tokio::sync::{mpsc, oneshot, watch};
35use tracing::{Instrument, debug, error, info, trace, warn};
36
37use crate::sm::notifier::Notifier;
38use crate::{AddStateMachinesError, AddStateMachinesResult, DynGlobalClientContext};
39
40const MAX_DB_ATTEMPTS: Option<usize> = Some(100);
42
43pub(crate) enum ExecutorDbPrefixes {
45 ActiveStates = 0xa1,
47 InactiveStates = 0xa2,
49}
50
51#[derive(Serialize, Deserialize)]
52pub struct StateMachineUpdated {
53 operation_id: OperationId,
54 started: bool,
55 terminal: bool,
56 module_id: ModuleInstanceId,
57}
58
59impl Event for StateMachineUpdated {
60 const MODULE: Option<fedimint_core::core::ModuleKind> = None;
61 const KIND: EventKind = EventKind::from_static("sm-updated");
62 const PERSISTENCE: EventPersistence = EventPersistence::Trimable;
63}
64
65#[derive(Clone, Debug)]
73pub struct Executor {
74 inner: Arc<ExecutorInner>,
75}
76
77struct ExecutorInner {
78 db: Database,
79 state: std::sync::RwLock<ExecutorState>,
80 module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
81 valid_module_ids: BTreeSet<ModuleInstanceId>,
82 notifier: Notifier,
83 sm_update_tx: mpsc::UnboundedSender<DynState>,
86 client_task_group: TaskGroup,
87 log_ordering_wakeup_tx: watch::Sender<()>,
88}
89
90enum ExecutorState {
91 Unstarted {
92 sm_update_rx: mpsc::UnboundedReceiver<DynState>,
93 },
94 Running {
95 context_gen: ContextGen,
96 shutdown_sender: oneshot::Sender<()>,
97 },
98 Stopped,
99}
100
101impl ExecutorState {
102 fn start(
106 &mut self,
107 context: ContextGen,
108 ) -> Option<(oneshot::Receiver<()>, mpsc::UnboundedReceiver<DynState>)> {
109 let (shutdown_sender, shutdown_receiver) = tokio::sync::oneshot::channel::<()>();
110
111 let previous_state = mem::replace(
112 self,
113 ExecutorState::Running {
114 context_gen: context,
115 shutdown_sender,
116 },
117 );
118
119 match previous_state {
120 ExecutorState::Unstarted { sm_update_rx } => Some((shutdown_receiver, sm_update_rx)),
121 _ => {
122 *self = previous_state;
124
125 debug!(target: LOG_CLIENT_REACTOR, "Executor already started, ignoring start request");
126 None
127 }
128 }
129 }
130
131 fn stop(&mut self) -> Option<()> {
134 let previous_state = mem::replace(self, ExecutorState::Stopped);
135
136 match previous_state {
137 ExecutorState::Running {
138 shutdown_sender, ..
139 } => {
140 if shutdown_sender.send(()).is_err() {
141 warn!(target: LOG_CLIENT_REACTOR, "Failed to send shutdown signal to executor, already dead?");
142 }
143 Some(())
144 }
145 _ => {
146 *self = previous_state;
148
149 debug!(target: LOG_CLIENT_REACTOR, "Executor not running, ignoring stop request");
150 None
151 }
152 }
153 }
154
155 fn gen_context(&self, state: &DynState) -> Option<DynGlobalClientContext> {
156 let ExecutorState::Running { context_gen, .. } = self else {
157 return None;
158 };
159 Some(context_gen(
160 state.module_instance_id(),
161 state.operation_id(),
162 ))
163 }
164}
165
166#[derive(Debug, Default)]
169pub struct ExecutorBuilder {
170 module_contexts: BTreeMap<ModuleInstanceId, DynContext>,
171 valid_module_ids: BTreeSet<ModuleInstanceId>,
172}
173
174impl Executor {
175 pub fn builder() -> ExecutorBuilder {
177 ExecutorBuilder::default()
178 }
179
180 pub async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
181 self.inner.get_active_states().await
182 }
183
184 pub async fn add_state_machines(&self, states: Vec<DynState>) -> anyhow::Result<()> {
190 self.inner
191 .db
192 .autocommit(
193 |dbtx, _| Box::pin(self.add_state_machines_dbtx(dbtx, states.clone())),
194 MAX_DB_ATTEMPTS,
195 )
196 .await
197 .map_err(|e| match e {
198 AutocommitError::CommitFailed {
199 last_error,
200 attempts,
201 } => anyhow!("Failed to commit after {attempts} attempts: {last_error}"),
202 AutocommitError::ClosureError { error, .. } => anyhow!("{error:?}"),
203 })?;
204
205 Ok(())
208 }
209
210 pub async fn add_state_machines_dbtx(
219 &self,
220 dbtx: &mut DatabaseTransaction<'_>,
221 states: Vec<DynState>,
222 ) -> AddStateMachinesResult {
223 for state in states {
224 if !self
225 .inner
226 .valid_module_ids
227 .contains(&state.module_instance_id())
228 {
229 return Err(AddStateMachinesError::Other(anyhow!("Unknown module")));
230 }
231
232 let is_active_state = dbtx
233 .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
234 .await
235 .is_some();
236 let is_inactive_state = dbtx
237 .get_value(&InactiveStateKeyDb(InactiveStateKey::from_state(
238 state.clone(),
239 )))
240 .await
241 .is_some();
242
243 if is_active_state || is_inactive_state {
244 return Err(AddStateMachinesError::StateAlreadyExists);
245 }
246
247 if let Some(module_context) =
252 self.inner.module_contexts.get(&state.module_instance_id())
253 {
254 match self
255 .inner
256 .state
257 .read()
258 .expect("locking failed")
259 .gen_context(&state)
260 {
261 Some(context) => {
262 if state.is_terminal(module_context, &context) {
263 return Err(AddStateMachinesError::Other(anyhow!(
264 "State is already terminal, adding it to the executor doesn't make sense."
265 )));
266 }
267 }
268 _ => {
269 warn!(target: LOG_CLIENT_REACTOR, "Executor should be running at this point");
270 }
271 }
272 }
273
274 dbtx.insert_new_entry(
275 &ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())),
276 &ActiveStateMeta::default(),
277 )
278 .await;
279
280 let operation_id = state.operation_id();
281 self.inner
282 .log_event_dbtx(
283 dbtx,
284 StateMachineUpdated {
285 operation_id,
286 started: true,
287 terminal: false,
288 module_id: state.module_instance_id(),
289 },
290 )
291 .await;
292
293 let notify_sender = self.inner.notifier.sender();
294 let sm_updates_tx = self.inner.sm_update_tx.clone();
295 dbtx.on_commit(move || {
296 notify_sender.notify(state.clone());
297 let _ = sm_updates_tx.send(state);
298 });
299 }
300
301 Ok(())
302 }
303
304 pub async fn contains_active_state<S: State>(
309 &self,
310 instance: ModuleInstanceId,
311 state: S,
312 ) -> bool {
313 let state = DynState::from_typed(instance, state);
314 self.inner
315 .get_active_states()
316 .await
317 .into_iter()
318 .any(|(s, _)| s == state)
319 }
320
321 pub async fn contains_inactive_state<S: State>(
329 &self,
330 instance: ModuleInstanceId,
331 state: S,
332 ) -> bool {
333 let state = DynState::from_typed(instance, state);
334 self.inner
335 .get_inactive_states()
336 .await
337 .into_iter()
338 .any(|(s, _)| s == state)
339 }
340
341 pub async fn await_inactive_state(&self, state: DynState) -> InactiveStateMeta {
342 self.inner
343 .db
344 .wait_key_exists(&InactiveStateKeyDb(InactiveStateKey::from_state(state)))
345 .await
346 }
347
348 pub async fn await_active_state(&self, state: DynState) -> ActiveStateMeta {
349 self.inner
350 .db
351 .wait_key_exists(&ActiveStateKeyDb(ActiveStateKey::from_state(state)))
352 .await
353 }
354
355 pub async fn get_operation_states(
357 &self,
358 operation_id: OperationId,
359 ) -> (
360 Vec<(DynState, ActiveStateMeta)>,
361 Vec<(DynState, InactiveStateMeta)>,
362 ) {
363 let mut dbtx = self.inner.db.begin_transaction_nc().await;
364 let active_states: Vec<_> = dbtx
365 .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
366 .await
367 .map(|(active_key, active_meta)| (active_key.0.state, active_meta))
368 .collect()
369 .await;
370 let inactive_states: Vec<_> = dbtx
371 .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
372 .await
373 .map(|(active_key, inactive_meta)| (active_key.0.state, inactive_meta))
374 .collect()
375 .await;
376
377 (active_states, inactive_states)
378 }
379
380 pub fn start_executor(&self, context_gen: ContextGen) {
387 let Some((shutdown_receiver, sm_update_rx)) = self
388 .inner
389 .state
390 .write()
391 .expect("locking can't fail")
392 .start(context_gen.clone())
393 else {
394 panic!("start_executor was called previously");
395 };
396
397 let task_runner_inner = self.inner.clone();
398 let _handle = self.inner.client_task_group.spawn("sm-executor", |task_handle| async move {
399 let executor_runner = task_runner_inner.run(context_gen, sm_update_rx);
400 let task_group_shutdown_rx = task_handle.make_shutdown_rx();
401 select! {
402 () = task_group_shutdown_rx => {
403 debug!(
404 target: LOG_CLIENT_REACTOR,
405 "Shutting down state machine executor runner due to task group shutdown signal"
406 );
407 },
408 shutdown_happened_sender = shutdown_receiver => {
409 match shutdown_happened_sender {
410 Ok(()) => {
411 debug!(
412 target: LOG_CLIENT_REACTOR,
413 "Shutting down state machine executor runner due to explicit shutdown signal"
414 );
415 },
416 Err(_) => {
417 warn!(
418 target: LOG_CLIENT_REACTOR,
419 "Shutting down state machine executor runner because the shutdown signal channel was closed (the executor object was dropped)"
420 );
421 }
422 }
423 },
424 () = executor_runner => {
425 error!(target: LOG_CLIENT_REACTOR, "State machine executor runner exited unexpectedly!");
426 },
427 };
428 });
429 }
430
431 pub fn stop_executor(&self) -> Option<()> {
445 self.inner.stop_executor()
446 }
447
448 pub fn notifier(&self) -> &Notifier {
451 &self.inner.notifier
452 }
453}
454
455impl Drop for ExecutorInner {
456 fn drop(&mut self) {
457 self.stop_executor();
458 }
459}
460
461struct TransitionForActiveState {
462 outcome: serde_json::Value,
463 state: DynState,
464 meta: ActiveStateMeta,
465 transition_fn: StateTransitionFunction<DynState>,
466}
467
468impl ExecutorInner {
469 async fn run(
470 &self,
471 global_context_gen: ContextGen,
472 sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
473 ) {
474 debug!(target: LOG_CLIENT_REACTOR, "Starting state machine executor task");
475 if let Err(err) = self
476 .run_state_machines_executor_inner(global_context_gen, sm_update_rx)
477 .await
478 {
479 warn!(
480 target: LOG_CLIENT_REACTOR,
481 err = %err.fmt_compact_anyhow(),
482 "An unexpected error occurred during a state transition"
483 );
484 }
485 }
486
487 async fn get_transition_for(
488 &self,
489 state: &DynState,
490 meta: ActiveStateMeta,
491 global_context_gen: &ContextGen,
492 ) -> Vec<BoxFuture<'static, TransitionForActiveState>> {
493 let module_instance = state.module_instance_id();
494 let context = &self
495 .module_contexts
496 .get(&module_instance)
497 .expect("Unknown module");
498 let transitions = state
499 .transitions(
500 context,
501 &global_context_gen(module_instance, state.operation_id()),
502 )
503 .into_iter()
504 .map(|transition| {
505 let state = state.clone();
506 let f: BoxFuture<TransitionForActiveState> = Box::pin(async move {
507 let StateTransition {
508 trigger,
509 transition,
510 } = transition;
511 TransitionForActiveState {
512 outcome: trigger.await,
513 state,
514 transition_fn: transition,
515 meta,
516 }
517 });
518 f
519 })
520 .collect::<Vec<_>>();
521 if transitions.is_empty() {
522 warn!(
526 target: LOG_CLIENT_REACTOR,
527 module_id = module_instance, "A terminal state where only active states are expected. Please report this bug upstream."
528 );
529 self.db
530 .autocommit::<_, _, anyhow::Error>(
531 |dbtx, _| {
532 Box::pin(async {
533 let k = InactiveStateKey::from_state(state.clone());
534 let v = ActiveStateMeta::default().into_inactive();
535 dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
536 state.clone(),
537 )))
538 .await;
539 dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
540 Ok(())
541 })
542 },
543 None,
544 )
545 .await
546 .expect("Autocommit here can't fail");
547 }
548
549 transitions
550 }
551
552 async fn run_state_machines_executor_inner(
553 &self,
554 global_context_gen: ContextGen,
555 mut sm_update_rx: tokio::sync::mpsc::UnboundedReceiver<DynState>,
556 ) -> anyhow::Result<()> {
557 enum ExecutorLoopEvent {
560 New { state: DynState },
563 Triggered(TransitionForActiveState),
566 Invalid { state: DynState },
568 Completed {
570 state: DynState,
571 outcome: ActiveOrInactiveState,
572 },
573 Disconnected,
575 }
576
577 let active_states = self.get_active_states().await;
578 debug!(
579 target: LOG_CLIENT_REACTOR,
580 total = active_states.len(),
581 "Starting active state machines",
582 );
583 for (state, meta) in active_states {
584 trace!(target: LOG_CLIENT_REACTOR, ?state, ?meta, "Starting active state");
585
586 let age = fedimint_core::time::now()
587 .duration_since(meta.created_at)
588 .unwrap_or_default();
589 if age > Duration::from_secs(7 * 24 * 3600) {
590 warn!(
591 target: LOG_CLIENT_REACTOR,
592 operation_id = %state.operation_id().fmt_short(),
593 module_instance = %state.module_instance_id(),
594 age_days = age.as_secs() / 86400,
595 "Active state machine has been running for over a week, possibly stuck",
596 );
597 }
598 self.sm_update_tx
599 .send(state)
600 .expect("Must be able to send state machine to own opened channel");
601 }
602
603 let mut currently_running_sms = HashSet::<DynState>::new();
606 let mut futures: FuturesUnordered<BoxFuture<'_, ExecutorLoopEvent>> =
614 FuturesUnordered::new();
615
616 loop {
617 let event = tokio::select! {
618 new = sm_update_rx.recv() => {
619 match new { Some(new) => {
620 ExecutorLoopEvent::New {
621 state: new,
622 }
623 } _ => {
624 ExecutorLoopEvent::Disconnected
625 }}
626 },
627
628 event = futures.next(), if !futures.is_empty() => event.expect("we only .next() if there are pending futures"),
629 };
630
631 match event {
634 ExecutorLoopEvent::New { state } => {
635 if currently_running_sms.contains(&state) {
636 warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received a state machine that is already running. Ignoring");
637 continue;
638 }
639 currently_running_sms.insert(state.clone());
640 let futures_len = futures.len();
641 let global_context_gen = &global_context_gen;
642 trace!(target: LOG_CLIENT_REACTOR, state = ?state, "Started new active state machine, details.");
643 futures.push(Box::pin(async move {
644 let Some(meta) = self.get_active_state(&state).await else {
645 warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Couldn't look up received state machine. Ignoring.");
646 return ExecutorLoopEvent::Invalid { state: state.clone() };
647 };
648
649 let transitions = self
650 .get_transition_for(&state, meta, global_context_gen)
651 .await;
652 if transitions.is_empty() {
653 warn!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), "Received an active state that doesn't produce any transitions. Ignoring.");
654 return ExecutorLoopEvent::Invalid { state: state.clone() };
655 }
656 let transitions_num = transitions.len();
657
658 debug!(target: LOG_CLIENT_REACTOR, operation_id = %state.operation_id().fmt_short(), total = futures_len + 1, transitions_num, "New active state machine.");
659
660 let (first_completed_result, _index, _unused_transitions) =
661 select_all(transitions).await;
662 ExecutorLoopEvent::Triggered(first_completed_result)
663 }));
664 }
665 ExecutorLoopEvent::Triggered(TransitionForActiveState {
666 outcome,
667 state,
668 meta,
669 transition_fn,
670 }) => {
671 debug!(
672 target: LOG_CLIENT_REACTOR,
673 operation_id = %state.operation_id().fmt_short(),
674 "Triggered state transition",
675 );
676 let span = tracing::debug_span!(
677 target: LOG_CLIENT_REACTOR,
678 "sm_transition",
679 operation_id = %state.operation_id().fmt_short()
680 );
681 futures.push({
687 let sm_update_tx = self.sm_update_tx.clone();
688 let db = self.db.clone();
689 let notifier = self.notifier.clone();
690 let module_contexts = self.module_contexts.clone();
691 let global_context_gen = global_context_gen.clone();
692 Box::pin(
693 async move {
694 debug!(
695 target: LOG_CLIENT_REACTOR,
696 "Executing state transition",
697 );
698 trace!(
699 target: LOG_CLIENT_REACTOR,
700 ?state,
701 outcome = ?AbbreviateJson(&outcome),
702 "Executing state transition (details)",
703 );
704
705 let module_contexts = &module_contexts;
706 let global_context_gen = &global_context_gen;
707
708 let outcome = db
709 .autocommit::<'_, '_, _, _, Infallible>(
710 |dbtx, _| {
711 let state = state.clone();
712 let state_module_instance_id = state.module_instance_id();
713 let transition_fn = transition_fn.clone();
714 let transition_outcome = outcome.clone();
715 Box::pin(async move {
716 let new_state = transition_fn(
717 &mut ClientSMDatabaseTransaction::new(
718 &mut dbtx.to_ref(),
719 state.module_instance_id(),
720 ),
721 transition_outcome.clone(),
722 state.clone(),
723 )
724 .await;
725 dbtx.remove_entry(&ActiveStateKeyDb(ActiveStateKey::from_state(
726 state.clone(),
727 )))
728 .await;
729 dbtx.insert_entry(
730 &InactiveStateKeyDb(InactiveStateKey::from_state(state.clone())),
731 &meta.into_inactive(),
732 )
733 .await;
734
735 let context = &module_contexts
736 .get(&state.module_instance_id())
737 .expect("Unknown module");
738
739 let operation_id = state.operation_id();
740 let global_context = global_context_gen(
741 state.module_instance_id(),
742 operation_id,
743 );
744
745 let is_terminal = new_state.is_terminal(context, &global_context);
746
747 self.log_event_dbtx(dbtx,
748 StateMachineUpdated{
749 started: false,
750 operation_id,
751 module_id: state_module_instance_id,
752 terminal: is_terminal,
753 }
754 ).await;
755
756 if is_terminal {
757 let k = InactiveStateKey::from_state(
758 new_state.clone(),
759 );
760 let v = ActiveStateMeta::default().into_inactive();
761 dbtx.insert_entry(&InactiveStateKeyDb(k), &v).await;
762 Ok(ActiveOrInactiveState::Inactive {
763 dyn_state: new_state,
764 })
765 } else {
766 let k = ActiveStateKey::from_state(
767 new_state.clone(),
768 );
769 let v = ActiveStateMeta::default();
770 dbtx.insert_entry(&ActiveStateKeyDb(k), &v).await;
771 Ok(ActiveOrInactiveState::Active {
772 dyn_state: new_state,
773 meta: v,
774 })
775 }
776 })
777 },
778 None,
779 )
780 .await
781 .expect("autocommit should keep trying to commit (max_attempt: None) and body doesn't return errors");
782
783 debug!(
784 target: LOG_CLIENT_REACTOR,
785 terminal = !outcome.is_active(),
786 ?outcome,
787 "State transition complete",
788 );
789
790 match &outcome {
791 ActiveOrInactiveState::Active { dyn_state, meta: _ } => {
792 sm_update_tx
793 .send(dyn_state.clone())
794 .expect("can't fail: we are the receiving end");
795 notifier.notify(dyn_state.clone());
796 }
797 ActiveOrInactiveState::Inactive { dyn_state } => {
798 notifier.notify(dyn_state.clone());
799 }
800 }
801 ExecutorLoopEvent::Completed { state, outcome }
802 }
803 .instrument(span),
804 )
805 });
806 }
807 ExecutorLoopEvent::Invalid { state } => {
808 trace!(
809 target: LOG_CLIENT_REACTOR,
810 operation_id = %state.operation_id().fmt_short(), total = futures.len(),
811 "State invalid"
812 );
813 assert!(
814 currently_running_sms.remove(&state),
815 "State must have been recorded"
816 );
817 }
818
819 ExecutorLoopEvent::Completed { state, outcome } => {
820 assert!(
821 currently_running_sms.remove(&state),
822 "State must have been recorded"
823 );
824 debug!(
825 target: LOG_CLIENT_REACTOR,
826 operation_id = %state.operation_id().fmt_short(),
827 outcome_active = outcome.is_active(),
828 total = futures.len(),
829 "State transition complete"
830 );
831 trace!(
832 target: LOG_CLIENT_REACTOR,
833 ?outcome,
834 operation_id = %state.operation_id().fmt_short(), total = futures.len(),
835 "State transition complete"
836 );
837 }
838 ExecutorLoopEvent::Disconnected => {
839 break;
840 }
841 }
842 }
843
844 info!(target: LOG_CLIENT_REACTOR, "Terminated.");
845 Ok(())
846 }
847
848 async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
849 self.db
850 .begin_transaction_nc()
851 .await
852 .find_by_prefix(&ActiveStateKeyPrefix)
853 .await
854 .filter(|(state, _)| {
856 future::ready(
857 self.module_contexts
858 .contains_key(&state.0.state.module_instance_id()),
859 )
860 })
861 .map(|(state, meta)| (state.0.state, meta))
862 .collect::<Vec<_>>()
863 .await
864 }
865
866 async fn get_active_state(&self, state: &DynState) -> Option<ActiveStateMeta> {
867 if !self
869 .module_contexts
870 .contains_key(&state.module_instance_id())
871 {
872 return None;
873 }
874 self.db
875 .begin_transaction_nc()
876 .await
877 .get_value(&ActiveStateKeyDb(ActiveStateKey::from_state(state.clone())))
878 .await
879 }
880
881 async fn get_inactive_states(&self) -> Vec<(DynState, InactiveStateMeta)> {
882 self.db
883 .begin_transaction_nc()
884 .await
885 .find_by_prefix(&InactiveStateKeyPrefix)
886 .await
887 .filter(|(state, _)| {
889 future::ready(
890 self.module_contexts
891 .contains_key(&state.0.state.module_instance_id()),
892 )
893 })
894 .map(|(state, meta)| (state.0.state, meta))
895 .collect::<Vec<_>>()
896 .await
897 }
898
899 pub async fn log_event_dbtx<E, Cap>(&self, dbtx: &mut DatabaseTransaction<'_, Cap>, event: E)
900 where
901 E: Event + Send,
902 Cap: Send,
903 {
904 dbtx.log_event(self.log_ordering_wakeup_tx.clone(), None, event)
905 .await;
906 }
907}
908
909impl ExecutorInner {
910 fn stop_executor(&self) -> Option<()> {
912 let mut state = self.state.write().expect("Locking can't fail");
913
914 state.stop()
915 }
916}
917
918impl Debug for ExecutorInner {
919 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
920 writeln!(f, "ExecutorInner {{}}")
921 }
922}
923
924impl ExecutorBuilder {
925 pub fn with_module<C>(&mut self, instance_id: ModuleInstanceId, context: C)
928 where
929 C: IntoDynInstance<DynType = DynContext>,
930 {
931 self.with_module_dyn(context.into_dyn(instance_id));
932 }
933
934 pub fn with_module_dyn(&mut self, context: DynContext) {
937 self.valid_module_ids.insert(context.module_instance_id());
938
939 if self
940 .module_contexts
941 .insert(context.module_instance_id(), context)
942 .is_some()
943 {
944 panic!("Tried to add two modules with the same instance id!");
945 }
946 }
947
948 pub fn with_valid_module_id(&mut self, module_id: ModuleInstanceId) {
952 self.valid_module_ids.insert(module_id);
953 }
954
955 pub fn build(
959 self,
960 db: Database,
961 notifier: Notifier,
962 client_task_group: TaskGroup,
963 log_ordering_wakeup_tx: watch::Sender<()>,
964 ) -> Executor {
965 let (sm_update_tx, sm_update_rx) = tokio::sync::mpsc::unbounded_channel();
966
967 let inner = Arc::new(ExecutorInner {
968 db,
969 log_ordering_wakeup_tx,
970 state: std::sync::RwLock::new(ExecutorState::Unstarted { sm_update_rx }),
971 module_contexts: self.module_contexts,
972 valid_module_ids: self.valid_module_ids,
973 notifier,
974 sm_update_tx,
975 client_task_group,
976 });
977
978 debug!(
979 target: LOG_CLIENT_REACTOR,
980 instances = ?inner.module_contexts.keys().copied().collect::<Vec<_>>(),
981 "Initialized state machine executor with module instances"
982 );
983 Executor { inner }
984 }
985}
986#[derive(Debug)]
987pub struct ActiveOperationStateKeyPrefix {
988 pub operation_id: OperationId,
989}
990
991impl Encodable for ActiveOperationStateKeyPrefix {
992 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
993 self.operation_id.consensus_encode(writer)
994 }
995}
996
997impl ::fedimint_core::db::DatabaseLookup for ActiveOperationStateKeyPrefix {
998 type Record = ActiveStateKeyDb;
999}
1000
1001#[derive(Debug)]
1002pub(crate) struct ActiveModuleOperationStateKeyPrefix {
1003 pub operation_id: OperationId,
1004 pub module_instance: ModuleInstanceId,
1005}
1006
1007impl Encodable for ActiveModuleOperationStateKeyPrefix {
1008 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1009 self.operation_id.consensus_encode(writer)?;
1010 self.module_instance.consensus_encode(writer)?;
1011 Ok(())
1012 }
1013}
1014
1015impl ::fedimint_core::db::DatabaseLookup for ActiveModuleOperationStateKeyPrefix {
1016 type Record = ActiveStateKeyDb;
1017}
1018
1019#[derive(Debug)]
1020pub struct ActiveStateKeyPrefix;
1021
1022impl Encodable for ActiveStateKeyPrefix {
1023 fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
1024 Ok(())
1025 }
1026}
1027
1028#[derive(Encodable, Decodable, Debug)]
1029pub struct ActiveStateKeyDb(pub fedimint_client_module::sm::executor::ActiveStateKey);
1030
1031impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyDb {
1032 const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
1033 const NOTIFY_ON_MODIFY: bool = true;
1034 type Key = Self;
1035 type Value = ActiveStateMeta;
1036}
1037
1038impl DatabaseKeyWithNotify for ActiveStateKeyDb {}
1039
1040impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefix {
1041 type Record = ActiveStateKeyDb;
1042}
1043
1044#[derive(Debug, Encodable, Decodable)]
1045pub struct ActiveStateKeyPrefixBytes;
1046
1047impl ::fedimint_core::db::DatabaseRecord for ActiveStateKeyBytes {
1048 const DB_PREFIX: u8 = ExecutorDbPrefixes::ActiveStates as u8;
1049 const NOTIFY_ON_MODIFY: bool = false;
1050 type Key = Self;
1051 type Value = ActiveStateMeta;
1052}
1053
1054impl ::fedimint_core::db::DatabaseLookup for ActiveStateKeyPrefixBytes {
1055 type Record = ActiveStateKeyBytes;
1056}
1057
1058#[derive(Encodable, Decodable, Debug)]
1059pub struct InactiveStateKeyDb(pub fedimint_client_module::sm::executor::InactiveStateKey);
1060
1061#[derive(Debug)]
1062pub struct InactiveStateKeyBytes {
1063 pub operation_id: OperationId,
1064 pub module_instance_id: ModuleInstanceId,
1065 pub state: Vec<u8>,
1066}
1067
1068impl Encodable for InactiveStateKeyBytes {
1069 fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
1070 self.operation_id.consensus_encode(writer)?;
1071 writer.write_all(self.state.as_slice())?;
1072 Ok(())
1073 }
1074}
1075
1076impl Decodable for InactiveStateKeyBytes {
1077 fn consensus_decode_partial<R: std::io::Read>(
1078 reader: &mut R,
1079 modules: &ModuleDecoderRegistry,
1080 ) -> Result<Self, DecodeError> {
1081 let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1082 let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1083 let mut bytes = Vec::new();
1084 reader
1085 .read_to_end(&mut bytes)
1086 .map_err(DecodeError::from_err)?;
1087
1088 let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1089 instance_bytes.append(&mut bytes);
1090
1091 Ok(InactiveStateKeyBytes {
1092 operation_id,
1093 module_instance_id,
1094 state: instance_bytes,
1095 })
1096 }
1097}
1098
1099#[derive(Debug)]
1100pub struct InactiveOperationStateKeyPrefix {
1101 pub operation_id: OperationId,
1102}
1103
1104impl Encodable for InactiveOperationStateKeyPrefix {
1105 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1106 self.operation_id.consensus_encode(writer)
1107 }
1108}
1109
1110impl ::fedimint_core::db::DatabaseLookup for InactiveOperationStateKeyPrefix {
1111 type Record = InactiveStateKeyDb;
1112}
1113
1114#[derive(Debug)]
1115pub(crate) struct InactiveModuleOperationStateKeyPrefix {
1116 pub operation_id: OperationId,
1117 pub module_instance: ModuleInstanceId,
1118}
1119
1120impl Encodable for InactiveModuleOperationStateKeyPrefix {
1121 fn consensus_encode<W: Write>(&self, writer: &mut W) -> Result<(), Error> {
1122 self.operation_id.consensus_encode(writer)?;
1123 self.module_instance.consensus_encode(writer)?;
1124 Ok(())
1125 }
1126}
1127
1128impl ::fedimint_core::db::DatabaseLookup for InactiveModuleOperationStateKeyPrefix {
1129 type Record = InactiveStateKeyDb;
1130}
1131
1132#[derive(Debug, Clone)]
1133pub struct InactiveStateKeyPrefix;
1134
1135impl Encodable for InactiveStateKeyPrefix {
1136 fn consensus_encode<W: Write>(&self, _writer: &mut W) -> Result<(), Error> {
1137 Ok(())
1138 }
1139}
1140
1141#[derive(Debug, Encodable, Decodable)]
1142pub struct InactiveStateKeyPrefixBytes;
1143
1144impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyBytes {
1145 const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
1146 const NOTIFY_ON_MODIFY: bool = false;
1147 type Key = Self;
1148 type Value = InactiveStateMeta;
1149}
1150
1151impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefixBytes {
1152 type Record = InactiveStateKeyBytes;
1153}
1154
1155#[derive(Debug)]
1156pub struct ActiveStateKeyBytes {
1157 pub operation_id: OperationId,
1158 pub module_instance_id: ModuleInstanceId,
1159 pub state: Vec<u8>,
1160}
1161
1162impl Encodable for ActiveStateKeyBytes {
1163 fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
1164 self.operation_id.consensus_encode(writer)?;
1165 writer.write_all(self.state.as_slice())?;
1166 Ok(())
1167 }
1168}
1169
1170impl Decodable for ActiveStateKeyBytes {
1171 fn consensus_decode_partial<R: std::io::Read>(
1172 reader: &mut R,
1173 modules: &ModuleDecoderRegistry,
1174 ) -> Result<Self, DecodeError> {
1175 let operation_id = OperationId::consensus_decode_partial(reader, modules)?;
1176 let module_instance_id = ModuleInstanceId::consensus_decode_partial(reader, modules)?;
1177 let mut bytes = Vec::new();
1178 reader
1179 .read_to_end(&mut bytes)
1180 .map_err(DecodeError::from_err)?;
1181
1182 let mut instance_bytes = ModuleInstanceId::consensus_encode_to_vec(&module_instance_id);
1183 instance_bytes.append(&mut bytes);
1184
1185 Ok(ActiveStateKeyBytes {
1186 operation_id,
1187 module_instance_id,
1188 state: instance_bytes,
1189 })
1190 }
1191}
1192impl ::fedimint_core::db::DatabaseRecord for InactiveStateKeyDb {
1193 const DB_PREFIX: u8 = ExecutorDbPrefixes::InactiveStates as u8;
1194 const NOTIFY_ON_MODIFY: bool = true;
1195 type Key = Self;
1196 type Value = InactiveStateMeta;
1197}
1198
1199impl DatabaseKeyWithNotify for InactiveStateKeyDb {}
1200
1201impl ::fedimint_core::db::DatabaseLookup for InactiveStateKeyPrefix {
1202 type Record = InactiveStateKeyDb;
1203}
1204
1205#[derive(Debug)]
1206enum ActiveOrInactiveState {
1207 Active {
1208 dyn_state: DynState,
1209 #[allow(dead_code)] meta: ActiveStateMeta,
1211 },
1212 Inactive {
1213 dyn_state: DynState,
1214 },
1215}
1216
1217impl ActiveOrInactiveState {
1218 fn is_active(&self) -> bool {
1219 match self {
1220 ActiveOrInactiveState::Active { .. } => true,
1221 ActiveOrInactiveState::Inactive { .. } => false,
1222 }
1223 }
1224}
1225
1226#[apply(async_trait_maybe_send!)]
1227impl IExecutor for Executor {
1228 async fn get_active_states(&self) -> Vec<(DynState, ActiveStateMeta)> {
1229 Self::get_active_states(self).await
1230 }
1231
1232 async fn add_state_machines_dbtx(
1233 &self,
1234 dbtx: &mut DatabaseTransaction<'_>,
1235 states: Vec<DynState>,
1236 ) -> AddStateMachinesResult {
1237 Self::add_state_machines_dbtx(self, dbtx, states).await
1238 }
1239}
1240
1241#[cfg(test)]
1242mod tests;