1pub use crate::builder::block_cache;
22use crate::config::BridgeConfig;
23use crate::database::{Database, DatabaseTransaction};
24use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc;
25use crate::states::block_cache::BlockCache;
26use clementine_errors::BridgeError;
27use eyre::{Context, OptionExt};
28use futures::future::{join, join_all};
29use kickoff::KickoffEvent;
30use matcher::BlockMatcher;
31use pgmq::PGMQueueExt;
32use round::RoundEvent;
33use statig::awaitable::{InitializedStateMachine, UninitializedStateMachine};
34use statig::prelude::*;
35use std::cmp::max;
36use std::future::Future;
37use std::sync::Arc;
38use tokio::sync::Mutex;
39
40pub mod context;
41mod event;
42pub mod kickoff;
43mod matcher;
44pub mod round;
45pub mod task;
46
47pub use context::{Duty, Owner};
48pub use event::SystemEvent;
49
50pub use clementine_errors::StateMachineError;
51pub(crate) enum ContextProcessResult<
52 T: Owner,
53 M: IntoStateMachine,
54 Fut: Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send,
55> {
56 Unchanged(InitializedStateMachine<M>),
57 Processing(Fut),
58}
59
60pub(crate) trait ContextProcessor<T: Owner, M: IntoStateMachine> {
62 fn process_with_ctx(
66 self,
67 block: &context::StateContext<T>,
68 ) -> ContextProcessResult<
69 T,
70 M,
71 impl Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send,
72 >;
73}
74
75impl<T, M> ContextProcessor<T, M> for InitializedStateMachine<M>
77where
78 T: Owner,
79 for<'evt, 'ctx> M: IntoStateMachine<Event<'evt> = M::StateEvent, Context<'ctx> = context::StateContext<T>>
80 + Send
81 + BlockMatcher
82 + Clone,
83 M::State: awaitable::State<M> + 'static + Send,
84 for<'sub> M::Superstate<'sub>: awaitable::Superstate<M> + Send,
85 for<'evt> M::Event<'evt>: Send + Sync,
86{
87 fn process_with_ctx(
88 mut self,
89 block: &context::StateContext<T>,
90 ) -> ContextProcessResult<T, M, impl Future<Output = (Self, context::StateContext<T>)> + Send>
91 {
92 let events = self.match_block(&block.cache);
93 if events.is_empty() {
94 ContextProcessResult::Unchanged(self)
95 } else {
96 let mut ctx = block.clone();
97 ContextProcessResult::Processing(async move {
98 for event in events {
99 self.handle_with_context(&event, &mut ctx).await;
100 }
101 (self, ctx)
102 })
103 }
104 }
105}
106
107#[derive(Debug, Clone)]
113pub struct StateManager<T: Owner> {
114 pub db: Database,
115 queue: PGMQueueExt,
116 owner: T,
117 round_machines: Vec<InitializedStateMachine<round::RoundStateMachine<T>>>,
118 kickoff_machines: Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>>,
119 config: BridgeConfig,
120 next_height_to_process: u32,
121 last_processed_lcp: Option<u32>,
122 rpc: ExtendedBitcoinRpc,
123 last_finalized_block: Option<Arc<BlockCache>>,
125}
126
127impl<T: Owner + std::fmt::Debug + 'static> StateManager<T> {
128 pub fn queue_name() -> String {
130 format!("{}_state_mgr_events", T::ENTITY_NAME)
131 }
132
133 pub fn clone_without_machines(&self) -> Self {
134 Self {
135 db: self.db.clone(),
136 queue: self.queue.clone(),
137 owner: self.owner.clone(),
138 round_machines: Vec::new(),
139 kickoff_machines: Vec::new(),
140 config: self.config.clone(),
141 next_height_to_process: self.next_height_to_process,
142 last_processed_lcp: self.last_processed_lcp,
143 rpc: self.rpc.clone(),
144 last_finalized_block: self.last_finalized_block.clone(),
145 }
146 }
147
148 pub fn new_context(
152 &self,
153 dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
154 block: &bitcoin::Block,
155 block_height: u32,
156 ) -> Result<context::StateContext<T>, BridgeError> {
157 Ok(context::StateContext::new(
158 dbtx,
159 Arc::new(self.owner.clone()),
160 Arc::new(BlockCache::from_block(block.clone(), block_height)),
161 self.config.clone(),
162 ))
163 }
164
165 pub fn new_context_with_block_cache(
166 &self,
167 dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
168 block_cache: Arc<BlockCache>,
169 ) -> Result<context::StateContext<T>, BridgeError> {
170 Ok(context::StateContext::new(
171 dbtx,
172 Arc::new(self.owner.clone()),
173 block_cache,
174 self.config.clone(),
175 ))
176 }
177
178 pub async fn new(
179 db: Database,
180 owner: T,
181 rpc: ExtendedBitcoinRpc,
182 config: BridgeConfig,
183 ) -> eyre::Result<Self> {
184 let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
185
186 queue.create(&Self::queue_name()).await.wrap_err_with(|| {
187 format!("Error creating pqmq queue with name {}", Self::queue_name())
188 })?;
189
190 let mut mgr = Self {
191 last_finalized_block: None,
192 db,
193 owner,
194 config: config.clone(),
195 round_machines: Vec::new(),
196 kickoff_machines: Vec::new(),
197 queue,
198 next_height_to_process: config.protocol_paramset.start_height,
199 last_processed_lcp: None,
200 rpc,
201 };
202
203 mgr.reload_state_manager_from_db().await?;
204 Ok(mgr)
205 }
206
207 async fn get_block(
208 &self,
209 dbtx: Option<DatabaseTransaction<'_>>,
210 height: u32,
211 ) -> Result<bitcoin::Block, BridgeError> {
212 match self.db.get_full_block(dbtx, height).await? {
213 Some(block) => Ok(block),
214 None => Ok(self.rpc.get_block_by_height(height.into()).await?),
215 }
216 }
217
218 pub async fn reload_state_manager_from_db(&mut self) -> Result<(), BridgeError> {
224 let mut dbtx = self.db.begin_transaction().await?;
225 let owner_type = T::ENTITY_NAME;
226 let status = self
227 .db
228 .get_next_height_to_process(Some(&mut dbtx), owner_type)
229 .await?;
230
231 let last_processed_lcp = self
233 .db
234 .get_last_processed_lcp(Some(&mut dbtx), owner_type)
235 .await?;
236
237 let loaded_last_processed_lcp = last_processed_lcp
238 .map(|lcp| u32::try_from(lcp).wrap_err("Last processed LCP doesn't fit into u32"))
239 .transpose()?;
240
241 let loaded_next_height_to_process = match status {
243 Some(block_height) => {
244 u32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?
245 }
246 None => {
247 tracing::info!("No state machines found in the database");
248 self.config.protocol_paramset.start_height
249 }
250 };
251
252 tracing::info!(
253 "Loading state machines from block height {}",
254 loaded_next_height_to_process.saturating_sub(1)
255 );
256
257 let last_height = loaded_next_height_to_process.saturating_sub(1);
258
259 let loaded_last_finalized_block = Arc::new(BlockCache::from_block(
260 match self.db.get_full_block(None, last_height).await? {
261 Some(block) => block,
262 None => self.rpc.get_block_by_height(last_height.into()).await?,
263 },
264 last_height,
265 ));
266
267 let kickoff_machines = self
269 .db
270 .load_kickoff_machines(Some(&mut dbtx), owner_type)
271 .await?;
272
273 let round_machines = self
275 .db
276 .load_round_machines(Some(&mut dbtx), owner_type)
277 .await?;
278
279 let init_dbtx = Arc::new(Mutex::new(dbtx));
280
281 let mut ctx = self
282 .new_context_with_block_cache(init_dbtx.clone(), loaded_last_finalized_block.clone())?;
283
284 let mut loaded_kickoff_machines = Vec::with_capacity(kickoff_machines.len());
286 for (state_json, kickoff_id, saved_block_height) in &kickoff_machines {
287 tracing::debug!(
288 "Loaded kickoff machine: state={}, block_height={}",
289 state_json,
290 saved_block_height
291 );
292
293 let machine: Result<UninitializedStateMachine<kickoff::KickoffStateMachine<T>>, _> =
295 serde_json::from_str(state_json);
296
297 match machine {
298 Ok(uninitialized) => {
299 let initialized = uninitialized.init_with_context(&mut ctx).await;
301 loaded_kickoff_machines.push(initialized);
302 }
303 Err(e) => {
304 return Err(eyre::eyre!(
305 "Failed to deserialize kickoff machine with ID {}: {}",
306 kickoff_id,
307 e
308 )
309 .into());
310 }
311 }
312 }
313
314 let mut loaded_round_machines = Vec::with_capacity(round_machines.len());
316 for (state_json, operator_xonly_pk, saved_block_height) in &round_machines {
317 tracing::debug!(
318 "Loaded round machine: state={}, block_height={}",
319 state_json,
320 saved_block_height
321 );
322
323 let machine: Result<UninitializedStateMachine<round::RoundStateMachine<T>>, _> =
325 serde_json::from_str(state_json);
326
327 match machine {
328 Ok(uninitialized) => {
329 let initialized = uninitialized.init_with_context(&mut ctx).await;
331 loaded_round_machines.push(initialized);
332 }
333 Err(e) => {
334 return Err(eyre::eyre!(
335 "Failed to deserialize round machine with operator index {:?}: {}",
336 operator_xonly_pk,
337 e
338 )
339 .into());
340 }
341 }
342 }
343
344 if !ctx.errors.is_empty() {
345 return Err(eyre::eyre!(
346 "Multiple errors occurred during state processing: {:?}",
347 ctx.errors
348 )
349 .into());
350 }
351
352 drop(ctx);
353
354 tracing::info!(
355 "Loaded {} kickoff machines and {} round machines from the database",
356 loaded_kickoff_machines.len(),
357 loaded_round_machines.len()
358 );
359
360 Arc::into_inner(init_dbtx)
361 .ok_or_eyre("Expected single reference to DB tx when committing")?
362 .into_inner()
363 .commit()
364 .await?;
365
366 self.last_processed_lcp = loaded_last_processed_lcp;
367 self.next_height_to_process = loaded_next_height_to_process;
368 self.last_finalized_block = Some(loaded_last_finalized_block);
369 self.kickoff_machines = loaded_kickoff_machines;
370 self.round_machines = loaded_round_machines;
371
372 Ok(())
373 }
374 #[cfg(test)]
375 #[doc(hidden)]
376 pub fn round_machines(&self) -> Vec<InitializedStateMachine<round::RoundStateMachine<T>>> {
377 self.round_machines.clone()
378 }
379
380 #[cfg(test)]
381 #[doc(hidden)]
382 pub fn kickoff_machines(
383 &self,
384 ) -> Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>> {
385 self.kickoff_machines.clone()
386 }
387
388 pub async fn save_state_to_db(
395 &mut self,
396 context: &mut context::StateContext<T>,
397 ) -> eyre::Result<()> {
398 let owner_type = T::ENTITY_NAME;
400
401 let kickoff_machines: eyre::Result<Vec<_>> = self
403 .kickoff_machines
404 .iter()
405 .filter(|machine| machine.dirty)
407 .map(|machine| -> eyre::Result<_> {
408 let state_json = serde_json::to_string(&machine).wrap_err_with(|| {
409 format!("Failed to serialize kickoff machine: {machine:?}")
410 })?;
411 let kickoff_id =
412 serde_json::to_string(&machine.kickoff_data).wrap_err_with(|| {
413 format!("Failed to serialize kickoff id for machine: {machine:?}")
414 })?;
415 Ok((state_json, (kickoff_id)))
416 })
417 .collect();
418
419 let round_machines: eyre::Result<Vec<_>> = self
421 .round_machines
422 .iter()
423 .filter(|machine| machine.dirty)
425 .map(|machine| -> eyre::Result<_> {
426 let state_json = serde_json::to_string(machine)
427 .wrap_err_with(|| format!("Failed to serialize round machine: {machine:?}"))?;
428 let operator_xonly_pk = machine.operator_data.xonly_pk;
429
430 Ok((state_json, (operator_xonly_pk)))
432 })
433 .collect();
434
435 {
436 let mut dbtx = context.shared_dbtx.lock().await;
437
438 let last_processed_lcp_i32 = self
439 .last_processed_lcp
440 .map(i32::try_from)
441 .transpose()
442 .wrap_err("Failed to convert last_processed_lcp to i32")?;
443
444 let block_height_i32 = i32::try_from(context.cache.block_height)
445 .wrap_err("Failed to convert block_height to i32")?;
446
447 self.db
449 .save_state_machines(
450 &mut dbtx,
451 kickoff_machines?,
452 round_machines?,
453 block_height_i32 + 1,
454 owner_type,
455 last_processed_lcp_i32,
456 )
457 .await?;
458 }
459
460 for machine in &mut self.kickoff_machines {
462 if machine.dirty {
463 machine
464 .handle_with_context(&KickoffEvent::SavedToDb, context)
465 .await;
466 }
467 }
468
469 for machine in &mut self.round_machines {
470 if machine.dirty {
471 machine
472 .handle_with_context(&RoundEvent::SavedToDb, context)
473 .await;
474 }
475 }
476
477 Ok(())
478 }
479
480 pub fn get_next_height_to_process(&self) -> u32 {
481 self.next_height_to_process
482 }
483
484 #[allow(clippy::type_complexity)]
500 fn update_machines<'a, M>(
501 machines: &mut Vec<InitializedStateMachine<M>>,
502 base_context: &'a context::StateContext<T>,
503 ) -> (
504 Vec<InitializedStateMachine<M>>,
505 Vec<
506 impl Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send + 'a,
507 >,
508 )
509 where
510 M: IntoStateMachine + Send + Sync + 'static,
511 M::State: Send + Sync + 'static,
512 InitializedStateMachine<M>: ContextProcessor<T, M>,
513 {
514 let mut unchanged_machines = Vec::new();
515 let mut processing_futures = Vec::new();
516
517 for machine in std::mem::take(machines).into_iter() {
518 match machine.process_with_ctx(base_context) {
519 ContextProcessResult::Processing(future) => processing_futures.push(future),
520 ContextProcessResult::Unchanged(machine) => unchanged_machines.push(machine),
521 }
522 }
523
524 (unchanged_machines, processing_futures)
525 }
526
527 pub async fn process_and_add_new_states_from_height(
530 &mut self,
531 dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
532 new_round_machines: Vec<InitializedStateMachine<round::RoundStateMachine<T>>>,
533 new_kickoff_machines: Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>>,
534 start_height: u32,
535 ) -> Result<(), eyre::Report> {
536 let mut temporary_manager = self.clone_without_machines();
538 temporary_manager.round_machines = new_round_machines;
539 temporary_manager.kickoff_machines = new_kickoff_machines;
540
541 for block_height in start_height..temporary_manager.next_height_to_process {
542 let block = temporary_manager
543 .get_block(Some(&mut *dbtx.lock().await), block_height)
544 .await
545 .wrap_err_with(|| {
546 format!(
547 "Block at height {block_height} not found in process_and_add_new_states_from_height"
548 )
549 })?;
550
551 let mut context = temporary_manager.new_context(dbtx.clone(), &block, block_height)?;
552 temporary_manager
553 .process_block_parallel(&mut context)
554 .await?;
555 }
556
557 self.round_machines.extend(temporary_manager.round_machines);
559 self.kickoff_machines
560 .extend(temporary_manager.kickoff_machines);
561
562 Ok(())
563 }
564
565 pub async fn process_block_parallel(
573 &mut self,
574 context: &mut context::StateContext<T>,
575 ) -> Result<(), eyre::Report> {
576 let block_height = context.cache.block_height;
577
578 let (mut final_kickoff_machines, mut kickoff_futures) =
581 Self::update_machines(&mut self.kickoff_machines, context);
582 let (mut final_round_machines, mut round_futures) =
583 Self::update_machines(&mut self.round_machines, context);
584
585 let mut iterations = 0;
589
590 while !kickoff_futures.is_empty() || !round_futures.is_empty() {
593 let (kickoff_results, round_results) =
595 join(join_all(kickoff_futures), join_all(round_futures)).await;
596
597 let (mut changed_kickoff_machines, mut kickoff_contexts): (Vec<_>, Vec<_>) =
599 kickoff_results.into_iter().unzip();
600 let (mut changed_round_machines, mut round_contexts): (Vec<_>, Vec<_>) =
601 round_results.into_iter().unzip();
602
603 let mut all_errors = Vec::new();
605 for ctx in kickoff_contexts.iter_mut().chain(round_contexts.iter_mut()) {
606 all_errors.extend(std::mem::take(&mut ctx.errors));
607 }
608
609 if !all_errors.is_empty() {
610 return Err(eyre::eyre!(
612 "Multiple errors occurred during state processing: {:?}",
613 all_errors
614 ));
615 }
616
617 for ctx in kickoff_contexts.iter_mut().chain(round_contexts.iter_mut()) {
619 #[cfg(debug_assertions)]
620 for machine in &ctx.new_round_machines {
621 if !machine.dirty {
622 panic!(
623 "Round machine not dirty despite having been newly created: {:?}",
624 machine.state()
625 );
626 }
627 }
628 #[cfg(debug_assertions)]
629 for machine in &ctx.new_kickoff_machines {
630 if !machine.dirty {
631 panic!(
632 "Kickoff machine not dirty despite having been newly created: {:?}",
633 machine.state()
634 );
635 }
636 }
637 changed_round_machines.extend(std::mem::take(&mut ctx.new_round_machines));
638 changed_kickoff_machines.extend(std::mem::take(&mut ctx.new_kickoff_machines));
639 }
640
641 if iterations > 100000 {
645 return Err(eyre::eyre!(
646 r#"{}/{} kickoff and {}/{} round state machines did not stabilize after 100000 iterations, debug repr of changed machines:
647 ---- Kickoff machines ----
648 {:?}
649 ---- Round machines ----
650 {:?}
651 "#,
652 changed_kickoff_machines.len(),
653 final_kickoff_machines.len() + changed_kickoff_machines.len(),
654 changed_round_machines.len(),
655 final_round_machines.len() + changed_round_machines.len(),
656 changed_kickoff_machines
657 .iter()
658 .map(|m| m.state())
659 .collect::<Vec<_>>(),
660 changed_round_machines
661 .iter()
662 .map(|m| m.state())
663 .collect::<Vec<_>>(),
664 ));
665 }
666
667 let (finalized_kickoff_machines, new_kickoff_futures) =
670 Self::update_machines(&mut changed_kickoff_machines, context);
671 let (finalized_round_machines, new_round_futures) =
672 Self::update_machines(&mut changed_round_machines, context);
673 final_kickoff_machines.extend(finalized_kickoff_machines);
674 final_round_machines.extend(finalized_round_machines);
675
676 kickoff_futures = new_kickoff_futures;
678 round_futures = new_round_futures;
679 iterations += 1;
680 }
681
682 drop(kickoff_futures);
683 drop(round_futures);
684
685 self.round_machines = final_round_machines;
687 self.kickoff_machines = final_kickoff_machines;
688
689 self.next_height_to_process = max(block_height + 1, self.next_height_to_process);
690
691 Ok(())
692 }
693}