clementine_core/states/
event.rs

1use std::sync::Arc;
2
3use bitcoin::{consensus::Encodable, Witness};
4use eyre::OptionExt;
5use pgmq::PGMQueueExt;
6use statig::awaitable::IntoStateMachineExt;
7use tokio::sync::Mutex;
8
9use crate::{
10    database::{Database, DatabaseTransaction},
11    deposit::{DepositData, KickoffData, OperatorData},
12    states::Duty,
13};
14use clementine_errors::BridgeError;
15
16use super::{kickoff::KickoffStateMachine, round::RoundStateMachine, Owner, StateManager};
17
18/// System events are events that are sent by other parts of clementine to the state machine
19/// They are used to update the state machine
20/// They are sent by the state manager to the state machine
21#[derive(Debug, serde::Serialize, Clone, serde::Deserialize)]
22#[allow(clippy::large_enum_variant)]
23pub enum SystemEvent {
24    /// An event for a new finalized block
25    /// So that state manager can update the states of all current state machines
26    NewFinalizedBlock {
27        block_id: u32,
28        block: bitcoin::Block,
29        height: u32,
30    },
31    /// An event for when a new operator is set in clementine
32    /// So that the state machine can create a new round state machine to track the operator
33    NewOperator { operator_data: OperatorData },
34    /// An event for when a new kickoff is set in clementine
35    /// So that the state machine can create a new kickoff state machine to track the kickoff status
36    NewKickoff {
37        kickoff_data: KickoffData,
38        kickoff_height: u32,
39        deposit_data: DepositData,
40        payout_blockhash: Witness,
41    },
42}
43
44impl<T: Owner + std::fmt::Debug + 'static> StateManager<T> {
45    /// Appends a  message to the state manager's message queue to create a new round state machine
46    pub async fn dispatch_new_round_machine(
47        db: &Database,
48        tx: DatabaseTransaction<'_>,
49        operator_data: OperatorData,
50    ) -> Result<(), eyre::Report> {
51        let queue_name = Self::queue_name();
52        let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
53
54        let message = SystemEvent::NewOperator { operator_data };
55        queue
56            .send_with_cxn(&queue_name, &message, &mut *(*tx))
57            .await
58            .map_err(|e| eyre::eyre!("Error sending NewOperator event: {:?}", e))?;
59        Ok(())
60    }
61
62    /// Appends a  message to the state manager's message queue to create a new kickoff state machine
63    pub async fn dispatch_new_kickoff_machine(
64        db: &Database,
65        tx: DatabaseTransaction<'_>,
66        kickoff_data: KickoffData,
67        kickoff_height: u32,
68        deposit_data: DepositData,
69        payout_blockhash: Witness,
70    ) -> Result<(), eyre::Report> {
71        let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
72        let queue_name = Self::queue_name();
73
74        let message = SystemEvent::NewKickoff {
75            kickoff_data,
76            kickoff_height,
77            deposit_data,
78            payout_blockhash,
79        };
80        queue
81            .send_with_cxn(&queue_name, &message, &mut *(*tx))
82            .await
83            .map_err(|e| eyre::eyre!("Error sending NewKickoff event: {:?}", e))?;
84        Ok(())
85    }
86
87    /// Handles the system events
88    pub async fn handle_event(
89        &mut self,
90        event: SystemEvent,
91        dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
92    ) -> Result<(), BridgeError> {
93        match event {
94            // Received when a block is finalized in Bitcoin
95            SystemEvent::NewFinalizedBlock {
96                block_id,
97                block,
98                height,
99            } => {
100                if self.next_height_to_process != height {
101                    return Err(eyre::eyre!("Finalized block arrived to state manager out of order. Expected: block at height {}, Got: block at height {}", self.next_height_to_process, height).into());
102                }
103
104                let mut context = self.new_context(dbtx.clone(), &block, height)?;
105
106                // Handle the finalized block on the owner (verifier or operator)
107                {
108                    let mut guard = dbtx.lock().await;
109                    self.owner
110                        .handle_finalized_block(
111                            &mut guard,
112                            block_id,
113                            height,
114                            context.cache.clone(),
115                            None,
116                        )
117                        .await?;
118                }
119
120                self.process_block_parallel(&mut context).await?;
121
122                self.last_finalized_block = Some(context.cache.clone());
123            }
124            // Received when a new operator is set in clementine
125            SystemEvent::NewOperator { operator_data } => {
126                // Check if operator's state machine already exists.
127                // This can happen if aggregator calls set_operator for the same operator multiple times.
128                // In this case, we don't want to create a new state machine.
129                for operator_machine in self.round_machines.iter() {
130                    if operator_machine.operator_data.xonly_pk == operator_data.xonly_pk {
131                        return Ok(());
132                    }
133                }
134
135                // Initialize context using the block just before the start height
136                // so subsequent processing can begin from start_height
137                let prev_height = self.config.protocol_paramset.start_height.saturating_sub(1);
138                let init_block = {
139                    let mut guard = dbtx.lock().await;
140                    self.get_block(Some(&mut *guard), prev_height).await?
141                };
142
143                let mut context = self.new_context(dbtx.clone(), &init_block, prev_height)?;
144
145                let operator_machine = RoundStateMachine::new(operator_data)
146                    .uninitialized_state_machine()
147                    .init_with_context(&mut context)
148                    .await;
149
150                if !context.errors.is_empty() {
151                    return Err(eyre::eyre!(
152                        "Multiple errors occurred during RoundStateMachine initialization: {:?}",
153                        context.errors
154                    )
155                    .into());
156                }
157
158                self.process_and_add_new_states_from_height(
159                    dbtx.clone(),
160                    vec![operator_machine],
161                    vec![],
162                    self.config.protocol_paramset.start_height,
163                )
164                .await?;
165            }
166            // Received when a new kickoff is detected
167            SystemEvent::NewKickoff {
168                kickoff_data,
169                kickoff_height,
170                deposit_data,
171                payout_blockhash,
172            } => {
173                // if kickoff is not relevant for the owner, do not process it
174                // only case right now is if owner is operator and kickoff is not of their own
175                if !self.owner.is_kickoff_relevant_for_owner(&kickoff_data) {
176                    return Ok(());
177                }
178
179                // check for duplicates
180                for kickoff_machine in self.kickoff_machines.iter() {
181                    let matches = [
182                        kickoff_machine.kickoff_data == kickoff_data,
183                        kickoff_machine.deposit_data == deposit_data,
184                        kickoff_machine.payout_blockhash == payout_blockhash,
185                        kickoff_machine.kickoff_height == kickoff_height,
186                    ];
187                    let match_count = matches.iter().filter(|&&b| b).count();
188
189                    // sanity check, should never be a partial match, otherwise something is really wrong with the bitcoin sync
190                    // this error is basically just to make sure we only added finalized kickoffs to the state manager. If it was not finalized + reorged, there can be a mismatch here.
191                    match match_count {
192                        4 => return Ok(()), // exact duplicate, skip
193                        0 => {}             // no match, continue checking other machines
194                        n => {
195                            let mut raw_payout_blockhash = Vec::new();
196                            payout_blockhash
197                                .consensus_encode(&mut raw_payout_blockhash)
198                                .map_err(|e| {
199                                    eyre::eyre!("Error encoding payout blockhash: {}", e)
200                                })?;
201                            let payout_blockhash_hex = hex::encode(raw_payout_blockhash);
202                            let mut raw_existing_payout_blockhash = Vec::new();
203                            kickoff_machine
204                                .payout_blockhash
205                                .consensus_encode(&mut raw_existing_payout_blockhash)
206                                .map_err(|e| {
207                                    eyre::eyre!("Error encoding existing payout blockhash: {}", e)
208                                })?;
209                            let existing_payout_blockhash_hex =
210                                hex::encode(raw_existing_payout_blockhash);
211                            return Err(eyre::eyre!(
212                            "Partial kickoff match detected ({n} of 4 fields match). This indicates data corruption or inconsistency. New kickoff data: {:?}, Existing kickoff data: {:?}, New deposit data: {:?}, Existing deposit data: {:?}, New kickoff height: {}, Existing kickoff height: {}, New payout blockhash: {}, Existing payout blockhash: {}",
213                            kickoff_data,
214                            kickoff_machine.kickoff_data,
215                            deposit_data,
216                            kickoff_machine.deposit_data,
217                            kickoff_height,
218                            kickoff_machine.kickoff_height,
219                            payout_blockhash_hex,
220                            existing_payout_blockhash_hex,
221                            ).into());
222                        }
223                    }
224                }
225
226                // Initialize context using the block just before the kickoff height
227                // so subsequent processing can begin from kickoff_height
228                let prev_height = kickoff_height.saturating_sub(1);
229                let init_block = {
230                    let mut guard = dbtx.lock().await;
231                    self.get_block(Some(&mut *guard), prev_height).await?
232                };
233
234                let mut context = self.new_context(dbtx.clone(), &init_block, prev_height)?;
235
236                // Check if the kickoff machine already exists. If so do not add a new one.
237                // This can happen if during block processing an error happens, reverting the state machines
238                // but a new kickoff state was already dispatched during block processing.
239                for kickoff_machine in self.kickoff_machines.iter() {
240                    if kickoff_machine.kickoff_data == kickoff_data
241                        && kickoff_machine.deposit_data == deposit_data
242                        && kickoff_machine.payout_blockhash == payout_blockhash
243                        && kickoff_machine.kickoff_height == kickoff_height
244                    {
245                        return Ok(());
246                    }
247                }
248
249                let kickoff_machine = KickoffStateMachine::new(
250                    kickoff_data,
251                    kickoff_height,
252                    deposit_data.clone(),
253                    payout_blockhash,
254                )
255                .uninitialized_state_machine()
256                .init_with_context(&mut context)
257                .await;
258
259                if !context.errors.is_empty() {
260                    return Err(eyre::eyre!(
261                        "Multiple errors occurred during KickoffStateMachine initialization: {:?}",
262                        context.errors
263                    )
264                    .into());
265                }
266
267                self.process_and_add_new_states_from_height(
268                    dbtx.clone(),
269                    vec![],
270                    vec![kickoff_machine],
271                    kickoff_height,
272                )
273                .await?;
274                // if everything is fine, add the relevant txs to the tx sender
275                // this will already be added normally, but if there was a db loss and we are resyncing, the txs will not be added.
276                // so we add them with this duty.
277                context
278                    .dispatch_duty(Duty::AddRelevantTxsToTxSender {
279                        kickoff_data,
280                        deposit_data,
281                    })
282                    .await?;
283            }
284        };
285
286        let mut context = self.new_context_with_block_cache(
287            dbtx,
288            self.last_finalized_block.clone().ok_or_eyre(
289                "Last finalized block not found, should always be Some after initialization",
290            )?,
291        )?;
292
293        // Save the state machines to the database with the current block height
294        // So that in case of a node restart the state machines can be restored
295        self.save_state_to_db(&mut context).await?;
296
297        Ok(())
298    }
299}