clementine_core/states/
mod.rs

1//! State manager module
2//!
3//! This module contains the state manager, which is responsible for holding the state machines
4//! of the system representing the current operator state for each operator, and kickoff state for each kickoff
5//! (Each operator and kickoff process has its own state machine).
6//!
7//! The main responsibility of the state manager is to process each finalized block in Bitcoin and update the state machines.
//! The blocks are scanned for relevant Clementine txs and the internal state of the state machines is updated. This relevant data
//! is passed on to verifiers/operators when it's time to send bridge transactions (like operator asserts, watchtower challenges, etc).
10//!
11//! Operator state machine: Stores where in the collateral chain the operator is. (Which round or ready to reimburse tx)
12//!     Additionally during rounds it stores which kickoff utxos of the round are spent.
13//! Kickoff state machine:
14//!     - Stores if the kickoff was challenged.
15//!     - Stores/tracks watchtower challenges, latest blockhash commit by operator, operator asserts, any relevant information needed
16//!           for proving and disproving the kickoff.
17//!
18//! For state machines we use the [statig](https://github.com/mdeloof/statig) crate.
19//!
20
21pub use crate::builder::block_cache;
22use crate::config::BridgeConfig;
23use crate::database::{Database, DatabaseTransaction};
24use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc;
25use crate::states::block_cache::BlockCache;
26use clementine_errors::BridgeError;
27use eyre::{Context, OptionExt};
28use futures::future::{join, join_all};
29use kickoff::KickoffEvent;
30use matcher::BlockMatcher;
31use pgmq::PGMQueueExt;
32use round::RoundEvent;
33use statig::awaitable::{InitializedStateMachine, UninitializedStateMachine};
34use statig::prelude::*;
35use std::cmp::max;
36use std::future::Future;
37use std::sync::Arc;
38use tokio::sync::Mutex;
39
40pub mod context;
41mod event;
42pub mod kickoff;
43mod matcher;
44pub mod round;
45pub mod task;
46
47pub use context::{Duty, Owner};
48pub use event::SystemEvent;
49
50pub use clementine_errors::StateMachineError;
51pub(crate) enum ContextProcessResult<
52    T: Owner,
53    M: IntoStateMachine,
54    Fut: Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send,
55> {
56    Unchanged(InitializedStateMachine<M>),
57    Processing(Fut),
58}
59
60/// Utility trait to make processing generic
61pub(crate) trait ContextProcessor<T: Owner, M: IntoStateMachine> {
62    /// Processes the machine with the given state context (which contains the block cache)
63    /// If the machine is unchanged, it is returned as is. Otherwise, the machine is processed
64    /// and the result is returned as a future that processes the new events.
65    fn process_with_ctx(
66        self,
67        block: &context::StateContext<T>,
68    ) -> ContextProcessResult<
69        T,
70        M,
71        impl Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send,
72    >;
73}
74
75/// Generic implementation for all state machines
76impl<T, M> ContextProcessor<T, M> for InitializedStateMachine<M>
77where
78    T: Owner,
79    for<'evt, 'ctx> M: IntoStateMachine<Event<'evt> = M::StateEvent, Context<'ctx> = context::StateContext<T>>
80        + Send
81        + BlockMatcher
82        + Clone,
83    M::State: awaitable::State<M> + 'static + Send,
84    for<'sub> M::Superstate<'sub>: awaitable::Superstate<M> + Send,
85    for<'evt> M::Event<'evt>: Send + Sync,
86{
87    fn process_with_ctx(
88        mut self,
89        block: &context::StateContext<T>,
90    ) -> ContextProcessResult<T, M, impl Future<Output = (Self, context::StateContext<T>)> + Send>
91    {
92        let events = self.match_block(&block.cache);
93        if events.is_empty() {
94            ContextProcessResult::Unchanged(self)
95        } else {
96            let mut ctx = block.clone();
97            ContextProcessResult::Processing(async move {
98                for event in events {
99                    self.handle_with_context(&event, &mut ctx).await;
100                }
101                (self, ctx)
102            })
103        }
104    }
105}
106
107/// State manager stores the state machines.
108/// It is responsible for following:
109///     - Persisting current state of the state machines to the database.
110///     - Collecting new [`SystemEvent`]s from the message queue and passing them to the state machines,
111///           thus updating the state machines.
112#[derive(Debug, Clone)]
113pub struct StateManager<T: Owner> {
114    pub db: Database,
115    queue: PGMQueueExt,
116    owner: T,
117    round_machines: Vec<InitializedStateMachine<round::RoundStateMachine<T>>>,
118    kickoff_machines: Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>>,
119    config: BridgeConfig,
120    next_height_to_process: u32,
121    last_processed_lcp: Option<u32>,
122    rpc: ExtendedBitcoinRpc,
123    // Set on the first finalized block event or the load_from_db method
124    last_finalized_block: Option<Arc<BlockCache>>,
125}
126
127impl<T: Owner + std::fmt::Debug + 'static> StateManager<T> {
128    /// Returns message queue name for the state manager.
129    pub fn queue_name() -> String {
130        format!("{}_state_mgr_events", T::ENTITY_NAME)
131    }
132
133    pub fn clone_without_machines(&self) -> Self {
134        Self {
135            db: self.db.clone(),
136            queue: self.queue.clone(),
137            owner: self.owner.clone(),
138            round_machines: Vec::new(),
139            kickoff_machines: Vec::new(),
140            config: self.config.clone(),
141            next_height_to_process: self.next_height_to_process,
142            last_processed_lcp: self.last_processed_lcp,
143            rpc: self.rpc.clone(),
144            last_finalized_block: self.last_finalized_block.clone(),
145        }
146    }
147
148    /// Warning: This is costly due to the calculation of the block_cache, use a
149    /// pre-existing `block_cache` with the `new_context_with_block_cache`
150    /// method if possible.
151    pub fn new_context(
152        &self,
153        dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
154        block: &bitcoin::Block,
155        block_height: u32,
156    ) -> Result<context::StateContext<T>, BridgeError> {
157        Ok(context::StateContext::new(
158            dbtx,
159            Arc::new(self.owner.clone()),
160            Arc::new(BlockCache::from_block(block.clone(), block_height)),
161            self.config.clone(),
162        ))
163    }
164
165    pub fn new_context_with_block_cache(
166        &self,
167        dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
168        block_cache: Arc<BlockCache>,
169    ) -> Result<context::StateContext<T>, BridgeError> {
170        Ok(context::StateContext::new(
171            dbtx,
172            Arc::new(self.owner.clone()),
173            block_cache,
174            self.config.clone(),
175        ))
176    }
177
178    pub async fn new(
179        db: Database,
180        owner: T,
181        rpc: ExtendedBitcoinRpc,
182        config: BridgeConfig,
183    ) -> eyre::Result<Self> {
184        let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
185
186        queue.create(&Self::queue_name()).await.wrap_err_with(|| {
187            format!("Error creating pqmq queue with name {}", Self::queue_name())
188        })?;
189
190        let mut mgr = Self {
191            last_finalized_block: None,
192            db,
193            owner,
194            config: config.clone(),
195            round_machines: Vec::new(),
196            kickoff_machines: Vec::new(),
197            queue,
198            next_height_to_process: config.protocol_paramset.start_height,
199            last_processed_lcp: None,
200            rpc,
201        };
202
203        mgr.reload_state_manager_from_db().await?;
204        Ok(mgr)
205    }
206
207    async fn get_block(
208        &self,
209        dbtx: Option<DatabaseTransaction<'_>>,
210        height: u32,
211    ) -> Result<bitcoin::Block, BridgeError> {
212        match self.db.get_full_block(dbtx, height).await? {
213            Some(block) => Ok(block),
214            None => Ok(self.rpc.get_block_by_height(height.into()).await?),
215        }
216    }
217
218    /// Loads the state manager and its state machines from the database.
219    /// This method should be called when initializing the StateManager.
220    ///
221    /// # Errors
222    /// Returns a `BridgeError` if the database operation fails
223    pub async fn reload_state_manager_from_db(&mut self) -> Result<(), BridgeError> {
224        let mut dbtx = self.db.begin_transaction().await?;
225        let owner_type = T::ENTITY_NAME;
226        let status = self
227            .db
228            .get_next_height_to_process(Some(&mut dbtx), owner_type)
229            .await?;
230
231        // Load last processed LCP
232        let last_processed_lcp = self
233            .db
234            .get_last_processed_lcp(Some(&mut dbtx), owner_type)
235            .await?;
236
237        let loaded_last_processed_lcp = last_processed_lcp
238            .map(|lcp| u32::try_from(lcp).wrap_err("Last processed LCP doesn't fit into u32"))
239            .transpose()?;
240
241        // If no state is saved, return early
242        let loaded_next_height_to_process = match status {
243            Some(block_height) => {
244                u32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?
245            }
246            None => {
247                tracing::info!("No state machines found in the database");
248                self.config.protocol_paramset.start_height
249            }
250        };
251
252        tracing::info!(
253            "Loading state machines from block height {}",
254            loaded_next_height_to_process.saturating_sub(1)
255        );
256
257        let last_height = loaded_next_height_to_process.saturating_sub(1);
258
259        let loaded_last_finalized_block = Arc::new(BlockCache::from_block(
260            match self.db.get_full_block(None, last_height).await? {
261                Some(block) => block,
262                None => self.rpc.get_block_by_height(last_height.into()).await?,
263            },
264            last_height,
265        ));
266
267        // Load kickoff machines
268        let kickoff_machines = self
269            .db
270            .load_kickoff_machines(Some(&mut dbtx), owner_type)
271            .await?;
272
273        // Load round machines
274        let round_machines = self
275            .db
276            .load_round_machines(Some(&mut dbtx), owner_type)
277            .await?;
278
279        let init_dbtx = Arc::new(Mutex::new(dbtx));
280
281        let mut ctx = self
282            .new_context_with_block_cache(init_dbtx.clone(), loaded_last_finalized_block.clone())?;
283
284        // Process and recreate kickoff machines
285        let mut loaded_kickoff_machines = Vec::with_capacity(kickoff_machines.len());
286        for (state_json, kickoff_id, saved_block_height) in &kickoff_machines {
287            tracing::debug!(
288                "Loaded kickoff machine: state={}, block_height={}",
289                state_json,
290                saved_block_height
291            );
292
293            // Deserialize the machine state from JSON
294            let machine: Result<UninitializedStateMachine<kickoff::KickoffStateMachine<T>>, _> =
295                serde_json::from_str(state_json);
296
297            match machine {
298                Ok(uninitialized) => {
299                    // Initialize the machine with the context
300                    let initialized = uninitialized.init_with_context(&mut ctx).await;
301                    loaded_kickoff_machines.push(initialized);
302                }
303                Err(e) => {
304                    return Err(eyre::eyre!(
305                        "Failed to deserialize kickoff machine with ID {}: {}",
306                        kickoff_id,
307                        e
308                    )
309                    .into());
310                }
311            }
312        }
313
314        // Process and recreate round machines
315        let mut loaded_round_machines = Vec::with_capacity(round_machines.len());
316        for (state_json, operator_xonly_pk, saved_block_height) in &round_machines {
317            tracing::debug!(
318                "Loaded round machine: state={}, block_height={}",
319                state_json,
320                saved_block_height
321            );
322
323            // Deserialize the machine state from JSON
324            let machine: Result<UninitializedStateMachine<round::RoundStateMachine<T>>, _> =
325                serde_json::from_str(state_json);
326
327            match machine {
328                Ok(uninitialized) => {
329                    // Initialize the machine with the context
330                    let initialized = uninitialized.init_with_context(&mut ctx).await;
331                    loaded_round_machines.push(initialized);
332                }
333                Err(e) => {
334                    return Err(eyre::eyre!(
335                        "Failed to deserialize round machine with operator index {:?}: {}",
336                        operator_xonly_pk,
337                        e
338                    )
339                    .into());
340                }
341            }
342        }
343
344        if !ctx.errors.is_empty() {
345            return Err(eyre::eyre!(
346                "Multiple errors occurred during state processing: {:?}",
347                ctx.errors
348            )
349            .into());
350        }
351
352        drop(ctx);
353
354        tracing::info!(
355            "Loaded {} kickoff machines and {} round machines from the database",
356            loaded_kickoff_machines.len(),
357            loaded_round_machines.len()
358        );
359
360        Arc::into_inner(init_dbtx)
361            .ok_or_eyre("Expected single reference to DB tx when committing")?
362            .into_inner()
363            .commit()
364            .await?;
365
366        self.last_processed_lcp = loaded_last_processed_lcp;
367        self.next_height_to_process = loaded_next_height_to_process;
368        self.last_finalized_block = Some(loaded_last_finalized_block);
369        self.kickoff_machines = loaded_kickoff_machines;
370        self.round_machines = loaded_round_machines;
371
372        Ok(())
373    }
374    #[cfg(test)]
375    #[doc(hidden)]
376    pub fn round_machines(&self) -> Vec<InitializedStateMachine<round::RoundStateMachine<T>>> {
377        self.round_machines.clone()
378    }
379
380    #[cfg(test)]
381    #[doc(hidden)]
382    pub fn kickoff_machines(
383        &self,
384    ) -> Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>> {
385        self.kickoff_machines.clone()
386    }
387
388    /// Saves the state machines with dirty flag set to the database.
389    /// Uses the database transaction from the context if any.
390    /// Resets the dirty flag for all machines after successful save.
391    ///
392    /// # Errors
393    /// Returns a `BridgeError` if the database operation fails.
394    pub async fn save_state_to_db(
395        &mut self,
396        context: &mut context::StateContext<T>,
397    ) -> eyre::Result<()> {
398        // Get the owner type from the context
399        let owner_type = T::ENTITY_NAME;
400
401        // Prepare kickoff machines data with direct serialization
402        let kickoff_machines: eyre::Result<Vec<_>> = self
403            .kickoff_machines
404            .iter()
405            // Only serialize machines that are dirty
406            .filter(|machine| machine.dirty)
407            .map(|machine| -> eyre::Result<_> {
408                let state_json = serde_json::to_string(&machine).wrap_err_with(|| {
409                    format!("Failed to serialize kickoff machine: {machine:?}")
410                })?;
411                let kickoff_id =
412                    serde_json::to_string(&machine.kickoff_data).wrap_err_with(|| {
413                        format!("Failed to serialize kickoff id for machine: {machine:?}")
414                    })?;
415                Ok((state_json, (kickoff_id)))
416            })
417            .collect();
418
419        // Prepare round machines data with direct serialization
420        let round_machines: eyre::Result<Vec<_>> = self
421            .round_machines
422            .iter()
423            // Only serialize machines that are dirty
424            .filter(|machine| machine.dirty)
425            .map(|machine| -> eyre::Result<_> {
426                let state_json = serde_json::to_string(machine)
427                    .wrap_err_with(|| format!("Failed to serialize round machine: {machine:?}"))?;
428                let operator_xonly_pk = machine.operator_data.xonly_pk;
429
430                // Use the machine's dirty flag to determine if it needs updating
431                Ok((state_json, (operator_xonly_pk)))
432            })
433            .collect();
434
435        {
436            let mut dbtx = context.shared_dbtx.lock().await;
437
438            let last_processed_lcp_i32 = self
439                .last_processed_lcp
440                .map(i32::try_from)
441                .transpose()
442                .wrap_err("Failed to convert last_processed_lcp to i32")?;
443
444            let block_height_i32 = i32::try_from(context.cache.block_height)
445                .wrap_err("Failed to convert block_height to i32")?;
446
447            // Use the database function to save the state machines
448            self.db
449                .save_state_machines(
450                    &mut dbtx,
451                    kickoff_machines?,
452                    round_machines?,
453                    block_height_i32 + 1,
454                    owner_type,
455                    last_processed_lcp_i32,
456                )
457                .await?;
458        }
459
460        // Reset the dirty flag for all machines after successful save
461        for machine in &mut self.kickoff_machines {
462            if machine.dirty {
463                machine
464                    .handle_with_context(&KickoffEvent::SavedToDb, context)
465                    .await;
466            }
467        }
468
469        for machine in &mut self.round_machines {
470            if machine.dirty {
471                machine
472                    .handle_with_context(&RoundEvent::SavedToDb, context)
473                    .await;
474            }
475        }
476
477        Ok(())
478    }
479
480    pub fn get_next_height_to_process(&self) -> u32 {
481        self.next_height_to_process
482    }
483
484    /// Updates the machines using the context and returns machines without
485    /// events and futures that process new events for machines that changed.
486    /// Empties the `machines` vector.
487    ///
488    /// # Parameters
489    /// * `machines`: A mutable reference to the vector of state machines to update.
490    /// * `base_context`: A reference to the base state context.
491    ///
492    /// # Returns
493    /// A tuple of the unchanged machines and the futures that process new
494    /// events for machines that generated events.
495    ///
496    /// # Type Parameters
497    /// * `M`: The type of the state machine.
498    /// * `a`: The lifetime of the state context reference (the future captures the context by reference).
499    #[allow(clippy::type_complexity)]
500    fn update_machines<'a, M>(
501        machines: &mut Vec<InitializedStateMachine<M>>,
502        base_context: &'a context::StateContext<T>,
503    ) -> (
504        Vec<InitializedStateMachine<M>>,
505        Vec<
506            impl Future<Output = (InitializedStateMachine<M>, context::StateContext<T>)> + Send + 'a,
507        >,
508    )
509    where
510        M: IntoStateMachine + Send + Sync + 'static,
511        M::State: Send + Sync + 'static,
512        InitializedStateMachine<M>: ContextProcessor<T, M>,
513    {
514        let mut unchanged_machines = Vec::new();
515        let mut processing_futures = Vec::new();
516
517        for machine in std::mem::take(machines).into_iter() {
518            match machine.process_with_ctx(base_context) {
519                ContextProcessResult::Processing(future) => processing_futures.push(future),
520                ContextProcessResult::Unchanged(machine) => unchanged_machines.push(machine),
521            }
522        }
523
524        (unchanged_machines, processing_futures)
525    }
526
527    /// Given some new states and a start height, process the states from the given start height until the next height to process.
528    /// Then append the new states to the current state machines.
529    pub async fn process_and_add_new_states_from_height(
530        &mut self,
531        dbtx: Arc<Mutex<sqlx::Transaction<'static, sqlx::Postgres>>>,
532        new_round_machines: Vec<InitializedStateMachine<round::RoundStateMachine<T>>>,
533        new_kickoff_machines: Vec<InitializedStateMachine<kickoff::KickoffStateMachine<T>>>,
534        start_height: u32,
535    ) -> Result<(), eyre::Report> {
536        // create a temporary state manager that only includes the new states
537        let mut temporary_manager = self.clone_without_machines();
538        temporary_manager.round_machines = new_round_machines;
539        temporary_manager.kickoff_machines = new_kickoff_machines;
540
541        for block_height in start_height..temporary_manager.next_height_to_process {
542            let block = temporary_manager
543                .get_block(Some(&mut *dbtx.lock().await), block_height)
544                .await
545                .wrap_err_with(|| {
546                    format!(
547                        "Block at height {block_height} not found in process_and_add_new_states_from_height"
548                    )
549                })?;
550
551            let mut context = temporary_manager.new_context(dbtx.clone(), &block, block_height)?;
552            temporary_manager
553                .process_block_parallel(&mut context)
554                .await?;
555        }
556
557        // append new states to the current state manager
558        self.round_machines.extend(temporary_manager.round_machines);
559        self.kickoff_machines
560            .extend(temporary_manager.kickoff_machines);
561
562        Ok(())
563    }
564
565    /// It requires that the block cache is updated before calling this function.
566    /// Moves all state machines forward in parallel.
567    /// The state machines are updated until all of them stabilize in their state (ie.
568    /// the block does not generate any new events)
569    ///
570    /// # Errors
571    /// If the state machines do not stabilize after some iterations, we return an error.
572    pub async fn process_block_parallel(
573        &mut self,
574        context: &mut context::StateContext<T>,
575    ) -> Result<(), eyre::Report> {
576        let block_height = context.cache.block_height;
577
578        // Process all machines, for those unaffected collect them them, otherwise return
579        // a future that processes the new events.
580        let (mut final_kickoff_machines, mut kickoff_futures) =
581            Self::update_machines(&mut self.kickoff_machines, context);
582        let (mut final_round_machines, mut round_futures) =
583            Self::update_machines(&mut self.round_machines, context);
584
585        // Here we store number of iterations to detect if the machines do not stabilize after a while
586        // to prevent infinite loops. If a matcher is used, it is deleted, but a bug in implementation
587        // can technically cause infinite loops.
588        let mut iterations = 0;
589
590        // On each iteration, we'll update the changed machines until all machines
591        // stabilize in their state.
592        while !kickoff_futures.is_empty() || !round_futures.is_empty() {
593            // Execute all futures in parallel
594            let (kickoff_results, round_results) =
595                join(join_all(kickoff_futures), join_all(round_futures)).await;
596
597            // Unzip the results into updated machines and state contexts
598            let (mut changed_kickoff_machines, mut kickoff_contexts): (Vec<_>, Vec<_>) =
599                kickoff_results.into_iter().unzip();
600            let (mut changed_round_machines, mut round_contexts): (Vec<_>, Vec<_>) =
601                round_results.into_iter().unzip();
602
603            // Merge and handle errors
604            let mut all_errors = Vec::new();
605            for ctx in kickoff_contexts.iter_mut().chain(round_contexts.iter_mut()) {
606                all_errors.extend(std::mem::take(&mut ctx.errors));
607            }
608
609            if !all_errors.is_empty() {
610                // Return first error or create a combined error
611                return Err(eyre::eyre!(
612                    "Multiple errors occurred during state processing: {:?}",
613                    all_errors
614                ));
615            }
616
617            // Append the newly generated state machines into the changed machines list
618            for ctx in kickoff_contexts.iter_mut().chain(round_contexts.iter_mut()) {
619                #[cfg(debug_assertions)]
620                for machine in &ctx.new_round_machines {
621                    if !machine.dirty {
622                        panic!(
623                            "Round machine not dirty despite having been newly created: {:?}",
624                            machine.state()
625                        );
626                    }
627                }
628                #[cfg(debug_assertions)]
629                for machine in &ctx.new_kickoff_machines {
630                    if !machine.dirty {
631                        panic!(
632                            "Kickoff machine not dirty despite having been newly created: {:?}",
633                            machine.state()
634                        );
635                    }
636                }
637                changed_round_machines.extend(std::mem::take(&mut ctx.new_round_machines));
638                changed_kickoff_machines.extend(std::mem::take(&mut ctx.new_kickoff_machines));
639            }
640
641            // If the machines do not stabilize after a while, we return an error
642            //
643            // Something like max(2 * num_kickoffs_per_round, number of utxos in a kickoff * 2) is possibly a safe value
644            if iterations > 100000 {
645                return Err(eyre::eyre!(
646                    r#"{}/{} kickoff and {}/{} round state machines did not stabilize after 100000 iterations, debug repr of changed machines:
647                        ---- Kickoff machines ----
648                        {:?}
649                        ---- Round machines ----
650                        {:?}
651                        "#,
652                    changed_kickoff_machines.len(),
653                    final_kickoff_machines.len() + changed_kickoff_machines.len(),
654                    changed_round_machines.len(),
655                    final_round_machines.len() + changed_round_machines.len(),
656                    changed_kickoff_machines
657                        .iter()
658                        .map(|m| m.state())
659                        .collect::<Vec<_>>(),
660                    changed_round_machines
661                        .iter()
662                        .map(|m| m.state())
663                        .collect::<Vec<_>>(),
664                ));
665            }
666
667            // Reprocess changed machines and commit these futures to be handled
668            // in the next round If they're empty, we'll exit the loop.
669            let (finalized_kickoff_machines, new_kickoff_futures) =
670                Self::update_machines(&mut changed_kickoff_machines, context);
671            let (finalized_round_machines, new_round_futures) =
672                Self::update_machines(&mut changed_round_machines, context);
673            final_kickoff_machines.extend(finalized_kickoff_machines);
674            final_round_machines.extend(finalized_round_machines);
675
676            // Update the futures to be processed
677            kickoff_futures = new_kickoff_futures;
678            round_futures = new_round_futures;
679            iterations += 1;
680        }
681
682        drop(kickoff_futures);
683        drop(round_futures);
684
685        // Set back the original machines
686        self.round_machines = final_round_machines;
687        self.kickoff_machines = final_kickoff_machines;
688
689        self.next_height_to_process = max(block_height + 1, self.next_height_to_process);
690
691        Ok(())
692    }
693}