clementine_core/states/
task.rs

1use crate::{
2    bitcoin_syncer::{BlockHandler, FinalizedBlockFetcherTask},
3    database::{Database, DatabaseTransaction},
4    task::{BufferedErrors, IntoTask, RecoverableTask, TaskVariant, WithDelay},
5};
6use eyre::{Context as _, OptionExt};
7use pgmq::{Message, PGMQueueExt};
8use std::{sync::Arc, time::Duration};
9use tokio::sync::Mutex;
10use tonic::async_trait;
11
12use crate::{
13    config::protocol::ProtocolParamset,
14    states::SystemEvent,
15    task::{Task, TaskExt},
16};
17use clementine_errors::BridgeError;
18
19use super::{context::Owner, StateManager};
20
/// Delay handed to `with_delay` for the tasks built in this module.
///
/// 250 ms when compiled with `cfg!(test)`, 30 s otherwise.
const POLL_DELAY: Duration = Duration::from_millis(if cfg!(test) { 250 } else { 30_000 });
26
27/// Block handler that sends events to a PostgreSQL message queue
28#[derive(Debug, Clone)]
29pub struct QueueBlockHandler {
30    queue: PGMQueueExt,
31    queue_name: String,
32}
33
34#[async_trait]
35impl BlockHandler for QueueBlockHandler {
36    /// Handles a new block by sending a new block event to the queue.
37    /// State manager will process the block after reading the event from the queue.
38    async fn handle_new_block(
39        &mut self,
40        dbtx: DatabaseTransaction<'_>,
41        _block_id: u32,
42        block: bitcoin::Block,
43        height: u32,
44    ) -> Result<(), BridgeError> {
45        let event = SystemEvent::NewFinalizedBlock { block, height };
46
47        self.queue
48            .send_with_cxn(&self.queue_name, &event, &mut **dbtx)
49            .await
50            .wrap_err("Error sending new block event to queue")?;
51        Ok(())
52    }
53}
54
55/// A task that fetches new finalized blocks from Bitcoin and adds them to the state management queue
56#[derive(Debug)]
57pub struct BlockFetcherTask<T: Owner + std::fmt::Debug + 'static> {
58    /// Owner type marker
59    _phantom: std::marker::PhantomData<T>,
60}
61
62impl<T: Owner + std::fmt::Debug + 'static> BlockFetcherTask<T> {
63    /// Creates a new finalized block fetcher task that sends new finalized blocks to the message queue.
64    pub async fn new_finalized_block_fetcher_task(
65        db: Database,
66        paramset: &'static ProtocolParamset,
67    ) -> Result<FinalizedBlockFetcherTask<QueueBlockHandler>, BridgeError> {
68        let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
69        let queue_name = StateManager::<T>::queue_name();
70
71        let handler = QueueBlockHandler {
72            queue,
73            queue_name: queue_name.clone(),
74        };
75
76        // get the next finalized block height to start from
77        let next_height = db
78            .get_next_finalized_block_height_for_consumer(
79                None,
80                T::FINALIZED_BLOCK_CONSUMER_ID_AUTOMATION,
81                paramset,
82            )
83            .await?;
84
85        tracing::info!(
86            "Creating block fetcher task for owner type {} starting from height {}",
87            T::ENTITY_NAME,
88            next_height
89        );
90
91        Ok(crate::bitcoin_syncer::FinalizedBlockFetcherTask::new(
92            db,
93            T::FINALIZED_BLOCK_CONSUMER_ID_AUTOMATION.to_string(),
94            paramset,
95            next_height,
96            handler,
97        ))
98    }
99}
100
101/// A task that reads new events from the message queue and processes them.
102#[derive(Debug)]
103pub struct MessageConsumerTask<T: Owner + std::fmt::Debug + 'static> {
104    db: Database,
105    inner: StateManager<T>,
106    /// Queue name for this owner type (cached)
107    queue_name: String,
108}
109
110#[async_trait]
111impl<T: Owner + std::fmt::Debug + 'static> Task for MessageConsumerTask<T> {
112    type Output = bool;
113    const VARIANT: TaskVariant = TaskVariant::StateManager;
114
115    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
116        let new_event_received = async {
117            let mut dbtx = self.db.begin_transaction().await?;
118
119            // Poll new event
120            let Some(Message {
121                msg_id, message, ..
122            }): Option<Message<SystemEvent>> = self
123                .inner
124                .queue
125                // 2nd param of read_with_cxn is the visibility timeout, set to 0 as we only have 1 consumer of the queue, which is the state machine
126                // visibility timeout is the time after which the message is visible again to other consumers
127                .read_with_cxn(&self.queue_name, 0, &mut *dbtx)
128                .await
129                .wrap_err("Reading event from queue")?
130            else {
131                dbtx.commit().await?;
132                return Ok::<_, BridgeError>(false);
133            };
134
135            let arc_dbtx = Arc::new(Mutex::new(dbtx));
136
137            self.inner.handle_event(message, arc_dbtx.clone()).await?;
138
139            let mut dbtx = Arc::into_inner(arc_dbtx)
140                .ok_or_eyre("Expected single reference to DB tx when committing")?
141                .into_inner();
142
143            // Delete event from queue
144            self.inner
145                .queue
146                .archive_with_cxn(&self.queue_name, msg_id, &mut *dbtx)
147                .await
148                .wrap_err("Deleting event from queue")?;
149
150            dbtx.commit().await?;
151            Ok(true)
152        }
153        .await?;
154
155        Ok(new_event_received)
156    }
157}
158
159#[async_trait]
160impl<T: Owner + std::fmt::Debug + 'static> RecoverableTask for MessageConsumerTask<T> {
161    async fn recover_from_error(&mut self, _error: &BridgeError) -> Result<(), BridgeError> {
162        // in case of any error, reload the state machines from the database
163        self.inner.reload_state_manager_from_db().await
164    }
165}
166
167impl<T: Owner + std::fmt::Debug + 'static> IntoTask for StateManager<T> {
168    type Task = WithDelay<BufferedErrors<MessageConsumerTask<T>>>;
169
170    /// Converts the StateManager into the consumer task with a polling delay.
171    fn into_task(self) -> Self::Task {
172        MessageConsumerTask {
173            db: self.db.clone(),
174            inner: self,
175            queue_name: StateManager::<T>::queue_name(),
176        }
177        .into_buffered_errors(10, 3, Duration::from_secs(10))
178        .with_delay(POLL_DELAY)
179    }
180}
181
182impl<T: Owner + std::fmt::Debug + 'static> StateManager<T> {
183    pub async fn block_fetcher_task(
184        &self,
185    ) -> Result<WithDelay<impl Task<Output = bool> + std::fmt::Debug>, BridgeError> {
186        Ok(BlockFetcherTask::<T>::new_finalized_block_fetcher_task(
187            self.db.clone(),
188            self.config.protocol_paramset,
189        )
190        .await?
191        .into_buffered_errors(20, 3, Duration::from_secs(10))
192        .with_delay(POLL_DELAY))
193    }
194}
195
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
    use tonic::async_trait;

    use crate::{
        builder::transaction::{ContractContext, TxHandler},
        config::BridgeConfig,
        database::DatabaseTransaction,
        extended_bitcoin_rpc::ExtendedBitcoinRpc,
        states::{context::DutyResult, Duty},
        test::common::{create_regtest_rpc, create_test_config_with_thread_name},
        utils::NamedEntity,
    };
    use clementine_primitives::TransactionType;

    use super::*;

    /// How long the tests wait on the state manager's join handle.
    const SHUTDOWN_WAIT: Duration = Duration::from_secs(1);

    /// Owner stub: reports every duty as handled and produces no transaction handlers.
    #[derive(Clone, Debug)]
    struct MockHandler;

    impl NamedEntity for MockHandler {
        const ENTITY_NAME: &'static str = "MockHandler";
        const LCP_SYNCER_CONSUMER_ID: &'static str = "mock_lcp_syncer";
        const FINALIZED_BLOCK_CONSUMER_ID_AUTOMATION: &'static str =
            "mock_finalized_block_automation";
    }

    #[async_trait]
    impl Owner for MockHandler {
        /// Accepts any duty without doing anything.
        async fn handle_duty(
            &self,
            _dbtx: DatabaseTransaction<'_>,
            _: Duty,
        ) -> Result<DutyResult, BridgeError> {
            Ok(DutyResult::Handled)
        }

        /// Always returns an empty map of transaction handlers.
        async fn create_txhandlers(
            &self,
            _dbtx: DatabaseTransaction<'_>,
            _: TransactionType,
            _: ContractContext,
        ) -> Result<BTreeMap<TransactionType, TxHandler>, BridgeError> {
            Ok(BTreeMap::new())
        }
    }

    /// Starts a state manager backed by [`MockHandler`] as a background task.
    ///
    /// Returns the join handle of the task together with the sender half that keeps the
    /// cancelable loop alive.
    async fn create_state_manager(
        config: &mut BridgeConfig,
    ) -> (JoinHandle<Result<(), BridgeError>>, oneshot::Sender<()>) {
        let database = Database::new(config).await.unwrap();

        let bitcoin_rpc = ExtendedBitcoinRpc::connect(
            config.bitcoin_rpc_url.clone(),
            config.bitcoin_rpc_user.clone(),
            config.bitcoin_rpc_password.clone(),
            None,
        )
        .await
        .expect("Failed to connect to Bitcoin RPC");

        let manager = StateManager::new(database, MockHandler, bitcoin_rpc, config.clone())
            .await
            .unwrap();

        let (task, shutdown_tx) = manager.into_task().cancelable_loop();
        (task.into_bg(), shutdown_tx)
    }

    /// The state manager task must finish cleanly once the shutdown sender is dropped.
    #[tokio::test]
    async fn test_run_state_manager() {
        let mut config = create_test_config_with_thread_name().await;
        // Bound to a name so the regtest handle stays alive until the end of the test.
        let regtest = create_regtest_rpc(&mut config).await;
        let blocks_to_mine = config.protocol_paramset.start_height as u64;
        regtest.rpc().mine_blocks(blocks_to_mine).await.unwrap();

        let (handle, shutdown_tx) = create_state_manager(&mut config).await;
        drop(shutdown_tx);

        let join_result = timeout(SHUTDOWN_WAIT, handle)
            .await
            .expect("state manager should exit after shutdown signal (timed out after 1s)");
        let task_result = join_result
            .expect("state manager should shutdown gracefully (thread panic should not happen)");
        task_result.expect("state manager should shutdown gracefully");
    }

    /// The state manager task must keep running while the shutdown sender is still alive.
    #[tokio::test]
    async fn test_state_mgr_does_not_shutdown() {
        let mut config = create_test_config_with_thread_name().await;
        // Bound to a name so the regtest handle stays alive until the end of the test.
        let regtest = create_regtest_rpc(&mut config).await;
        let blocks_to_mine = config.protocol_paramset.start_height as u64;
        regtest.rpc().mine_blocks(blocks_to_mine).await.unwrap();

        let (handle, shutdown_tx) = create_state_manager(&mut config).await;

        let wait_outcome = timeout(SHUTDOWN_WAIT, handle).await;
        wait_outcome.expect_err(
            "state manager should not shutdown while shutdown handle is alive (timed out after 1s)",
        );

        drop(shutdown_tx);
    }
}