clementine_core/states/
mod.rs

1//! State manager module
2//!
3//! This module contains the state manager, which is responsible for holding the state machines
4//! of the system representing the current operator state for each operator, and kickoff state for each kickoff
5//! (Each operator and kickoff process has its own state machine).
6//!
7//! The main responsibility of the state manager is to process each finalized block in Bitcoin and update the state machines.
8//! The blocks are scanned for relevant Clementine transactions and the internal state of the state machines is updated. This relevant data
9//! is passed on to verifiers/operators when it's time to send bridge transactions (like operator asserts, watchtower challenges, etc.).
10//!
11//! Operator state machine: Stores where in the collateral chain the operator is. (Which round or ready to reimburse tx)
12//!     Additionally during rounds it stores which kickoff utxos of the round are spent.
13//! Kickoff state machine:
14//!     - Stores if the kickoff was challenged.
15//!     - Stores/tracks watchtower challenges, latest blockhash commit by operator, operator asserts, any relevant information needed
16//!           for proving and disproving the kickoff.
17//!
18//! For state machines we use the [statig](https://github.com/mdeloof/statig) crate.
19//!
20
21pub use crate::builder::block_cache;
22use crate::config::BridgeConfig;
23use crate::database::{Database, DatabaseTransaction};
24use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc;
25use crate::states::block_cache::BlockCache;
26use clementine_errors::BridgeError;
27use eyre::{Context, OptionExt};
28use futures::future::{join, join_all};
29use kickoff::KickoffEvent;
30use matcher::BlockMatcher;
31use pgmq::PGMQueueExt;
32use round::RoundEvent;
33use statig::awaitable::{InitializedStateMachine, UninitializedStateMachine};
34use statig::prelude::*;
35use std::cmp::max;
36use std::future::Future;
37use std::sync::Arc;
38use tokio::sync::Mutex;
39
40pub mod context;
41mod event;
42pub mod kickoff;
43mod matcher;
44pub mod round;
45pub mod task;
46
47pub use context::{Duty, Owner};
48pub use event::SystemEvent;
49
50pub use clementine_errors::StateMachineError;
51pub(crate) enum ContextProcessResult<
52    T: Owner,
53    M: IntoStateMachine,
54    Fut: Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send,
55> {
56    Unchanged(InitializedStateMachine<M>),
57    Processing(Fut),
58}
59
60/// Utility trait to make processing generic
61pub(crate) trait ContextProcessor<T: Owner, M: IntoStateMachine> {
62    /// Processes the machine with the given state context (which contains the block cache)
63    /// If the machine is unchanged, it is returned as is. Otherwise, the machine is processed
64    /// and the result is returned as a future that processes the new events.
65    fn process_with_ctx(
66        self,
67        block: &context::StateContext<T>,
68    ) -> ContextProcessResult<
69        T,
70        M,
71        impl Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send,
72    >;
73}
74
75/// Generic implementation for all state machines
76impl<T, M> ContextProcessor<T, M> for InitializedStateMachine<M>
77where
78    T: Owner,
79    for<'evt, 'ctx> M: IntoStateMachine<Event<'evt> = M::StateEvent, Context<'ctx> = context::StateContext<T>>
80        + Send
81        + BlockMatcher
82        + Clone,
83    M::State: awaitable::State<M> + 'static + Send,
84    for<'sub> M::Superstate<'sub>: awaitable::Superstate<M> + Send,
85    for<'evt> M::Event<'evt>: Send + Sync,
86{
87    fn process_with_ctx(
88        mut self,
89        block: &context::StateContext<T>,
90    ) -> ContextProcessResult<T, M, impl Future<Output = (Self, context::StateContext<T>)> + Send>
91    {
92        let events = self.match_block(&block.cache);
93        if events.is_empty() {
94            ContextProcessResult::Unchanged(self)
95        } else {
96            let mut ctx = block.clone();
97            ContextProcessResult::Processing(async move {
98                for event in events {
99                    self.handle_with_context(&event, &mut ctx).await;
100                }
101                (self, ctx)
102            })
103        }
104    }
105}
106
107/// State manager stores the state machines.
108/// It is responsible for following:
109///     - Persisting current state of the state machines to the database.
110///     - Collecting new [`SystemEvent`]s from the message queue and passing them to the state machines,
111///           thus updating the state machines.
112#[derive(Debug, Clone)]
113pub struct StateManager<T: Owner> {
114    pub db: Database,
115    queue: PGMQueueExt,
116    owner: T,
117    round_machines: Vec<InitializedStateMachine<round::RoundStateMachine<T>>>,
118    kickoff_machines: Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>>,
119    config: BridgeConfig,
120    next_height_to_process: u32,
121    rpc: ExtendedBitcoinRpc,
122    // Set on the first finalized block event or the load_from_db method
123    last_finalized_block: Option<Arc<BlockCache>>,
124}
125
126impl<T: Owner + std::fmt::Debug + 'static> StateManager<T> {
127    /// Returns message queue name for the state manager.
128    pub fn queue_name() -> String {
129        format!("{}_state_mgr_events", T::ENTITY_NAME)
130    }
131
132    pub fn clone_without_machines(&self) -> Self {
133        Self {
134            db: self.db.clone(),
135            queue: self.queue.clone(),
136            owner: self.owner.clone(),
137            round_machines: Vec::new(),
138            kickoff_machines: Vec::new(),
139            config: self.config.clone(),
140            next_height_to_process: self.next_height_to_process,
141            rpc: self.rpc.clone(),
142            last_finalized_block: self.last_finalized_block.clone(),
143        }
144    }
145
146    /// Warning: This is costly due to the calculation of the block_cache, use a
147    /// pre-existing `block_cache` with the `new_context_with_block_cache`
148    /// method if possible.
149    pub fn new_context(
150        &self,
151        dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
152        block: &bitcoin::Block,
153        block_height: u32,
154    ) -> Result<context::StateContext<T>, BridgeError> {
155        Ok(context::StateContext::new(
156            dbtx,
157            Arc::new(self.owner.clone()),
158            Arc::new(BlockCache::from_block(block.clone(), block_height)),
159            self.config.clone(),
160        ))
161    }
162
163    pub fn new_context_with_block_cache(
164        &self,
165        dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
166        block_cache: Arc<BlockCache>,
167    ) -> Result<context::StateContext<T>, BridgeError> {
168        Ok(context::StateContext::new(
169            dbtx,
170            Arc::new(self.owner.clone()),
171            block_cache,
172            self.config.clone(),
173        ))
174    }
175
176    pub async fn new(
177        db: Database,
178        owner: T,
179        rpc: ExtendedBitcoinRpc,
180        config: BridgeConfig,
181    ) -> eyre::Result<Self> {
182        let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
183
184        queue.create(&Self::queue_name()).await.wrap_err_with(|| {
185            format!("Error creating pqmq queue with name {}", Self::queue_name())
186        })?;
187
188        let mut mgr = Self {
189            last_finalized_block: None,
190            db,
191            owner,
192            config: config.clone(),
193            round_machines: Vec::new(),
194            kickoff_machines: Vec::new(),
195            queue,
196            next_height_to_process: config.protocol_paramset.start_height,
197            rpc,
198        };
199
200        mgr.reload_state_manager_from_db().await?;
201        Ok(mgr)
202    }
203
204    async fn get_block(
205        &self,
206        dbtx: Option<DatabaseTransaction<'_>>,
207        height: u32,
208    ) -> Result<bitcoin::Block, BridgeError> {
209        match self.db.get_full_block(dbtx, height).await? {
210            Some(block) => Ok(block),
211            None => Ok(self.rpc.get_block_by_height(height.into()).await?),
212        }
213    }
214
215    /// Loads the state manager and its state machines from the database.
216    /// This method should be called when initializing the StateManager.
217    ///
218    /// # Errors
219    /// Returns a `BridgeError` if the database operation fails
220    pub async fn reload_state_manager_from_db(&mut self) -> Result<(), BridgeError> {
221        let mut dbtx = self.db.begin_transaction().await?;
222        let owner_type = T::ENTITY_NAME;
223        let status = self
224            .db
225            .get_next_height_to_process(Some(&mut dbtx), owner_type)
226            .await?;
227
228        // If no state is saved, return early
229        self.next_height_to_process = match status {
230            Some(block_height) => {
231                u32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?
232            }
233            None => {
234                tracing::info!("No state machines found in the database");
235                self.config.protocol_paramset.start_height
236            }
237        };
238
239        tracing::info!(
240            "Loading state machines from block height {}",
241            self.next_height_to_process.saturating_sub(1)
242        );
243
244        let last_height = self.next_height_to_process.saturating_sub(1);
245
246        self.last_finalized_block = Some(Arc::new(BlockCache::from_block(
247            match self.db.get_full_block(None, last_height).await? {
248                Some(block) => block,
249                None => self.rpc.get_block_by_height(last_height.into()).await?,
250            },
251            last_height,
252        )));
253
254        // Load kickoff machines
255        let kickoff_machines = self
256            .db
257            .load_kickoff_machines(Some(&mut dbtx), owner_type)
258            .await?;
259
260        // Load round machines
261        let round_machines = self
262            .db
263            .load_round_machines(Some(&mut dbtx), owner_type)
264            .await?;
265
266        let init_dbtx = Arc::new(Mutex::new(dbtx));
267
268        let mut ctx = self.new_context_with_block_cache(
269            init_dbtx.clone(),
270            self.last_finalized_block
271                .clone()
272                .expect("Initialized before"),
273        )?;
274
275        // Process and recreate kickoff machines
276        for (state_json, kickoff_id, saved_block_height) in &kickoff_machines {
277            tracing::debug!(
278                "Loaded kickoff machine: state={}, block_height={}",
279                state_json,
280                saved_block_height
281            );
282
283            // Deserialize the machine state from JSON
284            let machine: Result<UninitializedStateMachine<kickoff::KickoffStateMachine<T>>, _> =
285                serde_json::from_str(state_json);
286
287            match machine {
288                Ok(uninitialized) => {
289                    // Initialize the machine with the context
290                    let initialized = uninitialized.init_with_context(&mut ctx).await;
291                    self.kickoff_machines.push(initialized);
292                }
293                Err(e) => {
294                    tracing::warn!(
295                        "Failed to deserialize kickoff machine with ID {}: {}",
296                        kickoff_id,
297                        e
298                    );
299                }
300            }
301        }
302
303        // Process and recreate round machines
304        for (state_json, operator_xonly_pk, saved_block_height) in &round_machines {
305            tracing::debug!(
306                "Loaded round machine: state={}, block_height={}",
307                state_json,
308                saved_block_height
309            );
310
311            // Deserialize the machine state from JSON
312            let machine: Result<UninitializedStateMachine<round::RoundStateMachine<T>>, _> =
313                serde_json::from_str(state_json);
314
315            match machine {
316                Ok(uninitialized) => {
317                    // Initialize the machine with the context
318                    let initialized = uninitialized.init_with_context(&mut ctx).await;
319                    self.round_machines.push(initialized);
320                }
321                Err(e) => {
322                    tracing::error!(
323                        "Failed to deserialize round machine with operator index {:?}: {}",
324                        operator_xonly_pk,
325                        e
326                    );
327                }
328            }
329        }
330
331        if !ctx.errors.is_empty() {
332            return Err(eyre::eyre!(
333                "Multiple errors occurred during state processing: {:?}",
334                ctx.errors
335            )
336            .into());
337        }
338
339        drop(ctx);
340
341        tracing::info!(
342            "Loaded {} kickoff machines and {} round machines from the database",
343            kickoff_machines.len(),
344            round_machines.len()
345        );
346
347        Arc::into_inner(init_dbtx)
348            .ok_or_eyre("Expected single reference to DB tx when committing")?
349            .into_inner()
350            .commit()
351            .await?;
352
353        Ok(())
354    }
355    #[cfg(test)]
356    #[doc(hidden)]
357    pub fn round_machines(&self) -> Vec<InitializedStateMachine<round::RoundStateMachine<T>>> {
358        self.round_machines.clone()
359    }
360
361    #[cfg(test)]
362    #[doc(hidden)]
363    pub fn kickoff_machines(
364        &self,
365    ) -> Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>> {
366        self.kickoff_machines.clone()
367    }
368
369    /// Saves the state machines with dirty flag set to the database.
370    /// Uses the database transaction from the context if any.
371    /// Resets the dirty flag for all machines after successful save.
372    ///
373    /// # Errors
374    /// Returns a `BridgeError` if the database operation fails.
375    pub async fn save_state_to_db(
376        &mut self,
377        context: &mut context::StateContext<T>,
378    ) -> eyre::Result<()> {
379        // Get the owner type from the context
380        let owner_type = T::ENTITY_NAME;
381
382        // Prepare kickoff machines data with direct serialization
383        let kickoff_machines: eyre::Result<Vec<_>> = self
384            .kickoff_machines
385            .iter()
386            // Only serialize machines that are dirty
387            .filter(|machine| machine.dirty)
388            .map(|machine| -> eyre::Result<_> {
389                let state_json = serde_json::to_string(&machine).wrap_err_with(|| {
390                    format!("Failed to serialize kickoff machine: {machine:?}")
391                })?;
392                let kickoff_id =
393                    serde_json::to_string(&machine.kickoff_data).wrap_err_with(|| {
394                        format!("Failed to serialize kickoff id for machine: {machine:?}")
395                    })?;
396                Ok((state_json, (kickoff_id)))
397            })
398            .collect();
399
400        // Prepare round machines data with direct serialization
401        let round_machines: eyre::Result<Vec<_>> = self
402            .round_machines
403            .iter()
404            // Only serialize machines that are dirty
405            .filter(|machine| machine.dirty)
406            .map(|machine| -> eyre::Result<_> {
407                let state_json = serde_json::to_string(machine)
408                    .wrap_err_with(|| format!("Failed to serialize round machine: {machine:?}"))?;
409                let operator_xonly_pk = machine.operator_data.xonly_pk;
410
411                // Use the machine's dirty flag to determine if it needs updating
412                Ok((state_json, (operator_xonly_pk)))
413            })
414            .collect();
415
416        {
417            let mut dbtx = context.shared_dbtx.lock().await;
418            // Use the database function to save the state machines
419            self.db
420                .save_state_machines(
421                    &mut dbtx,
422                    kickoff_machines?,
423                    round_machines?,
424                    context.cache.block_height as i32 + 1,
425                    owner_type,
426                )
427                .await?;
428        }
429
430        // Reset the dirty flag for all machines after successful save
431        for machine in &mut self.kickoff_machines {
432            if machine.dirty {
433                machine
434                    .handle_with_context(&KickoffEvent::SavedToDb, context)
435                    .await;
436            }
437        }
438
439        for machine in &mut self.round_machines {
440            if machine.dirty {
441                machine
442                    .handle_with_context(&RoundEvent::SavedToDb, context)
443                    .await;
444            }
445        }
446
447        Ok(())
448    }
449
450    pub fn get_next_height_to_process(&self) -> u32 {
451        self.next_height_to_process
452    }
453
454    /// Updates the machines using the context and returns machines without
455    /// events and futures that process new events for machines that changed.
456    /// Empties the `machines` vector.
457    ///
458    /// # Parameters
459    /// * `machines`: A mutable reference to the vector of state machines to update.
460    /// * `base_context`: A reference to the base state context.
461    ///
462    /// # Returns
463    /// A tuple of the unchanged machines and the futures that process new
464    /// events for machines that generated events.
465    ///
466    /// # Type Parameters
467    /// * `M`: The type of the state machine.
468    /// * `a`: The lifetime of the state context reference (the future captures the context by reference).
469    #[allow(clippy::type_complexity)]
470    fn update_machines<'a, M>(
471        machines: &mut Vec<InitializedStateMachine<M>>,
472        base_context: &'a context::StateContext<T>,
473    ) -> (
474        Vec<InitializedStateMachine<M>>,
475        Vec<
476            impl Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send + 'a,
477        >,
478    )
479    where
480        M: IntoStateMachine + Send + Sync + 'static,
481        M::State: Send + Sync + 'static,
482        InitializedStateMachine<M>: ContextProcessor<T, M>,
483    {
484        let mut unchanged_machines = Vec::new();
485        let mut processing_futures = Vec::new();
486
487        for machine in std::mem::take(machines).into_iter() {
488            match machine.process_with_ctx(base_context) {
489                ContextProcessResult::Processing(future) => processing_futures.push(future),
490                ContextProcessResult::Unchanged(machine) => unchanged_machines.push(machine),
491            }
492        }
493
494        (unchanged_machines, processing_futures)
495    }
496
497    /// Given some new states and a start height, process the states from the given start height until the next height to process.
498    /// Then append the new states to the current state machines.
499    pub async fn process_and_add_new_states_from_height(
500        &mut self,
501        dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
502        new_round_machines: Vec<InitializedStateMachine<round::RoundStateMachine<T>>>,
503        new_kickoff_machines: Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>>,
504        start_height: u32,
505    ) -> Result<(), eyre::Report> {
506        // create a temporary state manager that only includes the new states
507        let mut temporary_manager = self.clone_without_machines();
508        temporary_manager.round_machines = new_round_machines;
509        temporary_manager.kickoff_machines = new_kickoff_machines;
510
511        for block_height in start_height..temporary_manager.next_height_to_process {
512            let block = temporary_manager
513                .get_block(Some(&mut *dbtx.lock().await), block_height)
514                .await
515                .wrap_err_with(|| {
516                    format!(
517                        "Block at height {block_height} not found in process_and_add_new_states_from_height"
518                    )
519                })?;
520
521            let mut context = temporary_manager.new_context(dbtx.clone(), &block, block_height)?;
522            temporary_manager
523                .process_block_parallel(&mut context)
524                .await?;
525        }
526
527        // append new states to the current state manager
528        self.round_machines.extend(temporary_manager.round_machines);
529        self.kickoff_machines
530            .extend(temporary_manager.kickoff_machines);
531
532        Ok(())
533    }
534
535    /// Moves all state machines forward over the block held in `context`.
    ///
    /// It requires that the block cache is updated before calling this function.
