pub use crate::builder::block_cache;
use crate::config::BridgeConfig;
use crate::database::{Database, DatabaseTransaction};
use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc;
use crate::states::block_cache::BlockCache;
use clementine_errors::BridgeError;
use eyre::{Context, OptionExt};
use futures::future::{join, join_all};
use kickoff::KickoffEvent;
use matcher::BlockMatcher;
use pgmq::PGMQueueExt;
use round::RoundEvent;
use statig::awaitable::{InitializedStateMachine, UninitializedStateMachine};
use statig::prelude::*;
use std::cmp::max;
use std::future::Future;
use std::sync::Arc;
use tokio::sync::Mutex;

pub mod context;
mod event;
pub mod kickoff;
mod matcher;
pub mod round;
pub mod task;

pub use context::{Duty, Owner};
pub use event::SystemEvent;

pub use clementine_errors::StateMachineError;
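
/// Result of letting a single state machine react to a block: either the
/// machine matched no events in the block and is returned unchanged, or a
/// future that drives it through the matched events and yields the machine
/// together with its (possibly modified) processing context.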
pub(crate) enum ContextProcessResult<
    T: Owner,
    M: IntoStateMachine,
    Fut: Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send,
> {
    Unchanged(InitializedStateMachine<M>),
    Processing(Fut),
}

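/// A uniform way to feed a block context to a state machine and collect the
/// outcome. Implemented below for `InitializedStateMachine<M>` so that round
/// and kickoff machines can be processed identically by `StateManager`.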
pub(crate) trait ContextProcessor<T: Owner, M: IntoStateMachine> {
    fn process_with_ctx(
        self,
        block: &context::StateContext<T>,
    ) -> ContextProcessResult<
        T,
        M,
        impl Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send,
    >;
}

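/// Blanket implementation for any statig machine that can match events out of
/// a `BlockCache` (via `BlockMatcher`) and whose context type is our
/// `StateContext`. Machines that match no events are handed back untouched so
/// the caller can skip cloning the context for idle machines.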
impl<T, M> ContextProcessor<T, M> for InitializedStateMachine<M>
where
    T: Owner,
    for<'evt, 'ctx> M: IntoStateMachine<Event<'evt> = M::StateEvent, Context<'ctx> = context::StateContext<T>>
        + Send
        + BlockMatcher
        + Clone,
    M::State: awaitable::State<M> + 'static + Send,
    for<'sub> M::Superstate<'sub>: awaitable::Superstate<M> + Send,
    for<'evt> M::Event<'evt>: Send + Sync,
{
    fn process_with_ctx(
        mut self,
        block: &context::StateContext<T>,
    ) -> ContextProcessResult<T, M, impl Future<Output = (Self, context::StateContext<T>)> + Send>
    {
        let events = self.match_block(&block.cache);
        if events.is_empty() {
            ContextProcessResult::Unchanged(self)
        } else {
            let mut ctx = block.clone();
            ContextProcessResult::Processing(async move {
                for event in events {
                    self.handle_with_context(&event, &mut ctx).await;
                }
                (self, ctx)
            })
        }
    }
}

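/// Tracks, advances, and persists the round and kickoff state machines owned
/// by a single entity (`T::ENTITY_NAME`): blocks are matched against every
/// machine, matching machines are driven concurrently, and dirty machines are
/// written back to Postgres.
///
/// A minimal usage sketch (illustrative only; constructing the `Database`,
/// `ExtendedBitcoinRpc`, `BridgeConfig`, and `Owner` values, as well as
/// committing the shared transaction, is elided and depends on the caller):
///
/// ```ignore
/// let mut mgr = StateManager::new(db, owner, rpc, config).await?;
/// // Share one DB transaction between the context and the manager.
/// let dbtx = Arc::new(Mutex::new(mgr.db.begin_transaction().await?));
/// let mut ctx = mgr.new_context(dbtx.clone(), &block, height)?;
/// mgr.process_block_parallel(&mut ctx).await?;
/// mgr.save_state_to_db(&mut ctx).await?;
/// // ...commit the transaction held in `dbtx` (elided).
/// ```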
#[derive(Debug, Clone)]
pub struct StateManager<T: Owner> {
    pub db: Database,
    queue: PGMQueueExt,
    owner: T,
    round_machines: Vec<InitializedStateMachine<round::RoundStateMachine<T>>>,
    kickoff_machines: Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>>,
    config: BridgeConfig,
    next_height_to_process: u32,
    rpc: ExtendedBitcoinRpc,
    last_finalized_block: Option<Arc<BlockCache>>,
}

impl<T: Owner + std::fmt::Debug + 'static> StateManager<T> {
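    /// Name of the pgmq queue used for this entity's state manager events,
    /// derived from `T::ENTITY_NAME`.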
    pub fn queue_name() -> String {
        format!("{}_state_mgr_events", T::ENTITY_NAME)
    }

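    /// Clones the manager's handles (database, queue, RPC, config, last
    /// finalized block) but starts with empty machine lists; used by
    /// [`Self::process_and_add_new_states_from_height`] to replay history for
    /// newly added machines without touching the live ones.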
    pub fn clone_without_machines(&self) -> Self {
        Self {
            db: self.db.clone(),
            queue: self.queue.clone(),
            owner: self.owner.clone(),
            round_machines: Vec::new(),
            kickoff_machines: Vec::new(),
            config: self.config.clone(),
            next_height_to_process: self.next_height_to_process,
            rpc: self.rpc.clone(),
            last_finalized_block: self.last_finalized_block.clone(),
        }
    }

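    /// Builds a [`context::StateContext`] for a freshly fetched block by
    /// wrapping it in a new [`BlockCache`] at the given height.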
    pub fn new_context(
        &self,
        dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
        block: &bitcoin::Block,
        block_height: u32,
    ) -> Result<context::StateContext<T>, BridgeError> {
        Ok(context::StateContext::new(
            dbtx,
            Arc::new(self.owner.clone()),
            Arc::new(BlockCache::from_block(block.clone(), block_height)),
            self.config.clone(),
        ))
    }

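    /// Builds a [`context::StateContext`] from an already constructed
    /// [`BlockCache`], avoiding a block clone when the cache is shared (e.g.
    /// the manager's last finalized block).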
    pub fn new_context_with_block_cache(
        &self,
        dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
        block_cache: Arc<BlockCache>,
    ) -> Result<context::StateContext<T>, BridgeError> {
        Ok(context::StateContext::new(
            dbtx,
            Arc::new(self.owner.clone()),
            block_cache,
            self.config.clone(),
        ))
    }

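    /// Creates a new state manager: ensures the pgmq queue exists, then
    /// reloads any persisted state machines from the database before
    /// returning.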
    pub async fn new(
        db: Database,
        owner: T,
        rpc: ExtendedBitcoinRpc,
        config: BridgeConfig,
    ) -> eyre::Result<Self> {
        let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;

        queue.create(&Self::queue_name()).await.wrap_err_with(|| {
            format!("Error creating pgmq queue with name {}", Self::queue_name())
        })?;

        let mut mgr = Self {
            last_finalized_block: None,
            db,
            owner,
            config: config.clone(),
            round_machines: Vec::new(),
            kickoff_machines: Vec::new(),
            queue,
            next_height_to_process: config.protocol_paramset.start_height,
            rpc,
        };

        mgr.reload_state_manager_from_db().await?;
        Ok(mgr)
    }

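    /// Fetches the full block at `height`, preferring the local database and
    /// falling back to the Bitcoin RPC when the block is not stored there.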
    async fn get_block(
        &self,
        dbtx: Option<DatabaseTransaction<'_>>,
        height: u32,
    ) -> Result<bitcoin::Block, BridgeError> {
        match self.db.get_full_block(dbtx, height).await? {
            Some(block) => Ok(block),
            None => Ok(self.rpc.get_block_by_height(height.into()).await?),
        }
    }

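    /// Restores the manager from the database: determines the next block
    /// height to process, rebuilds the last finalized block cache, and
    /// re-initializes every persisted kickoff and round machine with a fresh
    /// context. Machines that fail to deserialize are logged and skipped
    /// rather than aborting the reload; errors collected in the context do
    /// abort it.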
    pub async fn reload_state_manager_from_db(&mut self) -> Result<(), BridgeError> {
        let mut dbtx = self.db.begin_transaction().await?;
        let owner_type = T::ENTITY_NAME;
        let status = self
            .db
            .get_next_height_to_process(Some(&mut dbtx), owner_type)
            .await?;

        self.next_height_to_process = match status {
            Some(block_height) => {
                u32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?
            }
            None => {
                tracing::info!("No state machines found in the database");
                self.config.protocol_paramset.start_height
            }
        };

        tracing::info!(
            "Loading state machines from block height {}",
            self.next_height_to_process.saturating_sub(1)
        );

        let last_height = self.next_height_to_process.saturating_sub(1);

        self.last_finalized_block = Some(Arc::new(BlockCache::from_block(
            match self.db.get_full_block(None, last_height).await? {
                Some(block) => block,
                None => self.rpc.get_block_by_height(last_height.into()).await?,
            },
            last_height,
        )));

        let kickoff_machines = self
            .db
            .load_kickoff_machines(Some(&mut dbtx), owner_type)
            .await?;

        let round_machines = self
            .db
            .load_round_machines(Some(&mut dbtx), owner_type)
            .await?;

        let init_dbtx = Arc::new(Mutex::new(dbtx));

        let mut ctx = self.new_context_with_block_cache(
            init_dbtx.clone(),
            self.last_finalized_block
                .clone()
                .expect("Initialized before"),
        )?;

        for (state_json, kickoff_id, saved_block_height) in &kickoff_machines {
            tracing::debug!(
                "Loaded kickoff machine: state={}, block_height={}",
                state_json,
                saved_block_height
            );

            let machine: Result<UninitializedStateMachine<kickoff::KickoffStateMachine<T>>, _> =
                serde_json::from_str(state_json);

            match machine {
                Ok(uninitialized) => {
                    let initialized = uninitialized.init_with_context(&mut ctx).await;
                    self.kickoff_machines.push(initialized);
                }
                Err(e) => {
                    tracing::warn!(
                        "Failed to deserialize kickoff machine with ID {}: {}",
                        kickoff_id,
                        e
                    );
                }
            }
        }

        for (state_json, operator_xonly_pk, saved_block_height) in &round_machines {
            tracing::debug!(
                "Loaded round machine: state={}, block_height={}",
                state_json,
                saved_block_height
            );

            let machine: Result<UninitializedStateMachine<round::RoundStateMachine<T>>, _> =
                serde_json::from_str(state_json);

            match machine {
                Ok(uninitialized) => {
                    let initialized = uninitialized.init_with_context(&mut ctx).await;
                    self.round_machines.push(initialized);
                }
                Err(e) => {
                    tracing::error!(
                        "Failed to deserialize round machine with operator xonly pk {:?}: {}",
                        operator_xonly_pk,
                        e
                    );
                }
            }
        }

        if !ctx.errors.is_empty() {
            return Err(eyre::eyre!(
                "Multiple errors occurred during state processing: {:?}",
                ctx.errors
            )
            .into());
        }

        drop(ctx);

        tracing::info!(
            "Loaded {} kickoff machines and {} round machines from the database",
            kickoff_machines.len(),
            round_machines.len()
        );

        Arc::into_inner(init_dbtx)
            .ok_or_eyre("Expected single reference to DB tx when committing")?
            .into_inner()
            .commit()
            .await?;

        Ok(())
    }

    #[cfg(test)]
    #[doc(hidden)]
    pub fn round_machines(&self) -> Vec<InitializedStateMachine<round::RoundStateMachine<T>>> {
        self.round_machines.clone()
    }

    #[cfg(test)]
    #[doc(hidden)]
    pub fn kickoff_machines(
        &self,
    ) -> Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>> {
        self.kickoff_machines.clone()
    }

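    /// Serializes every dirty kickoff and round machine and persists them in
    /// the shared transaction, recording `block_height + 1` as the next height
    /// to process. Each dirty machine is then sent a `SavedToDb` event so it
    /// can react to having been persisted.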
    pub async fn save_state_to_db(
        &mut self,
        context: &mut context::StateContext<T>,
    ) -> eyre::Result<()> {
        let owner_type = T::ENTITY_NAME;

        let kickoff_machines: eyre::Result<Vec<_>> = self
            .kickoff_machines
            .iter()
            .filter(|machine| machine.dirty)
            .map(|machine| -> eyre::Result<_> {
                let state_json = serde_json::to_string(&machine).wrap_err_with(|| {
                    format!("Failed to serialize kickoff machine: {machine:?}")
                })?;
                let kickoff_id =
                    serde_json::to_string(&machine.kickoff_data).wrap_err_with(|| {
                        format!("Failed to serialize kickoff id for machine: {machine:?}")
                    })?;
                Ok((state_json, kickoff_id))
            })
            .collect();

        let round_machines: eyre::Result<Vec<_>> = self
            .round_machines
            .iter()
            .filter(|machine| machine.dirty)
            .map(|machine| -> eyre::Result<_> {
                let state_json = serde_json::to_string(machine)
                    .wrap_err_with(|| format!("Failed to serialize round machine: {machine:?}"))?;
                let operator_xonly_pk = machine.operator_data.xonly_pk;

                Ok((state_json, operator_xonly_pk))
            })
            .collect();

        {
            let mut dbtx = context.shared_dbtx.lock().await;
            self.db
                .save_state_machines(
                    &mut dbtx,
                    kickoff_machines?,
                    round_machines?,
                    context.cache.block_height as i32 + 1,
                    owner_type,
                )
                .await?;
        }

        for machine in &mut self.kickoff_machines {
            if machine.dirty {
                machine
                    .handle_with_context(&KickoffEvent::SavedToDb, context)
                    .await;
            }
        }

        for machine in &mut self.round_machines {
            if machine.dirty {
                machine
                    .handle_with_context(&RoundEvent::SavedToDb, context)
                    .await;
            }
        }

        Ok(())
    }

    pub fn get_next_height_to_process(&self) -> u32 {
        self.next_height_to_process
    }

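    /// Runs every machine in `machines` against `base_context` once, splitting
    /// the set into machines that matched no events (returned as-is) and
    /// futures for machines that did and still need to be driven. The input
    /// vector is drained via `std::mem::take`.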
    #[allow(clippy::type_complexity)]
    fn update_machines<'a, M>(
        machines: &mut Vec<InitializedStateMachine<M>>,
        base_context: &'a context::StateContext<T>,
    ) -> (
        Vec<InitializedStateMachine<M>>,
        Vec<
            impl Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send + 'a,
        >,
    )
    where
        M: IntoStateMachine + Send + Sync + 'static,
        M::State: Send + Sync + 'static,
        InitializedStateMachine<M>: ContextProcessor<T, M>,
    {
        let mut unchanged_machines = Vec::new();
        let mut processing_futures = Vec::new();

        for machine in std::mem::take(machines).into_iter() {
            match machine.process_with_ctx(base_context) {
                ContextProcessResult::Processing(future) => processing_futures.push(future),
                ContextProcessResult::Unchanged(machine) => unchanged_machines.push(machine),
            }
        }

        (unchanged_machines, processing_futures)
    }

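    /// Catches freshly created machines up to the manager's current height:
    /// the new machines are replayed on a temporary manager from
    /// `start_height` up to (but not including) `next_height_to_process`, then
    /// merged into the live machine lists.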
    pub async fn process_and_add_new_states_from_height(
        &mut self,
        dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
        new_round_machines: Vec<InitializedStateMachine<round::RoundStateMachine<T>>>,
        new_kickoff_machines: Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>>,
        start_height: u32,
    ) -> Result<(), eyre::Report> {
        let mut temporary_manager = self.clone_without_machines();
        temporary_manager.round_machines = new_round_machines;
        temporary_manager.kickoff_machines = new_kickoff_machines;

        for block_height in start_height..temporary_manager.next_height_to_process {
            let block = temporary_manager
                .get_block(Some(&mut *dbtx.lock().await), block_height)
                .await
                .wrap_err_with(|| {
                    format!(
                        "Block at height {block_height} not found in process_and_add_new_states_from_height"
                    )
                })?;

            let mut context = temporary_manager.new_context(dbtx.clone(), &block, block_height)?;
            temporary_manager
                .process_block_parallel(&mut context)
                .await?;
        }

        self.round_machines.extend(temporary_manager.round_machines);
        self.kickoff_machines
            .extend(temporary_manager.kickoff_machines);

        Ok(())
    }

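    /// Processes a single block against all machines concurrently. Machines
    /// that match events are driven in parallel; any new machines spawned via
    /// their contexts are fed the same block, and the loop repeats until no
    /// machine produces further events (bounded at 100000 iterations as a
    /// safety valve). Errors collected in the per-machine contexts abort the
    /// whole block. Finally, `next_height_to_process` is advanced to at least
    /// `block_height + 1`.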
    pub async fn process_block_parallel(
        &mut self,
        context: &mut context::StateContext<T>,
    ) -> Result<(), eyre::Report> {
        let block_height = context.cache.block_height;

        let (mut final_kickoff_machines, mut kickoff_futures) =
            Self::update_machines(&mut self.kickoff_machines, context);
        let (mut final_round_machines, mut round_futures) =
            Self::update_machines(&mut self.round_machines, context);

        let mut iterations = 0;

        while !kickoff_futures.is_empty() || !round_futures.is_empty() {
            let (kickoff_results, round_results) =
                join(join_all(kickoff_futures), join_all(round_futures)).await;

            let (mut changed_kickoff_machines, mut kickoff_contexts): (Vec<_>, Vec<_>) =
                kickoff_results.into_iter().unzip();
            let (mut changed_round_machines, mut round_contexts): (Vec<_>, Vec<_>) =
                round_results.into_iter().unzip();

            let mut all_errors = Vec::new();
            for ctx in kickoff_contexts.iter_mut().chain(round_contexts.iter_mut()) {
                all_errors.extend(std::mem::take(&mut ctx.errors));
            }

            if !all_errors.is_empty() {
                return Err(eyre::eyre!(
                    "Multiple errors occurred during state processing: {:?}",
                    all_errors
                ));
            }

            for ctx in kickoff_contexts.iter_mut().chain(round_contexts.iter_mut()) {
                #[cfg(debug_assertions)]
                for machine in &ctx.new_round_machines {
                    if !machine.dirty {
                        panic!(
                            "Round machine not dirty despite having been newly created: {:?}",
                            machine.state()
                        );
                    }
                }
                #[cfg(debug_assertions)]
                for machine in &ctx.new_kickoff_machines {
                    if !machine.dirty {
                        panic!(
                            "Kickoff machine not dirty despite having been newly created: {:?}",
                            machine.state()
                        );
                    }
                }
                changed_round_machines.extend(std::mem::take(&mut ctx.new_round_machines));
                changed_kickoff_machines.extend(std::mem::take(&mut ctx.new_kickoff_machines));
            }

            if iterations > 100000 {
                return Err(eyre::eyre!(
                    r#"{}/{} kickoff and {}/{} round state machines did not stabilize after 100000 iterations, debug repr of changed machines:
                    ---- Kickoff machines ----
                    {:?}
                    ---- Round machines ----
                    {:?}
                    "#,
                    changed_kickoff_machines.len(),
                    final_kickoff_machines.len() + changed_kickoff_machines.len(),
                    changed_round_machines.len(),
                    final_round_machines.len() + changed_round_machines.len(),
                    changed_kickoff_machines
                        .iter()
                        .map(|m| m.state())
                        .collect::<Vec<_>>(),
                    changed_round_machines
                        .iter()
                        .map(|m| m.state())
                        .collect::<Vec<_>>(),
                ));
            }

            let (finalized_kickoff_machines, new_kickoff_futures) =
                Self::update_machines(&mut changed_kickoff_machines, context);
            let (finalized_round_machines, new_round_futures) =
                Self::update_machines(&mut changed_round_machines, context);
            final_kickoff_machines.extend(finalized_kickoff_machines);
            final_round_machines.extend(finalized_round_machines);

            kickoff_futures = new_kickoff_futures;
            round_futures = new_round_futures;
            iterations += 1;
        }

        drop(kickoff_futures);
        drop(round_futures);

        self.round_machines = final_round_machines;
        self.kickoff_machines = final_kickoff_machines;

        self.next_height_to_process = max(block_height + 1, self.next_height_to_process);

        Ok(())
    }
}