Skip to main content

fedimint_client_module/sm/
notifier.rs

1use std::marker::PhantomData;
2use std::sync::Arc;
3
4use fedimint_core::core::{ModuleInstanceId, OperationId};
5use fedimint_core::util::broadcaststream::BroadcastStream;
6use fedimint_core::util::{BoxStream, FmtCompact};
7use fedimint_logging::LOG_CLIENT;
8use futures::StreamExt as _;
9use tracing::{debug, error, trace};
10
11use super::{DynState, State};
12use crate::module::FinalClientIface;
13use crate::sm::executor::{ActiveStateKey, InactiveStateKey};
14use crate::sm::{ActiveStateMeta, InactiveStateMeta};
15
16/// State transition notifier for a specific module instance that can only
17/// subscribe to transitions belonging to that module
18#[derive(Debug, Clone)]
19pub struct ModuleNotifier<S> {
20    broadcast: tokio::sync::broadcast::Sender<DynState>,
21    module_instance: ModuleInstanceId,
22    client: FinalClientIface,
23    /// `S` limits the type of state that can be subscribed to the one
24    /// associated with the module instance
25    _pd: PhantomData<S>,
26}
27
28impl<S> ModuleNotifier<S>
29where
30    S: State,
31{
32    pub fn new(
33        broadcast: tokio::sync::broadcast::Sender<DynState>,
34        module_instance: ModuleInstanceId,
35        client: FinalClientIface,
36    ) -> Self {
37        Self {
38            broadcast,
39            module_instance,
40            client,
41            _pd: PhantomData,
42        }
43    }
44
45    // TODO: remove duplicates and order old transitions
46    /// Subscribe to state transitions belonging to an operation and module
47    /// (module context contained in struct).
48    ///
49    /// The returned stream will contain all past state transitions that
50    /// happened before the subscription and are read from the database, after
51    /// these the stream will contain all future state transitions. The states
52    /// loaded from the database are not returned in a specific order. There may
53    /// also be duplications.
54    pub async fn subscribe(&self, operation_id: OperationId) -> BoxStream<'static, S> {
55        let to_typed_state = |state: DynState| {
56            state
57                .as_any()
58                .downcast_ref::<S>()
59                .expect("Tried to subscribe to wrong state type")
60                .clone()
61        };
62
63        // It's important to start the subscription first and then query the database to
64        // not lose any transitions in the meantime.
65        let new_transitions = self.subscribe_all_operations();
66
67        let client_strong = self.client.get();
68        let db_states = {
69            let mut dbtx = client_strong.db().begin_transaction_nc().await;
70            let active_states = client_strong
71                .read_operation_active_states(operation_id, self.module_instance, &mut dbtx)
72                .await
73                .map(|(key, val): (ActiveStateKey, ActiveStateMeta)| {
74                    (to_typed_state(key.state), val.created_at)
75                })
76                .collect::<Vec<(S, _)>>()
77                .await;
78
79            let inactive_states = self
80                .client
81                .get()
82                .read_operation_inactive_states(operation_id, self.module_instance, &mut dbtx)
83                .await
84                .map(|(key, val): (InactiveStateKey, InactiveStateMeta)| {
85                    (to_typed_state(key.state), val.created_at)
86                })
87                .collect::<Vec<(S, _)>>()
88                .await;
89
90            // FIXME: don't rely on SystemTime for ordering and introduce a state transition
91            // index instead (dpc was right again xD)
92            let num_active = active_states.len();
93            let num_inactive = inactive_states.len();
94            let mut all_states_timed = active_states
95                .into_iter()
96                .chain(inactive_states)
97                .collect::<Vec<(S, _)>>();
98            all_states_timed.sort_by_key(|(_, t1)| *t1);
99            debug!(
100                operation_id = %operation_id.fmt_short(),
101                module_instance = %self.module_instance,
102                active = num_active,
103                inactive = num_inactive,
104                "Returning state transitions from DB for notifier subscription",
105            );
106            all_states_timed
107                .into_iter()
108                .map(|(s, _)| s)
109                .collect::<Vec<S>>()
110        };
111
112        let new_transitions = new_transitions.filter_map({
113            let db_states: Arc<_> = Arc::new(db_states.clone());
114
115            move |state: S| {
116                let db_states = db_states.clone();
117                async move {
118                    if state.operation_id() == operation_id {
119                        trace!(operation_id = %operation_id.fmt_short(), ?state, "Received state transition notification");
120                        // Deduplicate events that might have both come from the DB and streamed,
121                        // due to subscribing to notifier before querying the DB.
122                        //
123                        // Note: linear search should be good enough in practice for many reasons.
124                        // Eg. states tend to have all the states in the DB, or all streamed "live",
125                        // so the overlap here should be minimal.
126                        // And we'll rewrite the whole thing anyway and use only db as a reference.
127                        if db_states.iter().any(|db_s| db_s == &state) {
128                            debug!(operation_id = %operation_id.fmt_short(), ?state, "Ignoring duplicated event");
129                            return None;
130                        }
131                        Some(state)
132                    } else {
133                        None
134                    }
135                }
136            }
137        });
138        Box::pin(futures::stream::iter(db_states).chain(new_transitions))
139    }
140
141    /// Subscribe to all state transitions belonging to the module instance.
142    pub fn subscribe_all_operations(&self) -> BoxStream<'static, S> {
143        let module_instance_id = self.module_instance;
144        Box::pin(
145            BroadcastStream::new(self.broadcast.subscribe())
146                .take_while(|res| {
147                    let cont = if let Err(err) = res {
148                        error!(target: LOG_CLIENT, err = %err.fmt_compact(), "ModuleNotifier stream stopped on error");
149                        false
150                    } else {
151                        true
152                    };
153                    std::future::ready(cont)
154                })
155                .filter_map(move |res| async move {
156                    let s = res.expect("We filtered out errors above");
157                    if s.module_instance_id() == module_instance_id {
158                        Some(
159                            s.as_any()
160                                .downcast_ref::<S>()
161                                .expect("Tried to subscribe to wrong state type")
162                                .clone(),
163                        )
164                    } else {
165                        None
166                    }
167                }),
168        )
169    }
170}