clementine_core/states/
event.rs

1use std::sync::Arc;
2
3use bitcoin::Witness;
4use eyre::{Context, OptionExt};
5use pgmq::PGMQueueExt;
6use statig::awaitable::{InitializedStateMachine, IntoStateMachineExt};
7use tokio::sync::Mutex;
8
9use crate::{
10    database::{Database, DatabaseTransaction},
11    deposit::{DepositData, KickoffData, OperatorData},
12    states::{
13        context::{DutyResult, StateContext},
14        round::RoundEvent,
15    },
16};
17use clementine_errors::BridgeError;
18
19use super::{kickoff::KickoffStateMachine, round::RoundStateMachine, Owner, StateManager};
20
21/// System events are events that are sent by other parts of clementine to the state machine
22/// They are used to update the state machine
23/// They are sent by the state manager to the state machine
24#[derive(Debug, serde::Serialize, Clone, serde::Deserialize)]
25#[allow(clippy::large_enum_variant)]
26pub enum SystemEvent {
27    /// An event for a new finalized block
28    /// So that state manager can update the states of all current state machines
29    NewFinalizedBlock { block: bitcoin::Block, height: u32 },
30    /// An event for when a new operator is set in clementine
31    /// So that the state machine can create a new round state machine to track the operator
32    NewOperator { operator_data: OperatorData },
33    /// An event for when a new kickoff is set in clementine
34    /// So that the state machine can create a new kickoff state machine to track the kickoff status
35    NewKickoff {
36        kickoff_data: KickoffData,
37        kickoff_height: u32,
38        deposit_data: DepositData,
39        payout_blockhash: Witness,
40    },
41    /// An event for when the LCP for an L1 block height is processed
42    LCPProcessed { height: u32 },
43}
44
45impl<T: Owner + std::fmt::Debug + 'static> StateManager<T> {
46    /// Appends a  message to the state manager's message queue to create a new round state machine
47    pub async fn dispatch_new_round_machine(
48        db: &Database,
49        tx: DatabaseTransaction<'_>,
50        operator_data: OperatorData,
51    ) -> Result<(), eyre::Report> {
52        let queue_name = Self::queue_name();
53        let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
54
55        let message = SystemEvent::NewOperator { operator_data };
56        queue
57            .send_with_cxn(&queue_name, &message, &mut *(*tx))
58            .await
59            .map_err(|e| eyre::eyre!("Error sending NewOperator event: {:?}", e))?;
60        Ok(())
61    }
62
63    /// Appends a message to the state manager's message queue to notify that the LCP for an L1 block height is processed
64    pub async fn dispatch_lcp_processed(
65        db: &Database,
66        tx: DatabaseTransaction<'_>,
67        height: u32,
68    ) -> Result<(), eyre::Report> {
69        let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
70        let queue_name = Self::queue_name();
71        let message = SystemEvent::LCPProcessed { height };
72        queue
73            .send_with_cxn(&queue_name, &message, &mut *(*tx))
74            .await
75            .map_err(|e| eyre::eyre!("Error sending LCPProcessed event: {:?}", e))?;
76        Ok(())
77    }
78
79    /// Appends a  message to the state manager's message queue to create a new kickoff state machine
80    pub async fn dispatch_new_kickoff_machine(
81        db: &Database,
82        tx: DatabaseTransaction<'_>,
83        kickoff_data: KickoffData,
84        kickoff_height: u32,
85        deposit_data: DepositData,
86        payout_blockhash: Witness,
87    ) -> Result<(), eyre::Report> {
88        let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
89        let queue_name = Self::queue_name();
90
91        let message = SystemEvent::NewKickoff {
92            kickoff_data,
93            kickoff_height,
94            deposit_data,
95            payout_blockhash,
96        };
97        queue
98            .send_with_cxn(&queue_name, &message, &mut *(*tx))
99            .await
100            .map_err(|e| eyre::eyre!("Error sending NewKickoff event: {:?}", e))?;
101        Ok(())
102    }
103
104    /// Handles the system events
105    pub async fn handle_event(
106        &mut self,
107        event: SystemEvent,
108        dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
109    ) -> Result<(), BridgeError> {
110        match event {
111            // Received when a block is finalized in Bitcoin
112            SystemEvent::NewFinalizedBlock { block, height } => {
113                if self.next_height_to_process != height {
114                    return Err(eyre::eyre!("Finalized block arrived to state manager out of order. Expected: block at height {}, Got: block at height {}", self.next_height_to_process, height).into());
115                }
116
117                let mut context = self.new_context(dbtx.clone(), &block, height)?;
118
119                self.process_block_parallel(&mut context).await?;
120
121                self.last_finalized_block = Some(context.cache.clone());
122            }
123            // Received when a new operator is set in clementine
124            SystemEvent::NewOperator { operator_data } => {
125                // Check if operator's state machine already exists.
126                // This can happen if aggregator calls set_operator for the same operator multiple times.
127                // In this case, we don't want to create a new state machine.
128                for operator_machine in self.round_machines.iter() {
129                    if operator_machine.operator_data.xonly_pk == operator_data.xonly_pk {
130                        return Ok(());
131                    }
132                }
133
134                // Initialize context using the block just before the start height
135                // so subsequent processing can begin from start_height
136                let prev_height = self.config.protocol_paramset.start_height.saturating_sub(1);
137                let init_block = {
138                    let mut guard = dbtx.lock().await;
139                    self.get_block(Some(&mut *guard), prev_height).await?
140                };
141
142                let mut context = self.new_context(dbtx.clone(), &init_block, prev_height)?;
143
144                let operator_machine = RoundStateMachine::new(operator_data)
145                    .uninitialized_state_machine()
146                    .init_with_context(&mut context)
147                    .await;
148
149                if !context.errors.is_empty() {
150                    return Err(eyre::eyre!(
151                        "Multiple errors occurred during RoundStateMachine initialization: {:?}",
152                        context.errors
153                    )
154                    .into());
155                }
156
157                self.process_and_add_new_states_from_height(
158                    dbtx.clone(),
159                    vec![operator_machine],
160                    vec![],
161                    self.config.protocol_paramset.start_height,
162                )
163                .await?;
164            }
165            // Received when a new kickoff is detected
166            SystemEvent::NewKickoff {
167                kickoff_data,
168                kickoff_height,
169                deposit_data,
170                payout_blockhash,
171            } => {
172                // if kickoff is not relevant for the owner, do not process it
173                // only case right now is if owner is operator and kickoff is not of their own
174                if !self.owner.is_kickoff_relevant_for_owner(&kickoff_data) {
175                    return Ok(());
176                }
177
178                // check for duplicates
179                for kickoff_machine in self.kickoff_machines.iter() {
180                    // if they do not have the same kickoff data (same operator + kickoff utxo), it's definitely not a duplicate
181                    if kickoff_machine.kickoff_data != kickoff_data {
182                        continue;
183                    }
184                    let deposit_data_matches = kickoff_machine.deposit_data == deposit_data;
185                    let payout_blockhash_matches =
186                        kickoff_machine.payout_blockhash == payout_blockhash;
187                    let kickoff_height_matches = kickoff_machine.kickoff_height == kickoff_height;
188
189                    // Same kickoff_data should always imply the same associated fields.
190                    // This catches inconsistent finalized kickoff data, for example after a reorged kickoff was added too early.
191                    if deposit_data_matches && payout_blockhash_matches && kickoff_height_matches {
192                        return Ok(());
193                    }
194
195                    let mut mismatches = Vec::new();
196                    if !deposit_data_matches {
197                        mismatches.push(format!(
198                            "deposit_data: new={:?}, existing={:?}",
199                            deposit_data, kickoff_machine.deposit_data
200                        ));
201                    }
202                    if !payout_blockhash_matches {
203                        let witness_hex =
204                            |witness: &Witness| witness.iter().map(hex::encode).collect::<Vec<_>>();
205                        mismatches.push(format!(
206                            "payout_blockhash_witness: new={:?}, existing={:?}",
207                            witness_hex(&payout_blockhash),
208                            witness_hex(&kickoff_machine.payout_blockhash)
209                        ));
210                    }
211                    if !kickoff_height_matches {
212                        mismatches.push(format!(
213                            "kickoff_height: new={}, existing={}",
214                            kickoff_height, kickoff_machine.kickoff_height
215                        ));
216                    }
217
218                    return Err(eyre::eyre!(
219                        "Conflicting kickoff({:?}) detected: same kickoff_data, mismatches: {}",
220                        kickoff_data,
221                        mismatches.join("; "),
222                    )
223                    .into());
224                }
225
226                // Initialize context using the block just before the kickoff height
227                // so subsequent processing can begin from kickoff_height
228                let prev_height = kickoff_height.saturating_sub(1);
229                let init_block = {
230                    let mut guard = dbtx.lock().await;
231                    self.get_block(Some(&mut *guard), prev_height).await?
232                };
233
234                let mut context = self.new_context(dbtx.clone(), &init_block, prev_height)?;
235
236                let kickoff_machine = KickoffStateMachine::new(
237                    kickoff_data,
238                    kickoff_height,
239                    deposit_data.clone(),
240                    payout_blockhash.clone(),
241                )
242                .uninitialized_state_machine()
243                .init_with_context(&mut context)
244                .await;
245
246                if !context.errors.is_empty() {
247                    return Err(eyre::eyre!(
248                        "Multiple errors occurred during KickoffStateMachine initialization: {:?}",
249                        context.errors
250                    )
251                    .into());
252                }
253
254                self.process_and_add_new_states_from_height(
255                    dbtx.clone(),
256                    vec![],
257                    vec![kickoff_machine],
258                    kickoff_height,
259                )
260                .await?;
261
262                // check if malicious if lcp is already processed for the kickoff height
263                if let Some(last_lcp_height) = self.last_processed_lcp {
264                    if last_lcp_height >= kickoff_height {
265                        self.check_if_kickoff_malicious(
266                            &payout_blockhash,
267                            &kickoff_data,
268                            &deposit_data,
269                            &mut context,
270                        )
271                        .await?;
272                    }
273                }
274            }
275            // Received when a the LCP for an L1 block height is processed
276            SystemEvent::LCPProcessed { height } => {
277                let kickoffs_to_check: Vec<_> = self
278                    .kickoff_machines
279                    .iter()
280                    .filter(|machine| machine.kickoff_height == height)
281                    .map(|machine| {
282                        (
283                            machine.payout_blockhash.clone(),
284                            machine.kickoff_data,
285                            machine.deposit_data.clone(),
286                        )
287                    })
288                    .collect();
289
290                if !kickoffs_to_check.is_empty() {
291                    // create a dummy context for duty processing, a block is not needed for LCPProcessed
292                    let mut dummy_context = self.new_context_with_block_cache(
293                        dbtx.clone(),
294                        self.last_finalized_block.clone().ok_or_eyre(
295                            "Last finalized block not found, should always be Some after initialization",
296                        )?,
297                    )?;
298
299                    for (payout_blockhash, kickoff_data, deposit_data) in kickoffs_to_check {
300                        self.check_if_kickoff_malicious(
301                            &payout_blockhash,
302                            &kickoff_data,
303                            &deposit_data,
304                            &mut dummy_context,
305                        )
306                        .await?;
307                    }
308                }
309
310                tracing::info!("LCP processed for height: {}", height);
311
312                self.last_processed_lcp = Some(height);
313            }
314        };
315
316        let mut context = self.new_context_with_block_cache(
317            dbtx,
318            self.last_finalized_block.clone().ok_or_eyre(
319                "Last finalized block not found, should always be Some after initialization",
320            )?,
321        )?;
322
323        // Save the state machines to the database with the current block height
324        // So that in case of a node restart the state machines can be restored
325        self.save_state_to_db(&mut context).await?;
326
327        Ok(())
328    }
329
330    fn get_round_machine(
331        &mut self,
332        operator_xonly_pk: &bitcoin::XOnlyPublicKey,
333    ) -> Option<&mut InitializedStateMachine<RoundStateMachine<T>>> {
334        self.round_machines
335            .iter_mut()
336            .find(|machine| &machine.operator_data.xonly_pk == operator_xonly_pk)
337    }
338
339    async fn check_if_kickoff_malicious(
340        &mut self,
341        payout_blockhash: &Witness,
342        kickoff_data: &KickoffData,
343        deposit_data: &DepositData,
344        context: &mut StateContext<T>,
345    ) -> Result<(), BridgeError> {
346        // Pull the current round state data first to avoid holding a mutable borrow of self
347        // while calling into owner duties (which require an immutable borrow of self.owner).
348        let was_challenged_before = {
349            let round_machine = self
350                .get_round_machine(&kickoff_data.operator_xonly_pk)
351                .ok_or_else(|| {
352                    eyre::eyre!(
353                        "Round machine not found for operator {} while checking if kickoff is malicious",
354                        kickoff_data.operator_xonly_pk
355                    )
356                })?;
357
358            round_machine
359                .challenged_rounds
360                .contains(&kickoff_data.round_idx)
361        };
362
363        let duty = super::Duty::CheckIfKickoffMalicious {
364            kickoff_data: *kickoff_data,
365            deposit_data: deposit_data.clone(),
366            kickoff_witness: payout_blockhash.clone(),
367            challenged_before: was_challenged_before,
368        };
369
370        let res = context
371            .dispatch_duty(duty)
372            .await
373            .wrap_err("Error while checking if kickoff is malicious")?;
374
375        match res {
376            DutyResult::CheckIfKickoffMalicious { challenged } => {
377                if challenged && !was_challenged_before {
378                    // Reacquire the round machine mutably to update the challenged flag
379                    if let Some(round_machine) =
380                        self.get_round_machine(&kickoff_data.operator_xonly_pk)
381                    {
382                        round_machine
383                            .handle_with_context(
384                                &RoundEvent::SetChallenged {
385                                    round_idx: kickoff_data.round_idx,
386                                },
387                                context,
388                            )
389                            .await;
390                    }
391                }
392            }
393            other => {
394                return Err(eyre::eyre!(
395                    "Expected CheckIfKickoffMalicious duty result, got {other:?}"
396                )
397                .into());
398            }
399        }
400
401        Ok(())
402    }
403}