clementine_core/states/
event.rs

1use std::sync::Arc;
2
3use bitcoin::Witness;
4use eyre::OptionExt;
5use pgmq::PGMQueueExt;
6use statig::awaitable::IntoStateMachineExt;
7use tokio::sync::Mutex;
8
9use crate::{
10    database::{Database, DatabaseTransaction},
11    deposit::{DepositData, KickoffData, OperatorData},
12    errors::BridgeError,
13};
14
15use super::{kickoff::KickoffStateMachine, round::RoundStateMachine, Owner, StateManager};
16
17/// System events are events that are sent by other parts of clementine to the state machine
18/// They are used to update the state machine
19/// They are sent by the state manager to the state machine
20#[derive(Debug, serde::Serialize, Clone, serde::Deserialize)]
21#[allow(clippy::large_enum_variant)]
22pub enum SystemEvent {
23    /// An event for a new finalized block
24    /// So that state manager can update the states of all current state machines
25    NewFinalizedBlock {
26        block_id: u32,
27        block: bitcoin::Block,
28        height: u32,
29    },
30    /// An event for when a new operator is set in clementine
31    /// So that the state machine can create a new round state machine to track the operator
32    NewOperator { operator_data: OperatorData },
33    /// An event for when a new kickoff is set in clementine
34    /// So that the state machine can create a new kickoff state machine to track the kickoff status
35    NewKickoff {
36        kickoff_data: KickoffData,
37        kickoff_height: u32,
38        deposit_data: DepositData,
39        payout_blockhash: Witness,
40    },
41}
42
43impl<T: Owner + std::fmt::Debug + 'static> StateManager<T> {
44    /// Appends a  message to the state manager's message queue to create a new round state machine
45    pub async fn dispatch_new_round_machine(
46        db: Database,
47        tx: DatabaseTransaction<'_, '_>,
48        operator_data: OperatorData,
49    ) -> Result<(), eyre::Report> {
50        let queue_name = StateManager::<T>::queue_name();
51        let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
52
53        let message = SystemEvent::NewOperator { operator_data };
54        queue
55            .send_with_cxn(&queue_name, &message, &mut *(*tx))
56            .await
57            .map_err(|e| eyre::eyre!("Error sending NewOperator event: {:?}", e))?;
58        Ok(())
59    }
60
61    /// Appends a  message to the state manager's message queue to create a new kickoff state machine
62    pub async fn dispatch_new_kickoff_machine(
63        db: Database,
64        tx: DatabaseTransaction<'_, '_>,
65        kickoff_data: KickoffData,
66        kickoff_height: u32,
67        deposit_data: DepositData,
68        payout_blockhash: Witness,
69    ) -> Result<(), eyre::Report> {
70        let queue_name = StateManager::<T>::queue_name();
71        let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
72
73        let message = SystemEvent::NewKickoff {
74            kickoff_data,
75            kickoff_height,
76            deposit_data,
77            payout_blockhash,
78        };
79        queue
80            .send_with_cxn(&queue_name, &message, &mut *(*tx))
81            .await
82            .map_err(|e| eyre::eyre!("Error sending NewKickoff event: {:?}", e))?;
83        Ok(())
84    }
85
86    /// Handles the system events
87    pub async fn handle_event(
88        &mut self,
89        event: SystemEvent,
90        dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
91    ) -> Result<(), BridgeError> {
92        match event {
93            // Received when a block is finalized in Bitcoin
94            SystemEvent::NewFinalizedBlock {
95                block_id,
96                block,
97                height,
98            } => {
99                if self.next_height_to_process != height {
100                    return Err(eyre::eyre!("Finalized block arrived to state manager out of order. Expected: block at height {}, Got: block at height {}", self.next_height_to_process, height).into());
101                }
102
103                let mut context = self.new_context(dbtx.clone(), &block, height)?;
104
105                // Handle the finalized block on the owner (verifier or operator)
106                {
107                    let mut guard = dbtx.lock().await;
108                    self.owner
109                        .handle_finalized_block(
110                            &mut guard,
111                            block_id,
112                            height,
113                            context.cache.clone(),
114                            None,
115                        )
116                        .await?;
117                }
118
119                self.process_block_parallel(&mut context).await?;
120
121                self.last_finalized_block = Some(context.cache.clone());
122            }
123            // Received when a new operator is set in clementine
124            SystemEvent::NewOperator { operator_data } => {
125                // Check if operator's state machine already exists.
126                // This can happen if aggregator calls set_operator for the same operator multiple times.
127                // In this case, we don't want to create a new state machine.
128                for operator_machine in self.round_machines.iter() {
129                    if operator_machine.operator_data.xonly_pk == operator_data.xonly_pk {
130                        return Ok(());
131                    }
132                }
133
134                // Initialize context using the block just before the start height
135                // so subsequent processing can begin from start_height
136                let prev_height = self.config.protocol_paramset.start_height.saturating_sub(1);
137                let init_block = {
138                    let mut guard = dbtx.lock().await;
139                    self.get_block(Some(&mut *guard), prev_height).await?
140                };
141
142                let mut context = self.new_context(dbtx.clone(), &init_block, prev_height)?;
143
144                let operator_machine = RoundStateMachine::new(operator_data)
145                    .uninitialized_state_machine()
146                    .init_with_context(&mut context)
147                    .await;
148
149                if !context.errors.is_empty() {
150                    return Err(eyre::eyre!(
151                        "Multiple errors occurred during RoundStateMachine initialization: {:?}",
152                        context.errors
153                    )
154                    .into());
155                }
156
157                self.process_and_add_new_states_from_height(
158                    dbtx.clone(),
159                    vec![operator_machine],
160                    vec![],
161                    self.config.protocol_paramset.start_height,
162                )
163                .await?;
164            }
165            // Received when a new kickoff is detected
166            SystemEvent::NewKickoff {
167                kickoff_data,
168                kickoff_height,
169                deposit_data,
170                payout_blockhash,
171            } => {
172                // Initialize context using the block just before the kickoff height
173                // so subsequent processing can begin from kickoff_height
174                let prev_height = kickoff_height.saturating_sub(1);
175                let init_block = {
176                    let mut guard = dbtx.lock().await;
177                    self.get_block(Some(&mut *guard), prev_height).await?
178                };
179
180                let mut context = self.new_context(dbtx.clone(), &init_block, prev_height)?;
181
182                // Check if the kickoff machine already exists. If so do not add a new one.
183                // This can happen if during block processing an error happens, reverting the state machines
184                // but a new kickoff state was already dispatched during block processing.
185                for kickoff_machine in self.kickoff_machines.iter() {
186                    if kickoff_machine.kickoff_data == kickoff_data
187                        && kickoff_machine.deposit_data == deposit_data
188                        && kickoff_machine.payout_blockhash == payout_blockhash
189                        && kickoff_machine.kickoff_height == kickoff_height
190                    {
191                        return Ok(());
192                    }
193                }
194
195                let kickoff_machine = KickoffStateMachine::new(
196                    kickoff_data,
197                    kickoff_height,
198                    deposit_data,
199                    payout_blockhash,
200                )
201                .uninitialized_state_machine()
202                .init_with_context(&mut context)
203                .await;
204
205                if !context.errors.is_empty() {
206                    return Err(eyre::eyre!(
207                        "Multiple errors occurred during KickoffStateMachine initialization: {:?}",
208                        context.errors
209                    )
210                    .into());
211                }
212
213                self.process_and_add_new_states_from_height(
214                    dbtx.clone(),
215                    vec![],
216                    vec![kickoff_machine],
217                    kickoff_height,
218                )
219                .await?;
220            }
221        };
222
223        let mut context = self.new_context_with_block_cache(
224            dbtx,
225            self.last_finalized_block.clone().ok_or_eyre(
226                "Last finalized block not found, should always be Some after initialization",
227            )?,
228        )?;
229
230        // Save the state machines to the database with the current block height
231        // So that in case of a node restart the state machines can be restored
232        self.save_state_to_db(&mut context).await?;
233
234        Ok(())
235    }
236}