1use std::sync::Arc;
2
3use bitcoin::Witness;
4use eyre::{Context, OptionExt};
5use pgmq::PGMQueueExt;
6use statig::awaitable::{InitializedStateMachine, IntoStateMachineExt};
7use tokio::sync::Mutex;
8
9use crate::{
10 database::{Database, DatabaseTransaction},
11 deposit::{DepositData, KickoffData, OperatorData},
12 states::{
13 context::{DutyResult, StateContext},
14 round::RoundEvent,
15 },
16};
17use clementine_errors::BridgeError;
18
19use super::{kickoff::KickoffStateMachine, round::RoundStateMachine, Owner, StateManager};
20
21#[derive(Debug, serde::Serialize, Clone, serde::Deserialize)]
25#[allow(clippy::large_enum_variant)]
26pub enum SystemEvent {
27 NewFinalizedBlock { block: bitcoin::Block, height: u32 },
30 NewOperator { operator_data: OperatorData },
33 NewKickoff {
36 kickoff_data: KickoffData,
37 kickoff_height: u32,
38 deposit_data: DepositData,
39 payout_blockhash: Witness,
40 },
41 LCPProcessed { height: u32 },
43}
44
45impl<T: Owner + std::fmt::Debug + 'static> StateManager<T> {
46 pub async fn dispatch_new_round_machine(
48 db: &Database,
49 tx: DatabaseTransaction<'_>,
50 operator_data: OperatorData,
51 ) -> Result<(), eyre::Report> {
52 let queue_name = Self::queue_name();
53 let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
54
55 let message = SystemEvent::NewOperator { operator_data };
56 queue
57 .send_with_cxn(&queue_name, &message, &mut *(*tx))
58 .await
59 .map_err(|e| eyre::eyre!("Error sending NewOperator event: {:?}", e))?;
60 Ok(())
61 }
62
63 pub async fn dispatch_lcp_processed(
65 db: &Database,
66 tx: DatabaseTransaction<'_>,
67 height: u32,
68 ) -> Result<(), eyre::Report> {
69 let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
70 let queue_name = Self::queue_name();
71 let message = SystemEvent::LCPProcessed { height };
72 queue
73 .send_with_cxn(&queue_name, &message, &mut *(*tx))
74 .await
75 .map_err(|e| eyre::eyre!("Error sending LCPProcessed event: {:?}", e))?;
76 Ok(())
77 }
78
79 pub async fn dispatch_new_kickoff_machine(
81 db: &Database,
82 tx: DatabaseTransaction<'_>,
83 kickoff_data: KickoffData,
84 kickoff_height: u32,
85 deposit_data: DepositData,
86 payout_blockhash: Witness,
87 ) -> Result<(), eyre::Report> {
88 let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
89 let queue_name = Self::queue_name();
90
91 let message = SystemEvent::NewKickoff {
92 kickoff_data,
93 kickoff_height,
94 deposit_data,
95 payout_blockhash,
96 };
97 queue
98 .send_with_cxn(&queue_name, &message, &mut *(*tx))
99 .await
100 .map_err(|e| eyre::eyre!("Error sending NewKickoff event: {:?}", e))?;
101 Ok(())
102 }
103
104 pub async fn handle_event(
106 &mut self,
107 event: SystemEvent,
108 dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
109 ) -> Result<(), BridgeError> {
110 match event {
111 SystemEvent::NewFinalizedBlock { block, height } => {
113 if self.next_height_to_process != height {
114 return Err(eyre::eyre!("Finalized block arrived to state manager out of order. Expected: block at height {}, Got: block at height {}", self.next_height_to_process, height).into());
115 }
116
117 let mut context = self.new_context(dbtx.clone(), &block, height)?;
118
119 self.process_block_parallel(&mut context).await?;
120
121 self.last_finalized_block = Some(context.cache.clone());
122 }
123 SystemEvent::NewOperator { operator_data } => {
125 for operator_machine in self.round_machines.iter() {
129 if operator_machine.operator_data.xonly_pk == operator_data.xonly_pk {
130 return Ok(());
131 }
132 }
133
134 let prev_height = self.config.protocol_paramset.start_height.saturating_sub(1);
137 let init_block = {
138 let mut guard = dbtx.lock().await;
139 self.get_block(Some(&mut *guard), prev_height).await?
140 };
141
142 let mut context = self.new_context(dbtx.clone(), &init_block, prev_height)?;
143
144 let operator_machine = RoundStateMachine::new(operator_data)
145 .uninitialized_state_machine()
146 .init_with_context(&mut context)
147 .await;
148
149 if !context.errors.is_empty() {
150 return Err(eyre::eyre!(
151 "Multiple errors occurred during RoundStateMachine initialization: {:?}",
152 context.errors
153 )
154 .into());
155 }
156
157 self.process_and_add_new_states_from_height(
158 dbtx.clone(),
159 vec![operator_machine],
160 vec![],
161 self.config.protocol_paramset.start_height,
162 )
163 .await?;
164 }
165 SystemEvent::NewKickoff {
167 kickoff_data,
168 kickoff_height,
169 deposit_data,
170 payout_blockhash,
171 } => {
172 if !self.owner.is_kickoff_relevant_for_owner(&kickoff_data) {
175 return Ok(());
176 }
177
178 for kickoff_machine in self.kickoff_machines.iter() {
180 if kickoff_machine.kickoff_data != kickoff_data {
182 continue;
183 }
184 let deposit_data_matches = kickoff_machine.deposit_data == deposit_data;
185 let payout_blockhash_matches =
186 kickoff_machine.payout_blockhash == payout_blockhash;
187 let kickoff_height_matches = kickoff_machine.kickoff_height == kickoff_height;
188
189 if deposit_data_matches && payout_blockhash_matches && kickoff_height_matches {
192 return Ok(());
193 }
194
195 let mut mismatches = Vec::new();
196 if !deposit_data_matches {
197 mismatches.push(format!(
198 "deposit_data: new={:?}, existing={:?}",
199 deposit_data, kickoff_machine.deposit_data
200 ));
201 }
202 if !payout_blockhash_matches {
203 let witness_hex =
204 |witness: &Witness| witness.iter().map(hex::encode).collect::<Vec<_>>();
205 mismatches.push(format!(
206 "payout_blockhash_witness: new={:?}, existing={:?}",
207 witness_hex(&payout_blockhash),
208 witness_hex(&kickoff_machine.payout_blockhash)
209 ));
210 }
211 if !kickoff_height_matches {
212 mismatches.push(format!(
213 "kickoff_height: new={}, existing={}",
214 kickoff_height, kickoff_machine.kickoff_height
215 ));
216 }
217
218 return Err(eyre::eyre!(
219 "Conflicting kickoff({:?}) detected: same kickoff_data, mismatches: {}",
220 kickoff_data,
221 mismatches.join("; "),
222 )
223 .into());
224 }
225
226 let prev_height = kickoff_height.saturating_sub(1);
229 let init_block = {
230 let mut guard = dbtx.lock().await;
231 self.get_block(Some(&mut *guard), prev_height).await?
232 };
233
234 let mut context = self.new_context(dbtx.clone(), &init_block, prev_height)?;
235
236 let kickoff_machine = KickoffStateMachine::new(
237 kickoff_data,
238 kickoff_height,
239 deposit_data.clone(),
240 payout_blockhash.clone(),
241 )
242 .uninitialized_state_machine()
243 .init_with_context(&mut context)
244 .await;
245
246 if !context.errors.is_empty() {
247 return Err(eyre::eyre!(
248 "Multiple errors occurred during KickoffStateMachine initialization: {:?}",
249 context.errors
250 )
251 .into());
252 }
253
254 self.process_and_add_new_states_from_height(
255 dbtx.clone(),
256 vec![],
257 vec![kickoff_machine],
258 kickoff_height,
259 )
260 .await?;
261
262 if let Some(last_lcp_height) = self.last_processed_lcp {
264 if last_lcp_height >= kickoff_height {
265 self.check_if_kickoff_malicious(
266 &payout_blockhash,
267 &kickoff_data,
268 &deposit_data,
269 &mut context,
270 )
271 .await?;
272 }
273 }
274 }
275 SystemEvent::LCPProcessed { height } => {
277 let kickoffs_to_check: Vec<_> = self
278 .kickoff_machines
279 .iter()
280 .filter(|machine| machine.kickoff_height == height)
281 .map(|machine| {
282 (
283 machine.payout_blockhash.clone(),
284 machine.kickoff_data,
285 machine.deposit_data.clone(),
286 )
287 })
288 .collect();
289
290 if !kickoffs_to_check.is_empty() {
291 let mut dummy_context = self.new_context_with_block_cache(
293 dbtx.clone(),
294 self.last_finalized_block.clone().ok_or_eyre(
295 "Last finalized block not found, should always be Some after initialization",
296 )?,
297 )?;
298
299 for (payout_blockhash, kickoff_data, deposit_data) in kickoffs_to_check {
300 self.check_if_kickoff_malicious(
301 &payout_blockhash,
302 &kickoff_data,
303 &deposit_data,
304 &mut dummy_context,
305 )
306 .await?;
307 }
308 }
309
310 tracing::info!("LCP processed for height: {}", height);
311
312 self.last_processed_lcp = Some(height);
313 }
314 };
315
316 let mut context = self.new_context_with_block_cache(
317 dbtx,
318 self.last_finalized_block.clone().ok_or_eyre(
319 "Last finalized block not found, should always be Some after initialization",
320 )?,
321 )?;
322
323 self.save_state_to_db(&mut context).await?;
326
327 Ok(())
328 }
329
330 fn get_round_machine(
331 &mut self,
332 operator_xonly_pk: &bitcoin::XOnlyPublicKey,
333 ) -> Option<&mut InitializedStateMachine<RoundStateMachine<T>>> {
334 self.round_machines
335 .iter_mut()
336 .find(|machine| &machine.operator_data.xonly_pk == operator_xonly_pk)
337 }
338
339 async fn check_if_kickoff_malicious(
340 &mut self,
341 payout_blockhash: &Witness,
342 kickoff_data: &KickoffData,
343 deposit_data: &DepositData,
344 context: &mut StateContext<T>,
345 ) -> Result<(), BridgeError> {
346 let was_challenged_before = {
349 let round_machine = self
350 .get_round_machine(&kickoff_data.operator_xonly_pk)
351 .ok_or_else(|| {
352 eyre::eyre!(
353 "Round machine not found for operator {} while checking if kickoff is malicious",
354 kickoff_data.operator_xonly_pk
355 )
356 })?;
357
358 round_machine
359 .challenged_rounds
360 .contains(&kickoff_data.round_idx)
361 };
362
363 let duty = super::Duty::CheckIfKickoffMalicious {
364 kickoff_data: *kickoff_data,
365 deposit_data: deposit_data.clone(),
366 kickoff_witness: payout_blockhash.clone(),
367 challenged_before: was_challenged_before,
368 };
369
370 let res = context
371 .dispatch_duty(duty)
372 .await
373 .wrap_err("Error while checking if kickoff is malicious")?;
374
375 match res {
376 DutyResult::CheckIfKickoffMalicious { challenged } => {
377 if challenged && !was_challenged_before {
378 if let Some(round_machine) =
380 self.get_round_machine(&kickoff_data.operator_xonly_pk)
381 {
382 round_machine
383 .handle_with_context(
384 &RoundEvent::SetChallenged {
385 round_idx: kickoff_data.round_idx,
386 },
387 context,
388 )
389 .await;
390 }
391 }
392 }
393 other => {
394 return Err(eyre::eyre!(
395 "Expected CheckIfKickoffMalicious duty result, got {other:?}"
396 )
397 .into());
398 }
399 }
400
401 Ok(())
402 }
403}