536    /// Both machine vectors are emptied via `update_machines`; machines that need no
537    /// processing are set aside and the futures of the others are awaited together
538    /// (`join` / `join_all`). This is repeated until all of them stabilize in their state
    /// (ie. the block does not generate any new events). Afterwards the machine vectors are
    /// restored and `next_height_to_process` is raised to at least `block_height + 1`.
539    ///
540    /// # Errors
541    /// If any state context reports errors, or if the state machines do not stabilize
    /// after some iterations, we return an error.
    ///
    /// NOTE(review): both error paths return before the machine vectors are set back, so
    /// `self.round_machines` and `self.kickoff_machines` stay empty after an error —
    /// confirm that callers reload the state in that case.
542    pub async fn process_block_parallel(
543        &mut self,
544        context: &mut context::StateContext<T>,
545    ) -> Result<(), eyre::Report> {
546        let block_height = context.cache.block_height;
547
548        // Process all machines: those unaffected are collected as they are, for the others
549        // we get a future that processes the new events.
550        let (mut final_kickoff_machines, mut kickoff_futures) =
551            Self::update_machines(&mut self.kickoff_machines, context);
552        let (mut final_round_machines, mut round_futures) =
553            Self::update_machines(&mut self.round_machines, context);
554
555        // Here we store number of iterations to detect if the machines do not stabilize after a while
556        // to prevent infinite loops. If a matcher is used, it is deleted, but a bug in implementation
557        // can technically cause infinite loops.
558        let mut iterations = 0;
559
560        // On each iteration, we'll update the changed machines until all machines
561        // stabilize in their state.
562        while !kickoff_futures.is_empty() || !round_futures.is_empty() {
563            // Execute all futures together
564            let (kickoff_results, round_results) =
565                join(join_all(kickoff_futures), join_all(round_futures)).await;
566
567            // Unzip the results into updated machines and state contexts
568            let (mut changed_kickoff_machines, mut kickoff_contexts): (Vec<_>, Vec<_>) =
569                kickoff_results.into_iter().unzip();
570            let (mut changed_round_machines, mut round_contexts): (Vec<_>, Vec<_>) =
571                round_results.into_iter().unzip();
572
573            // Merge and handle errors
574            let mut all_errors = Vec::new();
575            for ctx in kickoff_contexts.iter_mut().chain(round_contexts.iter_mut()) {
576                all_errors.extend(std::mem::take(&mut ctx.errors));
577            }
578
579            if !all_errors.is_empty() {
580                // Report all collected errors as one combined error
581                return Err(eyre::eyre!(
582                    "Multiple errors occurred during state processing: {:?}",
583                    all_errors
584                ));
585            }
586
587            // Append the newly generated state machines into the changed machines list
588            for ctx in kickoff_contexts.iter_mut().chain(round_contexts.iter_mut()) {
                // Debug builds only: a newly created machine is expected to be dirty.
589                #[cfg(debug_assertions)]
590                for machine in &ctx.new_round_machines {
591                    if !machine.dirty {
592                        panic!(
593                            "Round machine not dirty despite having been newly created: {:?}",
594                            machine.state()
595                        );
596                    }
597                }
598                #[cfg(debug_assertions)]
599                for machine in &ctx.new_kickoff_machines {
600                    if !machine.dirty {
601                        panic!(
602                            "Kickoff machine not dirty despite having been newly created: {:?}",
603                            machine.state()
604                        );
605                    }
606                }
607                changed_round_machines.extend(std::mem::take(&mut ctx.new_round_machines));
608                changed_kickoff_machines.extend(std::mem::take(&mut ctx.new_kickoff_machines));
609            }
610
611            // If the machines do not stabilize after a while, we return an error
612            //
613            // Something like max(2 * num_kickoffs_per_round, number of utxos in a kickoff * 2) is possibly a safe value
614            if iterations > 100000 {
615                return Err(eyre::eyre!(
616                    r#"{}/{} kickoff and {}/{} round state machines did not stabilize after 100000 iterations, debug repr of changed machines:
617                        ---- Kickoff machines ----
618                        {:?}
619                        ---- Round machines ----
620                        {:?}
621                        "#,
622                    changed_kickoff_machines.len(),
623                    final_kickoff_machines.len() + changed_kickoff_machines.len(),
624                    changed_round_machines.len(),
625                    final_round_machines.len() + changed_round_machines.len(),
626                    changed_kickoff_machines
627                        .iter()
628                        .map(|m| m.state())
629                        .collect::<Vec<_>>(),
630                    changed_round_machines
631                        .iter()
632                        .map(|m| m.state())
633                        .collect::<Vec<_>>(),
634                ));
635            }
636
637            // Reprocess changed machines and commit these futures to be handled
638            // in the next round. If they're empty, we'll exit the loop.
639            let (finalized_kickoff_machines, new_kickoff_futures) =
640                Self::update_machines(&mut changed_kickoff_machines, context);
641            let (finalized_round_machines, new_round_futures) =
642                Self::update_machines(&mut changed_round_machines, context);
643            final_kickoff_machines.extend(finalized_kickoff_machines);
644            final_round_machines.extend(finalized_round_machines);
645
646            // Update the futures to be processed
647            kickoff_futures = new_kickoff_futures;
648            round_futures = new_round_futures;
649            iterations += 1;
650        }
651
        // NOTE(review): presumably these explicit drops release the borrows held by the
        // (now empty) future vectors before `self` is assigned to below — confirm before removing.
652        drop(kickoff_futures);
653        drop(round_futures);
654
655        // Set back the original machines
656        self.round_machines = final_round_machines;
657        self.kickoff_machines = final_kickoff_machines;
658
        // Never move the next height backwards when an older block is processed.
659        self.next_height_to_process = max(block_height + 1, self.next_height_to_process);
660
661        Ok(())
662    }
663}