1use std::sync::Arc;
2
3use bitcoin::{consensus::Encodable, Witness};
4use eyre::OptionExt;
5use pgmq::PGMQueueExt;
6use statig::awaitable::IntoStateMachineExt;
7use tokio::sync::Mutex;
8
9use crate::{
10 database::{Database, DatabaseTransaction},
11 deposit::{DepositData, KickoffData, OperatorData},
12 states::Duty,
13};
14use clementine_errors::BridgeError;
15
16use super::{kickoff::KickoffStateMachine, round::RoundStateMachine, Owner, StateManager};
17
18#[derive(Debug, serde::Serialize, Clone, serde::Deserialize)]
22#[allow(clippy::large_enum_variant)]
23pub enum SystemEvent {
24 NewFinalizedBlock {
27 block_id: u32,
28 block: bitcoin::Block,
29 height: u32,
30 },
31 NewOperator { operator_data: OperatorData },
34 NewKickoff {
37 kickoff_data: KickoffData,
38 kickoff_height: u32,
39 deposit_data: DepositData,
40 payout_blockhash: Witness,
41 },
42}
43
44impl<T: Owner + std::fmt::Debug + 'static> StateManager<T> {
45 pub async fn dispatch_new_round_machine(
47 db: &Database,
48 tx: DatabaseTransaction<'_>,
49 operator_data: OperatorData,
50 ) -> Result<(), eyre::Report> {
51 let queue_name = Self::queue_name();
52 let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
53
54 let message = SystemEvent::NewOperator { operator_data };
55 queue
56 .send_with_cxn(&queue_name, &message, &mut *(*tx))
57 .await
58 .map_err(|e| eyre::eyre!("Error sending NewOperator event: {:?}", e))?;
59 Ok(())
60 }
61
62 pub async fn dispatch_new_kickoff_machine(
64 db: &Database,
65 tx: DatabaseTransaction<'_>,
66 kickoff_data: KickoffData,
67 kickoff_height: u32,
68 deposit_data: DepositData,
69 payout_blockhash: Witness,
70 ) -> Result<(), eyre::Report> {
71 let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
72 let queue_name = Self::queue_name();
73
74 let message = SystemEvent::NewKickoff {
75 kickoff_data,
76 kickoff_height,
77 deposit_data,
78 payout_blockhash,
79 };
80 queue
81 .send_with_cxn(&queue_name, &message, &mut *(*tx))
82 .await
83 .map_err(|e| eyre::eyre!("Error sending NewKickoff event: {:?}", e))?;
84 Ok(())
85 }
86
87 pub async fn handle_event(
89 &mut self,
90 event: SystemEvent,
91 dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
92 ) -> Result<(), BridgeError> {
93 match event {
94 SystemEvent::NewFinalizedBlock {
96 block_id,
97 block,
98 height,
99 } => {
100 if self.next_height_to_process != height {
101 return Err(eyre::eyre!("Finalized block arrived to state manager out of order. Expected: block at height {}, Got: block at height {}", self.next_height_to_process, height).into());
102 }
103
104 let mut context = self.new_context(dbtx.clone(), &block, height)?;
105
106 {
108 let mut guard = dbtx.lock().await;
109 self.owner
110 .handle_finalized_block(
111 &mut guard,
112 block_id,
113 height,
114 context.cache.clone(),
115 None,
116 )
117 .await?;
118 }
119
120 self.process_block_parallel(&mut context).await?;
121
122 self.last_finalized_block = Some(context.cache.clone());
123 }
124 SystemEvent::NewOperator { operator_data } => {
126 for operator_machine in self.round_machines.iter() {
130 if operator_machine.operator_data.xonly_pk == operator_data.xonly_pk {
131 return Ok(());
132 }
133 }
134
135 let prev_height = self.config.protocol_paramset.start_height.saturating_sub(1);
138 let init_block = {
139 let mut guard = dbtx.lock().await;
140 self.get_block(Some(&mut *guard), prev_height).await?
141 };
142
143 let mut context = self.new_context(dbtx.clone(), &init_block, prev_height)?;
144
145 let operator_machine = RoundStateMachine::new(operator_data)
146 .uninitialized_state_machine()
147 .init_with_context(&mut context)
148 .await;
149
150 if !context.errors.is_empty() {
151 return Err(eyre::eyre!(
152 "Multiple errors occurred during RoundStateMachine initialization: {:?}",
153 context.errors
154 )
155 .into());
156 }
157
158 self.process_and_add_new_states_from_height(
159 dbtx.clone(),
160 vec![operator_machine],
161 vec![],
162 self.config.protocol_paramset.start_height,
163 )
164 .await?;
165 }
166 SystemEvent::NewKickoff {
168 kickoff_data,
169 kickoff_height,
170 deposit_data,
171 payout_blockhash,
172 } => {
173 if !self.owner.is_kickoff_relevant_for_owner(&kickoff_data) {
176 return Ok(());
177 }
178
179 for kickoff_machine in self.kickoff_machines.iter() {
181 let matches = [
182 kickoff_machine.kickoff_data == kickoff_data,
183 kickoff_machine.deposit_data == deposit_data,
184 kickoff_machine.payout_blockhash == payout_blockhash,
185 kickoff_machine.kickoff_height == kickoff_height,
186 ];
187 let match_count = matches.iter().filter(|&&b| b).count();
188
189 match match_count {
192 4 => return Ok(()), 0 => {} n => {
195 let mut raw_payout_blockhash = Vec::new();
196 payout_blockhash
197 .consensus_encode(&mut raw_payout_blockhash)
198 .map_err(|e| {
199 eyre::eyre!("Error encoding payout blockhash: {}", e)
200 })?;
201 let payout_blockhash_hex = hex::encode(raw_payout_blockhash);
202 let mut raw_existing_payout_blockhash = Vec::new();
203 kickoff_machine
204 .payout_blockhash
205 .consensus_encode(&mut raw_existing_payout_blockhash)
206 .map_err(|e| {
207 eyre::eyre!("Error encoding existing payout blockhash: {}", e)
208 })?;
209 let existing_payout_blockhash_hex =
210 hex::encode(raw_existing_payout_blockhash);
211 return Err(eyre::eyre!(
212 "Partial kickoff match detected ({n} of 4 fields match). This indicates data corruption or inconsistency. New kickoff data: {:?}, Existing kickoff data: {:?}, New deposit data: {:?}, Existing deposit data: {:?}, New kickoff height: {}, Existing kickoff height: {}, New payout blockhash: {}, Existing payout blockhash: {}",
213 kickoff_data,
214 kickoff_machine.kickoff_data,
215 deposit_data,
216 kickoff_machine.deposit_data,
217 kickoff_height,
218 kickoff_machine.kickoff_height,
219 payout_blockhash_hex,
220 existing_payout_blockhash_hex,
221 ).into());
222 }
223 }
224 }
225
226 let prev_height = kickoff_height.saturating_sub(1);
229 let init_block = {
230 let mut guard = dbtx.lock().await;
231 self.get_block(Some(&mut *guard), prev_height).await?
232 };
233
234 let mut context = self.new_context(dbtx.clone(), &init_block, prev_height)?;
235
236 for kickoff_machine in self.kickoff_machines.iter() {
240 if kickoff_machine.kickoff_data == kickoff_data
241 && kickoff_machine.deposit_data == deposit_data
242 && kickoff_machine.payout_blockhash == payout_blockhash
243 && kickoff_machine.kickoff_height == kickoff_height
244 {
245 return Ok(());
246 }
247 }
248
249 let kickoff_machine = KickoffStateMachine::new(
250 kickoff_data,
251 kickoff_height,
252 deposit_data.clone(),
253 payout_blockhash,
254 )
255 .uninitialized_state_machine()
256 .init_with_context(&mut context)
257 .await;
258
259 if !context.errors.is_empty() {
260 return Err(eyre::eyre!(
261 "Multiple errors occurred during KickoffStateMachine initialization: {:?}",
262 context.errors
263 )
264 .into());
265 }
266
267 self.process_and_add_new_states_from_height(
268 dbtx.clone(),
269 vec![],
270 vec![kickoff_machine],
271 kickoff_height,
272 )
273 .await?;
274 context
278 .dispatch_duty(Duty::AddRelevantTxsToTxSender {
279 kickoff_data,
280 deposit_data,
281 })
282 .await?;
283 }
284 };
285
286 let mut context = self.new_context_with_block_cache(
287 dbtx,
288 self.last_finalized_block.clone().ok_or_eyre(
289 "Last finalized block not found, should always be Some after initialization",
290 )?,
291 )?;
292
293 self.save_state_to_db(&mut context).await?;
296
297 Ok(())
298 }
299}