1use std::sync::Arc;
2
3use bitcoin::Witness;
4use eyre::OptionExt;
5use pgmq::PGMQueueExt;
6use statig::awaitable::IntoStateMachineExt;
7use tokio::sync::Mutex;
8
9use crate::{
10 database::{Database, DatabaseTransaction},
11 deposit::{DepositData, KickoffData, OperatorData},
12 errors::BridgeError,
13};
14
15use super::{kickoff::KickoffStateMachine, round::RoundStateMachine, Owner, StateManager};
16
17#[derive(Debug, serde::Serialize, Clone, serde::Deserialize)]
21#[allow(clippy::large_enum_variant)]
22pub enum SystemEvent {
23 NewFinalizedBlock {
26 block_id: u32,
27 block: bitcoin::Block,
28 height: u32,
29 },
30 NewOperator { operator_data: OperatorData },
33 NewKickoff {
36 kickoff_data: KickoffData,
37 kickoff_height: u32,
38 deposit_data: DepositData,
39 payout_blockhash: Witness,
40 },
41}
42
43impl<T: Owner + std::fmt::Debug + 'static> StateManager<T> {
44 pub async fn dispatch_new_round_machine(
46 db: Database,
47 tx: DatabaseTransaction<'_, '_>,
48 operator_data: OperatorData,
49 ) -> Result<(), eyre::Report> {
50 let queue_name = StateManager::<T>::queue_name();
51 let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
52
53 let message = SystemEvent::NewOperator { operator_data };
54 queue
55 .send_with_cxn(&queue_name, &message, &mut *(*tx))
56 .await
57 .map_err(|e| eyre::eyre!("Error sending NewOperator event: {:?}", e))?;
58 Ok(())
59 }
60
61 pub async fn dispatch_new_kickoff_machine(
63 db: Database,
64 tx: DatabaseTransaction<'_, '_>,
65 kickoff_data: KickoffData,
66 kickoff_height: u32,
67 deposit_data: DepositData,
68 payout_blockhash: Witness,
69 ) -> Result<(), eyre::Report> {
70 let queue_name = StateManager::<T>::queue_name();
71 let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
72
73 let message = SystemEvent::NewKickoff {
74 kickoff_data,
75 kickoff_height,
76 deposit_data,
77 payout_blockhash,
78 };
79 queue
80 .send_with_cxn(&queue_name, &message, &mut *(*tx))
81 .await
82 .map_err(|e| eyre::eyre!("Error sending NewKickoff event: {:?}", e))?;
83 Ok(())
84 }
85
86 pub async fn handle_event(
88 &mut self,
89 event: SystemEvent,
90 dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
91 ) -> Result<(), BridgeError> {
92 match event {
93 SystemEvent::NewFinalizedBlock {
95 block_id,
96 block,
97 height,
98 } => {
99 if self.next_height_to_process != height {
100 return Err(eyre::eyre!("Finalized block arrived to state manager out of order. Expected: block at height {}, Got: block at height {}", self.next_height_to_process, height).into());
101 }
102
103 let mut context = self.new_context(dbtx.clone(), &block, height)?;
104
105 {
107 let mut guard = dbtx.lock().await;
108 self.owner
109 .handle_finalized_block(
110 &mut guard,
111 block_id,
112 height,
113 context.cache.clone(),
114 None,
115 )
116 .await?;
117 }
118
119 self.process_block_parallel(&mut context).await?;
120
121 self.last_finalized_block = Some(context.cache.clone());
122 }
123 SystemEvent::NewOperator { operator_data } => {
125 for operator_machine in self.round_machines.iter() {
129 if operator_machine.operator_data.xonly_pk == operator_data.xonly_pk {
130 return Ok(());
131 }
132 }
133
134 let prev_height = self.config.protocol_paramset.start_height.saturating_sub(1);
137 let init_block = {
138 let mut guard = dbtx.lock().await;
139 self.get_block(Some(&mut *guard), prev_height).await?
140 };
141
142 let mut context = self.new_context(dbtx.clone(), &init_block, prev_height)?;
143
144 let operator_machine = RoundStateMachine::new(operator_data)
145 .uninitialized_state_machine()
146 .init_with_context(&mut context)
147 .await;
148
149 if !context.errors.is_empty() {
150 return Err(eyre::eyre!(
151 "Multiple errors occurred during RoundStateMachine initialization: {:?}",
152 context.errors
153 )
154 .into());
155 }
156
157 self.process_and_add_new_states_from_height(
158 dbtx.clone(),
159 vec![operator_machine],
160 vec![],
161 self.config.protocol_paramset.start_height,
162 )
163 .await?;
164 }
165 SystemEvent::NewKickoff {
167 kickoff_data,
168 kickoff_height,
169 deposit_data,
170 payout_blockhash,
171 } => {
172 let prev_height = kickoff_height.saturating_sub(1);
175 let init_block = {
176 let mut guard = dbtx.lock().await;
177 self.get_block(Some(&mut *guard), prev_height).await?
178 };
179
180 let mut context = self.new_context(dbtx.clone(), &init_block, prev_height)?;
181
182 for kickoff_machine in self.kickoff_machines.iter() {
186 if kickoff_machine.kickoff_data == kickoff_data
187 && kickoff_machine.deposit_data == deposit_data
188 && kickoff_machine.payout_blockhash == payout_blockhash
189 && kickoff_machine.kickoff_height == kickoff_height
190 {
191 return Ok(());
192 }
193 }
194
195 let kickoff_machine = KickoffStateMachine::new(
196 kickoff_data,
197 kickoff_height,
198 deposit_data,
199 payout_blockhash,
200 )
201 .uninitialized_state_machine()
202 .init_with_context(&mut context)
203 .await;
204
205 if !context.errors.is_empty() {
206 return Err(eyre::eyre!(
207 "Multiple errors occurred during KickoffStateMachine initialization: {:?}",
208 context.errors
209 )
210 .into());
211 }
212
213 self.process_and_add_new_states_from_height(
214 dbtx.clone(),
215 vec![],
216 vec![kickoff_machine],
217 kickoff_height,
218 )
219 .await?;
220 }
221 };
222
223 let mut context = self.new_context_with_block_cache(
224 dbtx,
225 self.last_finalized_block.clone().ok_or_eyre(
226 "Last finalized block not found, should always be Some after initialization",
227 )?,
228 )?;
229
230 self.save_state_to_db(&mut context).await?;
233
234 Ok(())
235 }
236}