// clementine_core/states/mod.rs

//! State manager module
//!
//! This module contains the state manager, which is responsible for holding the state machines
//! of the system: they represent the current operator state for each operator and the kickoff
//! state for each kickoff (each operator and each kickoff process has its own state machine).
//!
//! The main responsibility of the state manager is to process each finalized Bitcoin block and
//! update the state machines. The blocks are scanned for relevant Clementine transactions, and the
//! internal state of the state machines is updated accordingly. This relevant data is passed on to
//! verifiers/operators when it is time to send bridge transactions (like operator asserts,
//! watchtower challenges, etc.).
//!
//! Operator state machine: Stores where in the collateral chain the operator is (which round, or
//! the ready-to-reimburse tx). Additionally, during rounds it stores which kickoff UTXOs of the
//! round are spent.
//!
//! Kickoff state machine:
//! - Stores whether the kickoff was challenged.
//! - Stores/tracks watchtower challenges, the latest blockhash committed by the operator, operator
//!   asserts, and any other relevant information needed for proving and disproving the kickoff.
//!
//! For state machines we use the [statig](https://github.com/mdeloof/statig) crate.
//!
20
21pub use crate::builder::block_cache;
22use crate::config::BridgeConfig;
23use crate::database::{Database, DatabaseTransaction};
24use crate::errors::BridgeError;
25use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc;
26use crate::states::block_cache::BlockCache;
27use eyre::{Context, OptionExt};
28use futures::future::{join, join_all};
29use kickoff::KickoffEvent;
30use matcher::BlockMatcher;
31use pgmq::PGMQueueExt;
32use round::RoundEvent;
33use statig::awaitable::{InitializedStateMachine, UninitializedStateMachine};
34use statig::prelude::*;
35use std::cmp::max;
36use std::future::Future;
37use std::sync::Arc;
38use thiserror::Error;
39use tokio::sync::Mutex;
40
41pub mod context;
42mod event;
43pub mod kickoff;
44mod matcher;
45pub mod round;
46pub mod task;
47
48pub use context::{Duty, Owner};
49pub use event::SystemEvent;
50
51#[derive(Debug, Error)]
52pub enum StateMachineError {
53    #[error("State machine received event that it doesn't know how to handle: {0}")]
54    UnhandledEvent(String),
55
56    #[error(transparent)]
57    Other(#[from] eyre::Report),
58}
59pub(crate) enum ContextProcessResult<
60    T: Owner,
61    M: IntoStateMachine,
62    Fut: Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send,
63> {
64    Unchanged(InitializedStateMachine<M>),
65    Processing(Fut),
66}
67
68/// Utility trait to make processing generic
69pub(crate) trait ContextProcessor<T: Owner, M: IntoStateMachine> {
70    /// Processes the machine with the given state context (which contains the block cache)
71    /// If the machine is unchanged, it is returned as is. Otherwise, the machine is processed
72    /// and the result is returned as a future that processes the new events.
73    fn process_with_ctx(
74        self,
75        block: &context::StateContext<T>,
76    ) -> ContextProcessResult<
77        T,
78        M,
79        impl Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send,
80    >;
81}
82
83/// Generic implementation for all state machines
84impl<T, M> ContextProcessor<T, M> for InitializedStateMachine<M>
85where
86    T: Owner,
87    for<'evt, 'ctx> M: IntoStateMachine<Event<'evt> = M::StateEvent, Context<'ctx> = context::StateContext<T>>
88        + Send
89        + BlockMatcher
90        + Clone,
91    M::State: awaitable::State<M> + 'static + Send,
92    for<'sub> M::Superstate<'sub>: awaitable::Superstate<M> + Send,
93    for<'evt> M::Event<'evt>: Send + Sync,
94{
95    fn process_with_ctx(
96        mut self,
97        block: &context::StateContext<T>,
98    ) -> ContextProcessResult<T, M, impl Future<Output = (Self, context::StateContext<T>)> + Send>
99    {
100        let events = self.match_block(&block.cache);
101        if events.is_empty() {
102            ContextProcessResult::Unchanged(self)
103        } else {
104            let mut ctx = block.clone();
105            ContextProcessResult::Processing(async move {
106                for event in events {
107                    self.handle_with_context(&event, &mut ctx).await;
108                }
109                (self, ctx)
110            })
111        }
112    }
113}
114
115/// State manager stores the state machines.
116/// It is responsible for following:
117///     - Persisting current state of the state machines to the database.
118///     - Collecting new [`SystemEvent`]s from the message queue and passing them to the state machines,
119///           thus updating the state machines.
120#[derive(Debug, Clone)]
121pub struct StateManager<T: Owner> {
122    pub db: Database,
123    queue: PGMQueueExt,
124    owner: T,
125    round_machines: Vec<InitializedStateMachine<round::RoundStateMachine<T>>>,
126    kickoff_machines: Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>>,
127    config: BridgeConfig,
128    next_height_to_process: u32,
129    rpc: ExtendedBitcoinRpc,
130    // Set on the first finalized block event or the load_from_db method
131    last_finalized_block: Option<Arc<BlockCache>>,
132}
133
134impl<T: Owner + std::fmt::Debug + 'static> StateManager<T> {
135    /// Returns message queue name for the state manager.
136    pub fn queue_name() -> String {
137        format!("{}_state_mgr_events", T::ENTITY_NAME)
138    }
139
140    pub fn clone_without_machines(&self) -> Self {
141        Self {
142            db: self.db.clone(),
143            queue: self.queue.clone(),
144            owner: self.owner.clone(),
145            round_machines: Vec::new(),
146            kickoff_machines: Vec::new(),
147            config: self.config.clone(),
148            next_height_to_process: self.next_height_to_process,
149            rpc: self.rpc.clone(),
150            last_finalized_block: self.last_finalized_block.clone(),
151        }
152    }
153
154    /// Warning: This is costly due to the calculation of the block_cache, use a
155    /// pre-existing `block_cache` with the `new_context_with_block_cache`
156    /// method if possible.
157    pub fn new_context(
158        &self,
159        dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
160        block: &bitcoin::Block,
161        block_height: u32,
162    ) -> Result<context::StateContext<T>, BridgeError> {
163        Ok(context::StateContext::new(
164            dbtx,
165            Arc::new(self.owner.clone()),
166            Arc::new(BlockCache::from_block(block.clone(), block_height)),
167            self.config.clone(),
168        ))
169    }
170
171    pub fn new_context_with_block_cache(
172        &self,
173        dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
174        block_cache: Arc<BlockCache>,
175    ) -> Result<context::StateContext<T>, BridgeError> {
176        Ok(context::StateContext::new(
177            dbtx,
178            Arc::new(self.owner.clone()),
179            block_cache,
180            self.config.clone(),
181        ))
182    }
183
184    pub async fn new(
185        db: Database,
186        owner: T,
187        rpc: ExtendedBitcoinRpc,
188        config: BridgeConfig,
189    ) -> eyre::Result<Self> {
190        let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
191
192        queue.create(&Self::queue_name()).await.wrap_err_with(|| {
193            format!("Error creating pqmq queue with name {}", Self::queue_name())
194        })?;
195
196        let mut mgr = Self {
197            last_finalized_block: None,
198            db,
199            owner,
200            config: config.clone(),
201            round_machines: Vec::new(),
202            kickoff_machines: Vec::new(),
203            queue,
204            next_height_to_process: config.protocol_paramset.start_height,
205            rpc,
206        };
207
208        mgr.reload_state_manager_from_db().await?;
209        Ok(mgr)
210    }
211
212    async fn get_block(
213        &self,
214        dbtx: Option<DatabaseTransaction<'_, '_>>,
215        height: u32,
216    ) -> Result<bitcoin::Block, BridgeError> {
217        match self.db.get_full_block(dbtx, height).await? {
218            Some(block) => Ok(block),
219            None => Ok(self.rpc.get_block_by_height(height.into()).await?),
220        }
221    }
222
223    /// Loads the state manager and its state machines from the database.
224    /// This method should be called when initializing the StateManager.
225    ///
226    /// # Errors
227    /// Returns a `BridgeError` if the database operation fails
228    pub async fn reload_state_manager_from_db(&mut self) -> Result<(), BridgeError> {
229        let mut dbtx = self.db.begin_transaction().await?;
230        let owner_type = T::ENTITY_NAME;
231        let status = self
232            .db
233            .get_next_height_to_process(Some(&mut dbtx), owner_type)
234            .await?;
235
236        // If no state is saved, return early
237        self.next_height_to_process = match status {
238            Some(block_height) => {
239                u32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?
240            }
241            None => {
242                tracing::info!("No state machines found in the database");
243                self.config.protocol_paramset.start_height
244            }
245        };
246
247        tracing::info!(
248            "Loading state machines from block height {}",
249            self.next_height_to_process.saturating_sub(1)
250        );
251
252        let last_height = self.next_height_to_process.saturating_sub(1);
253
254        self.last_finalized_block = Some(Arc::new(BlockCache::from_block(
255            match self.db.get_full_block(None, last_height).await? {
256                Some(block) => block,
257                None => self.rpc.get_block_by_height(last_height.into()).await?,
258            },
259            last_height,
260        )));
261
262        // Load kickoff machines
263        let kickoff_machines = self
264            .db
265            .load_kickoff_machines(Some(&mut dbtx), owner_type)
266            .await?;
267
268        // Load round machines
269        let round_machines = self
270            .db
271            .load_round_machines(Some(&mut dbtx), owner_type)
272            .await?;
273
274        let init_dbtx = Arc::new(Mutex::new(dbtx));
275
276        let mut ctx = self.new_context_with_block_cache(
277            init_dbtx.clone(),
278            self.last_finalized_block
279                .clone()
280                .expect("Initialized before"),
281        )?;
282
283        // Process and recreate kickoff machines
284        for (state_json, kickoff_id, saved_block_height) in &kickoff_machines {
285            tracing::debug!(
286                "Loaded kickoff machine: state={}, block_height={}",
287                state_json,
288                saved_block_height
289            );
290
291            // Deserialize the machine state from JSON
292            let machine: Result<UninitializedStateMachine<kickoff::KickoffStateMachine<T>>, _> =
293                serde_json::from_str(state_json);
294
295            match machine {
296                Ok(uninitialized) => {
297                    // Initialize the machine with the context
298                    let initialized = uninitialized.init_with_context(&mut ctx).await;
299                    self.kickoff_machines.push(initialized);
300                }
301                Err(e) => {
302                    tracing::warn!(
303                        "Failed to deserialize kickoff machine with ID {}: {}",
304                        kickoff_id,
305                        e
306                    );
307                }
308            }
309        }
310
311        // Process and recreate round machines
312        for (state_json, operator_xonly_pk, saved_block_height) in &round_machines {
313            tracing::debug!(
314                "Loaded round machine: state={}, block_height={}",
315                state_json,
316                saved_block_height
317            );
318
319            // Deserialize the machine state from JSON
320            let machine: Result<UninitializedStateMachine<round::RoundStateMachine<T>>, _> =
321                serde_json::from_str(state_json);
322
323            match machine {
324                Ok(uninitialized) => {
325                    // Initialize the machine with the context
326                    let initialized = uninitialized.init_with_context(&mut ctx).await;
327                    self.round_machines.push(initialized);
328                }
329                Err(e) => {
330                    tracing::error!(
331                        "Failed to deserialize round machine with operator index {:?}: {}",
332                        operator_xonly_pk,
333                        e
334                    );
335                }
336            }
337        }
338
339        if !ctx.errors.is_empty() {
340            return Err(eyre::eyre!(
341                "Multiple errors occurred during state processing: {:?}",
342                ctx.errors
343            )
344            .into());
345        }
346
347        drop(ctx);
348
349        tracing::info!(
350            "Loaded {} kickoff machines and {} round machines from the database",
351            kickoff_machines.len(),
352            round_machines.len()
353        );
354
355        Arc::into_inner(init_dbtx)
356            .ok_or_eyre("Expected single reference to DB tx when committing")?
357            .into_inner()
358            .commit()
359            .await?;
360
361        Ok(())
362    }
363    #[cfg(test)]
364    #[doc(hidden)]
365    pub fn round_machines(&self) -> Vec<InitializedStateMachine<round::RoundStateMachine<T>>> {
366        self.round_machines.clone()
367    }
368
369    #[cfg(test)]
370    #[doc(hidden)]
371    pub fn kickoff_machines(
372        &self,
373    ) -> Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>> {
374        self.kickoff_machines.clone()
375    }
376
377    /// Saves the state machines with dirty flag set to the database.
378    /// Uses the database transaction from the context if any.
379    /// Resets the dirty flag for all machines after successful save.
380    ///
381    /// # Errors
382    /// Returns a `BridgeError` if the database operation fails.
383    pub async fn save_state_to_db(
384        &mut self,
385        context: &mut context::StateContext<T>,
386    ) -> eyre::Result<()> {
387        // Get the owner type from the context
388        let owner_type = T::ENTITY_NAME;
389
390        // Prepare kickoff machines data with direct serialization
391        let kickoff_machines: eyre::Result<Vec<_>> = self
392            .kickoff_machines
393            .iter()
394            // Only serialize machines that are dirty
395            .filter(|machine| machine.dirty)
396            .map(|machine| -> eyre::Result<_> {
397                let state_json = serde_json::to_string(&machine).wrap_err_with(|| {
398                    format!("Failed to serialize kickoff machine: {machine:?}")
399                })?;
400                let kickoff_id =
401                    serde_json::to_string(&machine.kickoff_data).wrap_err_with(|| {
402                        format!("Failed to serialize kickoff id for machine: {machine:?}")
403                    })?;
404                Ok((state_json, (kickoff_id)))
405            })
406            .collect();
407
408        // Prepare round machines data with direct serialization
409        let round_machines: eyre::Result<Vec<_>> = self
410            .round_machines
411            .iter()
412            // Only serialize machines that are dirty
413            .filter(|machine| machine.dirty)
414            .map(|machine| -> eyre::Result<_> {
415                let state_json = serde_json::to_string(machine)
416                    .wrap_err_with(|| format!("Failed to serialize round machine: {machine:?}"))?;
417                let operator_xonly_pk = machine.operator_data.xonly_pk;
418
419                // Use the machine's dirty flag to determine if it needs updating
420                Ok((state_json, (operator_xonly_pk)))
421            })
422            .collect();
423
424        {
425            let mut dbtx = context.shared_dbtx.lock().await;
426            // Use the database function to save the state machines
427            self.db
428                .save_state_machines(
429                    &mut dbtx,
430                    kickoff_machines?,
431                    round_machines?,
432                    context.cache.block_height as i32 + 1,
433                    owner_type,
434                )
435                .await?;
436        }
437
438        // Reset the dirty flag for all machines after successful save
439        for machine in &mut self.kickoff_machines {
440            if machine.dirty {
441                machine
442                    .handle_with_context(&KickoffEvent::SavedToDb, context)
443                    .await;
444            }
445        }
446
447        for machine in &mut self.round_machines {
448            if machine.dirty {
449                machine
450                    .handle_with_context(&RoundEvent::SavedToDb, context)
451                    .await;
452            }
453        }
454
455        Ok(())
456    }
457
458    pub fn get_next_height_to_process(&self) -> u32 {
459        self.next_height_to_process
460    }
461
462    /// Updates the machines using the context and returns machines without
463    /// events and futures that process new events for machines that changed.
464    /// Empties the `machines` vector.
465    ///
466    /// # Parameters
467    /// * `machines`: A mutable reference to the vector of state machines to update.
468    /// * `base_context`: A reference to the base state context.
469    ///
470    /// # Returns
471    /// A tuple of the unchanged machines and the futures that process new
472    /// events for machines that generated events.
473    ///
474    /// # Type Parameters
475    /// * `M`: The type of the state machine.
476    /// * `a`: The lifetime of the state context reference (the future captures the context by reference).
477    #[allow(clippy::type_complexity)]
478    fn update_machines<'a, M>(
479        machines: &mut Vec<InitializedStateMachine<M>>,
480        base_context: &'a context::StateContext<T>,
481    ) -> (
482        Vec<InitializedStateMachine<M>>,
483        Vec<
484            impl Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send + 'a,
485        >,
486    )
487    where
488        M: IntoStateMachine + Send + Sync + 'static,
489        M::State: Send + Sync + 'static,
490        InitializedStateMachine<M>: ContextProcessor<T, M>,
491    {
492        let mut unchanged_machines = Vec::new();
493        let mut processing_futures = Vec::new();
494
495        for machine in std::mem::take(machines).into_iter() {
496            match machine.process_with_ctx(base_context) {
497                ContextProcessResult::Processing(future) => processing_futures.push(future),
498                ContextProcessResult::Unchanged(machine) => unchanged_machines.push(machine),
499            }
500        }
501
502        (unchanged_machines, processing_futures)
503    }
504
505    /// Given some new states and a start height, process the states from the given start height until the next height to process.
506    /// Then append the new states to the current state machines.
507    pub async fn process_and_add_new_states_from_height(
508        &mut self,
509        dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
510        new_round_machines: Vec<InitializedStateMachine<round::RoundStateMachine<T>>>,
511        new_kickoff_machines: Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>>,
512        start_height: u32,
513    ) -> Result<(), eyre::Report> {
514        // create a temporary state manager that only includes the new states
515        let mut temporary_manager = self.clone_without_machines();
516        temporary_manager.round_machines = new_round_machines;
517        temporary_manager.kickoff_machines = new_kickoff_machines;
518
519        for block_height in start_height..temporary_manager.next_height_to_process {
520            let block = temporary_manager
521                .get_block(Some(&mut *dbtx.lock().await), block_height)
522                .await
523                .wrap_err_with(|| {
524                    format!(
525                        "Block at height {block_height} not found in process_and_add_new_states_from_height"
526                    )
527                })?;
528
529            let mut context = temporary_manager.new_context(dbtx.clone(), &block, block_height)?;
530            temporary_manager
531                .process_block_parallel(&mut context)
532                .await?;
533        }
534
535        // append new states to the current state manager
536        self.round_machines.extend(temporary_manager.round_machines);
537        self.kickoff_machines
538            .extend(temporary_manager.kickoff_machines);
539
540        Ok(())
541    }
542
543    /// It requires that the block cache is updated before calling this function.
544    /// Moves all state machines forward in parallel.
545    /// The state machines are updated until all of them stabilize in their state (ie.
546    /// the block does not generate any new events)
547    ///
548    /// # Errors
549    /// If the state machines do not stabilize after some iterations, we return an error.
550    pub async fn process_block_parallel(
551        &mut self,
552        context: &mut context::StateContext<T>,
553    ) -> Result<(), eyre::Report> {
554        let block_height = context.cache.block_height;
555
556        // Process all machines, for those unaffected collect them them, otherwise return
557        // a future that processes the new events.
558        let (mut final_kickoff_machines, mut kickoff_futures) =
559            Self::update_machines(&mut self.kickoff_machines, context);
560        let (mut final_round_machines, mut round_futures) =
561            Self::update_machines(&mut self.round_machines, context);
562
563        // Here we store number of iterations to detect if the machines do not stabilize after a while
564        // to prevent infinite loops. If a matcher is used, it is deleted, but a bug in implementation
565        // can technically cause infinite loops.
566        let mut iterations = 0;
567
568        // On each iteration, we'll update the changed machines until all machines
569        // stabilize in their state.
570        while !kickoff_futures.is_empty() || !round_futures.is_empty() {
571            // Execute all futures in parallel
572            let (kickoff_results, round_results) =
573                join(join_all(kickoff_futures), join_all(round_futures)).await;
574
575            // Unzip the results into updated machines and state contexts
576            let (mut changed_kickoff_machines, mut kickoff_contexts): (Vec<_>, Vec<_>) =
577                kickoff_results.into_iter().unzip();
578            let (mut changed_round_machines, mut round_contexts): (Vec<_>, Vec<_>) =
579                round_results.into_iter().unzip();
580
581            // Merge and handle errors
582            let mut all_errors = Vec::new();
583            for ctx in kickoff_contexts.iter_mut().chain(round_contexts.iter_mut()) {
584                all_errors.extend(std::mem::take(&mut ctx.errors));
585            }
586
587            if !all_errors.is_empty() {
588                // Return first error or create a combined error
589                return Err(eyre::eyre!(
590                    "Multiple errors occurred during state processing: {:?}",
591                    all_errors
592                ));
593            }
594
595            // Append the newly generated state machines into the changed machines list
596            for ctx in kickoff_contexts.iter_mut().chain(round_contexts.iter_mut()) {
597                #[cfg(debug_assertions)]
598                for machine in &ctx.new_round_machines {
599                    if !machine.dirty {
600                        panic!(
601                            "Round machine not dirty despite having been newly created: {:?}",
602                            machine.state()
603                        );
604                    }
605                }
606                #[cfg(debug_assertions)]
607                for machine in &ctx.new_kickoff_machines {
608                    if !machine.dirty {
609                        panic!(
610                            "Kickoff machine not dirty despite having been newly created: {:?}",
611                            machine.state()
612                        );
613                    }
614                }
615                changed_round_machines.extend(std::mem::take(&mut ctx.new_round_machines));
616                changed_kickoff_machines.extend(std::mem::take(&mut ctx.new_kickoff_machines));
617            }
618
619            // If the machines do not stabilize after a while, we return an error
620            //
621            // Something like max(2 * num_kickoffs_per_round, number of utxos in a kickoff * 2) is possibly a safe value
622            if iterations > 100000 {
623                return Err(eyre::eyre!(
624                    r#"{}/{} kickoff and {}/{} round state machines did not stabilize after 100000 iterations, debug repr of changed machines:
625                        ---- Kickoff machines ----
626                        {:?}
627                        ---- Round machines ----
628                        {:?}
629                        "#,
630                    changed_kickoff_machines.len(),
631                    final_kickoff_machines.len() + changed_kickoff_machines.len(),
632                    changed_round_machines.len(),
633                    final_round_machines.len() + changed_round_machines.len(),
634                    changed_kickoff_machines
635                        .iter()
636                        .map(|m| m.state())
637                        .collect::<Vec<_>>(),
638                    changed_round_machines
639                        .iter()
640                        .map(|m| m.state())
641                        .collect::<Vec<_>>(),
642                ));
643            }
644
645            // Reprocess changed machines and commit these futures to be handled
646            // in the next round If they're empty, we'll exit the loop.
647            let (finalized_kickoff_machines, new_kickoff_futures) =
648                Self::update_machines(&mut changed_kickoff_machines, context);
649            let (finalized_round_machines, new_round_futures) =
650                Self::update_machines(&mut changed_round_machines, context);
651            final_kickoff_machines.extend(finalized_kickoff_machines);
652            final_round_machines.extend(finalized_round_machines);
653
654            // Update the futures to be processed
655            kickoff_futures = new_kickoff_futures;
656            round_futures = new_round_futures;
657            iterations += 1;
658        }
659
660        drop(kickoff_futures);
661        drop(round_futures);
662
663        // Set back the original machines
664        self.round_machines = final_round_machines;
665        self.kickoff_machines = final_kickoff_machines;
666
667        self.next_height_to_process = max(block_height + 1, self.next_height_to_process);
668
669        Ok(())
670    }
671}