1pub use crate::builder::block_cache;
22use crate::config::BridgeConfig;
23use crate::database::{Database, DatabaseTransaction};
24use crate::errors::BridgeError;
25use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc;
26use crate::states::block_cache::BlockCache;
27use eyre::{Context, OptionExt};
28use futures::future::{join, join_all};
29use kickoff::KickoffEvent;
30use matcher::BlockMatcher;
31use pgmq::PGMQueueExt;
32use round::RoundEvent;
33use statig::awaitable::{InitializedStateMachine, UninitializedStateMachine};
34use statig::prelude::*;
35use std::cmp::max;
36use std::future::Future;
37use std::sync::Arc;
38use thiserror::Error;
39use tokio::sync::Mutex;
40
41pub mod context;
42mod event;
43pub mod kickoff;
44mod matcher;
45pub mod round;
46pub mod task;
47
48pub use context::{Duty, Owner};
49pub use event::SystemEvent;
50
51#[derive(Debug, Error)]
52pub enum StateMachineError {
53 #[error("State machine received event that it doesn't know how to handle: {0}")]
54 UnhandledEvent(String),
55
56 #[error(transparent)]
57 Other(#[from] eyre::Report),
58}
59pub(crate) enum ContextProcessResult<
60 T: Owner,
61 M: IntoStateMachine,
62 Fut: Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send,
63> {
64 Unchanged(InitializedStateMachine<M>),
65 Processing(Fut),
66}
67
68pub(crate) trait ContextProcessor<T: Owner, M: IntoStateMachine> {
70 fn process_with_ctx(
74 self,
75 block: &context::StateContext<T>,
76 ) -> ContextProcessResult<
77 T,
78 M,
79 impl Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send,
80 >;
81}
82
83impl<T, M> ContextProcessor<T, M> for InitializedStateMachine<M>
85where
86 T: Owner,
87 for<'evt, 'ctx> M: IntoStateMachine<Event<'evt> = M::StateEvent, Context<'ctx> = context::StateContext<T>>
88 + Send
89 + BlockMatcher
90 + Clone,
91 M::State: awaitable::State<M> + 'static + Send,
92 for<'sub> M::Superstate<'sub>: awaitable::Superstate<M> + Send,
93 for<'evt> M::Event<'evt>: Send + Sync,
94{
95 fn process_with_ctx(
96 mut self,
97 block: &context::StateContext<T>,
98 ) -> ContextProcessResult<T, M, impl Future<Output = (Self, context::StateContext<T>)> + Send>
99 {
100 let events = self.match_block(&block.cache);
101 if events.is_empty() {
102 ContextProcessResult::Unchanged(self)
103 } else {
104 let mut ctx = block.clone();
105 ContextProcessResult::Processing(async move {
106 for event in events {
107 self.handle_with_context(&event, &mut ctx).await;
108 }
109 (self, ctx)
110 })
111 }
112 }
113}
114
115#[derive(Debug, Clone)]
121pub struct StateManager<T: Owner> {
122 pub db: Database,
123 queue: PGMQueueExt,
124 owner: T,
125 round_machines: Vec<InitializedStateMachine<round::RoundStateMachine<T>>>,
126 kickoff_machines: Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>>,
127 config: BridgeConfig,
128 next_height_to_process: u32,
129 rpc: ExtendedBitcoinRpc,
130 last_finalized_block: Option<Arc<BlockCache>>,
132}
133
134impl<T: Owner + std::fmt::Debug + 'static> StateManager<T> {
135 pub fn queue_name() -> String {
137 format!("{}_state_mgr_events", T::ENTITY_NAME)
138 }
139
140 pub fn clone_without_machines(&self) -> Self {
141 Self {
142 db: self.db.clone(),
143 queue: self.queue.clone(),
144 owner: self.owner.clone(),
145 round_machines: Vec::new(),
146 kickoff_machines: Vec::new(),
147 config: self.config.clone(),
148 next_height_to_process: self.next_height_to_process,
149 rpc: self.rpc.clone(),
150 last_finalized_block: self.last_finalized_block.clone(),
151 }
152 }
153
154 pub fn new_context(
158 &self,
159 dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
160 block: &bitcoin::Block,
161 block_height: u32,
162 ) -> Result<context::StateContext<T>, BridgeError> {
163 Ok(context::StateContext::new(
164 dbtx,
165 Arc::new(self.owner.clone()),
166 Arc::new(BlockCache::from_block(block.clone(), block_height)),
167 self.config.clone(),
168 ))
169 }
170
171 pub fn new_context_with_block_cache(
172 &self,
173 dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
174 block_cache: Arc<BlockCache>,
175 ) -> Result<context::StateContext<T>, BridgeError> {
176 Ok(context::StateContext::new(
177 dbtx,
178 Arc::new(self.owner.clone()),
179 block_cache,
180 self.config.clone(),
181 ))
182 }
183
184 pub async fn new(
185 db: Database,
186 owner: T,
187 rpc: ExtendedBitcoinRpc,
188 config: BridgeConfig,
189 ) -> eyre::Result<Self> {
190 let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
191
192 queue.create(&Self::queue_name()).await.wrap_err_with(|| {
193 format!("Error creating pqmq queue with name {}", Self::queue_name())
194 })?;
195
196 let mut mgr = Self {
197 last_finalized_block: None,
198 db,
199 owner,
200 config: config.clone(),
201 round_machines: Vec::new(),
202 kickoff_machines: Vec::new(),
203 queue,
204 next_height_to_process: config.protocol_paramset.start_height,
205 rpc,
206 };
207
208 mgr.reload_state_manager_from_db().await?;
209 Ok(mgr)
210 }
211
212 async fn get_block(
213 &self,
214 dbtx: Option<DatabaseTransaction<'_, '_>>,
215 height: u32,
216 ) -> Result<bitcoin::Block, BridgeError> {
217 match self.db.get_full_block(dbtx, height).await? {
218 Some(block) => Ok(block),
219 None => Ok(self.rpc.get_block_by_height(height.into()).await?),
220 }
221 }
222
223 pub async fn reload_state_manager_from_db(&mut self) -> Result<(), BridgeError> {
229 let mut dbtx = self.db.begin_transaction().await?;
230 let owner_type = T::ENTITY_NAME;
231 let status = self
232 .db
233 .get_next_height_to_process(Some(&mut dbtx), owner_type)
234 .await?;
235
236 self.next_height_to_process = match status {
238 Some(block_height) => {
239 u32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?
240 }
241 None => {
242 tracing::info!("No state machines found in the database");
243 self.config.protocol_paramset.start_height
244 }
245 };
246
247 tracing::info!(
248 "Loading state machines from block height {}",
249 self.next_height_to_process.saturating_sub(1)
250 );
251
252 let last_height = self.next_height_to_process.saturating_sub(1);
253
254 self.last_finalized_block = Some(Arc::new(BlockCache::from_block(
255 match self.db.get_full_block(None, last_height).await? {
256 Some(block) => block,
257 None => self.rpc.get_block_by_height(last_height.into()).await?,
258 },
259 last_height,
260 )));
261
262 let kickoff_machines = self
264 .db
265 .load_kickoff_machines(Some(&mut dbtx), owner_type)
266 .await?;
267
268 let round_machines = self
270 .db
271 .load_round_machines(Some(&mut dbtx), owner_type)
272 .await?;
273
274 let init_dbtx = Arc::new(Mutex::new(dbtx));
275
276 let mut ctx = self.new_context_with_block_cache(
277 init_dbtx.clone(),
278 self.last_finalized_block
279 .clone()
280 .expect("Initialized before"),
281 )?;
282
283 for (state_json, kickoff_id, saved_block_height) in &kickoff_machines {
285 tracing::debug!(
286 "Loaded kickoff machine: state={}, block_height={}",
287 state_json,
288 saved_block_height
289 );
290
291 let machine: Result<UninitializedStateMachine<kickoff::KickoffStateMachine<T>>, _> =
293 serde_json::from_str(state_json);
294
295 match machine {
296 Ok(uninitialized) => {
297 let initialized = uninitialized.init_with_context(&mut ctx).await;
299 self.kickoff_machines.push(initialized);
300 }
301 Err(e) => {
302 tracing::warn!(
303 "Failed to deserialize kickoff machine with ID {}: {}",
304 kickoff_id,
305 e
306 );
307 }
308 }
309 }
310
311 for (state_json, operator_xonly_pk, saved_block_height) in &round_machines {
313 tracing::debug!(
314 "Loaded round machine: state={}, block_height={}",
315 state_json,
316 saved_block_height
317 );
318
319 let machine: Result<UninitializedStateMachine<round::RoundStateMachine<T>>, _> =
321 serde_json::from_str(state_json);
322
323 match machine {
324 Ok(uninitialized) => {
325 let initialized = uninitialized.init_with_context(&mut ctx).await;
327 self.round_machines.push(initialized);
328 }
329 Err(e) => {
330 tracing::error!(
331 "Failed to deserialize round machine with operator index {:?}: {}",
332 operator_xonly_pk,
333 e
334 );
335 }
336 }
337 }
338
339 if !ctx.errors.is_empty() {
340 return Err(eyre::eyre!(
341 "Multiple errors occurred during state processing: {:?}",
342 ctx.errors
343 )
344 .into());
345 }
346
347 drop(ctx);
348
349 tracing::info!(
350 "Loaded {} kickoff machines and {} round machines from the database",
351 kickoff_machines.len(),
352 round_machines.len()
353 );
354
355 Arc::into_inner(init_dbtx)
356 .ok_or_eyre("Expected single reference to DB tx when committing")?
357 .into_inner()
358 .commit()
359 .await?;
360
361 Ok(())
362 }
/// Test-only accessor: returns a copy of the round machines held in memory.
#[cfg(test)]
#[doc(hidden)]
pub fn round_machines(&self) -> Vec<InitializedStateMachine<round::RoundStateMachine<T>>> {
    self.round_machines.to_vec()
}
368
/// Test-only accessor: returns a copy of the kickoff machines held in memory.
#[cfg(test)]
#[doc(hidden)]
pub fn kickoff_machines(
    &self,
) -> Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>> {
    self.kickoff_machines.to_vec()
}
376
377 pub async fn save_state_to_db(
384 &mut self,
385 context: &mut context::StateContext<T>,
386 ) -> eyre::Result<()> {
387 let owner_type = T::ENTITY_NAME;
389
390 let kickoff_machines: eyre::Result<Vec<_>> = self
392 .kickoff_machines
393 .iter()
394 .filter(|machine| machine.dirty)
396 .map(|machine| -> eyre::Result<_> {
397 let state_json = serde_json::to_string(&machine).wrap_err_with(|| {
398 format!("Failed to serialize kickoff machine: {machine:?}")
399 })?;
400 let kickoff_id =
401 serde_json::to_string(&machine.kickoff_data).wrap_err_with(|| {
402 format!("Failed to serialize kickoff id for machine: {machine:?}")
403 })?;
404 Ok((state_json, (kickoff_id)))
405 })
406 .collect();
407
408 let round_machines: eyre::Result<Vec<_>> = self
410 .round_machines
411 .iter()
412 .filter(|machine| machine.dirty)
414 .map(|machine| -> eyre::Result<_> {
415 let state_json = serde_json::to_string(machine)
416 .wrap_err_with(|| format!("Failed to serialize round machine: {machine:?}"))?;
417 let operator_xonly_pk = machine.operator_data.xonly_pk;
418
419 Ok((state_json, (operator_xonly_pk)))
421 })
422 .collect();
423
424 {
425 let mut dbtx = context.shared_dbtx.lock().await;
426 self.db
428 .save_state_machines(
429 &mut dbtx,
430 kickoff_machines?,
431 round_machines?,
432 context.cache.block_height as i32 + 1,
433 owner_type,
434 )
435 .await?;
436 }
437
438 for machine in &mut self.kickoff_machines {
440 if machine.dirty {
441 machine
442 .handle_with_context(&KickoffEvent::SavedToDb, context)
443 .await;
444 }
445 }
446
447 for machine in &mut self.round_machines {
448 if machine.dirty {
449 machine
450 .handle_with_context(&RoundEvent::SavedToDb, context)
451 .await;
452 }
453 }
454
455 Ok(())
456 }
457
458 pub fn get_next_height_to_process(&self) -> u32 {
459 self.next_height_to_process
460 }
461
462 #[allow(clippy::type_complexity)]
478 fn update_machines<'a, M>(
479 machines: &mut Vec<InitializedStateMachine<M>>,
480 base_context: &'a context::StateContext<T>,
481 ) -> (
482 Vec<InitializedStateMachine<M>>,
483 Vec<
484 impl Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send + 'a,
485 >,
486 )
487 where
488 M: IntoStateMachine + Send + Sync + 'static,
489 M::State: Send + Sync + 'static,
490 InitializedStateMachine<M>: ContextProcessor<T, M>,
491 {
492 let mut unchanged_machines = Vec::new();
493 let mut processing_futures = Vec::new();
494
495 for machine in std::mem::take(machines).into_iter() {
496 match machine.process_with_ctx(base_context) {
497 ContextProcessResult::Processing(future) => processing_futures.push(future),
498 ContextProcessResult::Unchanged(machine) => unchanged_machines.push(machine),
499 }
500 }
501
502 (unchanged_machines, processing_futures)
503 }
504
505 pub async fn process_and_add_new_states_from_height(
508 &mut self,
509 dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
510 new_round_machines: Vec<InitializedStateMachine<round::RoundStateMachine<T>>>,
511 new_kickoff_machines: Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>>,
512 start_height: u32,
513 ) -> Result<(), eyre::Report> {
514 let mut temporary_manager = self.clone_without_machines();
516 temporary_manager.round_machines = new_round_machines;
517 temporary_manager.kickoff_machines = new_kickoff_machines;
518
519 for block_height in start_height..temporary_manager.next_height_to_process {
520 let block = temporary_manager
521 .get_block(Some(&mut *dbtx.lock().await), block_height)
522 .await
523 .wrap_err_with(|| {
524 format!(
525 "Block at height {block_height} not found in process_and_add_new_states_from_height"
526 )
527 })?;
528
529 let mut context = temporary_manager.new_context(dbtx.clone(), &block, block_height)?;
530 temporary_manager
531 .process_block_parallel(&mut context)
532 .await?;
533 }
534
535 self.round_machines.extend(temporary_manager.round_machines);
537 self.kickoff_machines
538 .extend(temporary_manager.kickoff_machines);
539
540 Ok(())
541 }
542
543 pub async fn process_block_parallel(
551 &mut self,
552 context: &mut context::StateContext<T>,
553 ) -> Result<(), eyre::Report> {
554 let block_height = context.cache.block_height;
555
556 let (mut final_kickoff_machines, mut kickoff_futures) =
559 Self::update_machines(&mut self.kickoff_machines, context);
560 let (mut final_round_machines, mut round_futures) =
561 Self::update_machines(&mut self.round_machines, context);
562
563 let mut iterations = 0;
567
568 while !kickoff_futures.is_empty() || !round_futures.is_empty() {
571 let (kickoff_results, round_results) =
573 join(join_all(kickoff_futures), join_all(round_futures)).await;
574
575 let (mut changed_kickoff_machines, mut kickoff_contexts): (Vec<_>, Vec<_>) =
577 kickoff_results.into_iter().unzip();
578 let (mut changed_round_machines, mut round_contexts): (Vec<_>, Vec<_>) =
579 round_results.into_iter().unzip();
580
581 let mut all_errors = Vec::new();
583 for ctx in kickoff_contexts.iter_mut().chain(round_contexts.iter_mut()) {
584 all_errors.extend(std::mem::take(&mut ctx.errors));
585 }
586
587 if !all_errors.is_empty() {
588 return Err(eyre::eyre!(
590 "Multiple errors occurred during state processing: {:?}",
591 all_errors
592 ));
593 }
594
595 for ctx in kickoff_contexts.iter_mut().chain(round_contexts.iter_mut()) {
597 #[cfg(debug_assertions)]
598 for machine in &ctx.new_round_machines {
599 if !machine.dirty {
600 panic!(
601 "Round machine not dirty despite having been newly created: {:?}",
602 machine.state()
603 );
604 }
605 }
606 #[cfg(debug_assertions)]
607 for machine in &ctx.new_kickoff_machines {
608 if !machine.dirty {
609 panic!(
610 "Kickoff machine not dirty despite having been newly created: {:?}",
611 machine.state()
612 );
613 }
614 }
615 changed_round_machines.extend(std::mem::take(&mut ctx.new_round_machines));
616 changed_kickoff_machines.extend(std::mem::take(&mut ctx.new_kickoff_machines));
617 }
618
619 if iterations > 100000 {
623 return Err(eyre::eyre!(
624 r#"{}/{} kickoff and {}/{} round state machines did not stabilize after 100000 iterations, debug repr of changed machines:
625 ---- Kickoff machines ----
626 {:?}
627 ---- Round machines ----
628 {:?}
629 "#,
630 changed_kickoff_machines.len(),
631 final_kickoff_machines.len() + changed_kickoff_machines.len(),
632 changed_round_machines.len(),
633 final_round_machines.len() + changed_round_machines.len(),
634 changed_kickoff_machines
635 .iter()
636 .map(|m| m.state())
637 .collect::<Vec<_>>(),
638 changed_round_machines
639 .iter()
640 .map(|m| m.state())
641 .collect::<Vec<_>>(),
642 ));
643 }
644
645 let (finalized_kickoff_machines, new_kickoff_futures) =
648 Self::update_machines(&mut changed_kickoff_machines, context);
649 let (finalized_round_machines, new_round_futures) =
650 Self::update_machines(&mut changed_round_machines, context);
651 final_kickoff_machines.extend(finalized_kickoff_machines);
652 final_round_machines.extend(finalized_round_machines);
653
654 kickoff_futures = new_kickoff_futures;
656 round_futures = new_round_futures;
657 iterations += 1;
658 }
659
660 drop(kickoff_futures);
661 drop(round_futures);
662
663 self.round_machines = final_round_machines;
665 self.kickoff_machines = final_kickoff_machines;
666
667 self.next_height_to_process = max(block_height + 1, self.next_height_to_process);
668
669 Ok(())
670 }
671}