fedimint_client_module/sm/
notifier.rs1use std::marker::PhantomData;
2use std::sync::Arc;
3
4use fedimint_core::core::{ModuleInstanceId, OperationId};
5use fedimint_core::util::broadcaststream::BroadcastStream;
6use fedimint_core::util::{BoxStream, FmtCompact};
7use fedimint_logging::LOG_CLIENT;
8use futures::StreamExt as _;
9use tracing::{debug, error, trace};
10
11use super::{DynState, State};
12use crate::module::FinalClientIface;
13use crate::sm::executor::{ActiveStateKey, InactiveStateKey};
14use crate::sm::{ActiveStateMeta, InactiveStateMeta};
15
16#[derive(Debug, Clone)]
19pub struct ModuleNotifier<S> {
20 broadcast: tokio::sync::broadcast::Sender<DynState>,
21 module_instance: ModuleInstanceId,
22 client: FinalClientIface,
23 _pd: PhantomData<S>,
26}
27
28impl<S> ModuleNotifier<S>
29where
30 S: State,
31{
32 pub fn new(
33 broadcast: tokio::sync::broadcast::Sender<DynState>,
34 module_instance: ModuleInstanceId,
35 client: FinalClientIface,
36 ) -> Self {
37 Self {
38 broadcast,
39 module_instance,
40 client,
41 _pd: PhantomData,
42 }
43 }
44
45 pub async fn subscribe(&self, operation_id: OperationId) -> BoxStream<'static, S> {
55 let to_typed_state = |state: DynState| {
56 state
57 .as_any()
58 .downcast_ref::<S>()
59 .expect("Tried to subscribe to wrong state type")
60 .clone()
61 };
62
63 let new_transitions = self.subscribe_all_operations();
66
67 let client_strong = self.client.get();
68 let db_states = {
69 let mut dbtx = client_strong.db().begin_transaction_nc().await;
70 let active_states = client_strong
71 .read_operation_active_states(operation_id, self.module_instance, &mut dbtx)
72 .await
73 .map(|(key, val): (ActiveStateKey, ActiveStateMeta)| {
74 (to_typed_state(key.state), val.created_at)
75 })
76 .collect::<Vec<(S, _)>>()
77 .await;
78
79 let inactive_states = self
80 .client
81 .get()
82 .read_operation_inactive_states(operation_id, self.module_instance, &mut dbtx)
83 .await
84 .map(|(key, val): (InactiveStateKey, InactiveStateMeta)| {
85 (to_typed_state(key.state), val.created_at)
86 })
87 .collect::<Vec<(S, _)>>()
88 .await;
89
90 let num_active = active_states.len();
93 let num_inactive = inactive_states.len();
94 let mut all_states_timed = active_states
95 .into_iter()
96 .chain(inactive_states)
97 .collect::<Vec<(S, _)>>();
98 all_states_timed.sort_by_key(|(_, t1)| *t1);
99 debug!(
100 operation_id = %operation_id.fmt_short(),
101 module_instance = %self.module_instance,
102 active = num_active,
103 inactive = num_inactive,
104 "Returning state transitions from DB for notifier subscription",
105 );
106 all_states_timed
107 .into_iter()
108 .map(|(s, _)| s)
109 .collect::<Vec<S>>()
110 };
111
112 let new_transitions = new_transitions.filter_map({
113 let db_states: Arc<_> = Arc::new(db_states.clone());
114
115 move |state: S| {
116 let db_states = db_states.clone();
117 async move {
118 if state.operation_id() == operation_id {
119 trace!(operation_id = %operation_id.fmt_short(), ?state, "Received state transition notification");
120 if db_states.iter().any(|db_s| db_s == &state) {
128 debug!(operation_id = %operation_id.fmt_short(), ?state, "Ignoring duplicated event");
129 return None;
130 }
131 Some(state)
132 } else {
133 None
134 }
135 }
136 }
137 });
138 Box::pin(futures::stream::iter(db_states).chain(new_transitions))
139 }
140
141 pub fn subscribe_all_operations(&self) -> BoxStream<'static, S> {
143 let module_instance_id = self.module_instance;
144 Box::pin(
145 BroadcastStream::new(self.broadcast.subscribe())
146 .take_while(|res| {
147 let cont = if let Err(err) = res {
148 error!(target: LOG_CLIENT, err = %err.fmt_compact(), "ModuleNotifier stream stopped on error");
149 false
150 } else {
151 true
152 };
153 std::future::ready(cont)
154 })
155 .filter_map(move |res| async move {
156 let s = res.expect("We filtered out errors above");
157 if s.module_instance_id() == module_instance_id {
158 Some(
159 s.as_any()
160 .downcast_ref::<S>()
161 .expect("Tried to subscribe to wrong state type")
162 .clone(),
163 )
164 } else {
165 None
166 }
167 }),
168 )
169 }
170}