1use crate::{
2 bitcoin_syncer::{BlockHandler, FinalizedBlockFetcherTask},
3 database::{Database, DatabaseTransaction},
4 task::{BufferedErrors, IntoTask, RecoverableTask, TaskVariant, WithDelay},
5};
6use eyre::{Context as _, OptionExt};
7use pgmq::{Message, PGMQueueExt};
8use std::{sync::Arc, time::Duration};
9use tokio::sync::Mutex;
10use tonic::async_trait;
11
12use crate::{
13 config::protocol::ProtocolParamset,
14 states::SystemEvent,
15 task::{Task, TaskExt},
16};
17use clementine_errors::BridgeError;
18
19use super::{context::Owner, StateManager};
20
21const POLL_DELAY: Duration = if cfg!(test) {
22 Duration::from_millis(250)
23} else {
24 Duration::from_secs(30)
25};
26
27#[derive(Debug, Clone)]
29pub struct QueueBlockHandler {
30 queue: PGMQueueExt,
31 queue_name: String,
32}
33
34#[async_trait]
35impl BlockHandler for QueueBlockHandler {
36 async fn handle_new_block(
39 &mut self,
40 dbtx: DatabaseTransaction<'_>,
41 _block_id: u32,
42 block: bitcoin::Block,
43 height: u32,
44 ) -> Result<(), BridgeError> {
45 let event = SystemEvent::NewFinalizedBlock { block, height };
46
47 self.queue
48 .send_with_cxn(&self.queue_name, &event, &mut **dbtx)
49 .await
50 .wrap_err("Error sending new block event to queue")?;
51 Ok(())
52 }
53}
54
55#[derive(Debug)]
57pub struct BlockFetcherTask<T: Owner + std::fmt::Debug + 'static> {
58 _phantom: std::marker::PhantomData<T>,
60}
61
62impl<T: Owner + std::fmt::Debug + 'static> BlockFetcherTask<T> {
63 pub async fn new_finalized_block_fetcher_task(
65 db: Database,
66 paramset: &'static ProtocolParamset,
67 ) -> Result<FinalizedBlockFetcherTask<QueueBlockHandler>, BridgeError> {
68 let queue = PGMQueueExt::new_with_pool(db.get_pool()).await;
69 let queue_name = StateManager::<T>::queue_name();
70
71 let handler = QueueBlockHandler {
72 queue,
73 queue_name: queue_name.clone(),
74 };
75
76 let next_height = db
78 .get_next_finalized_block_height_for_consumer(
79 None,
80 T::FINALIZED_BLOCK_CONSUMER_ID_AUTOMATION,
81 paramset,
82 )
83 .await?;
84
85 tracing::info!(
86 "Creating block fetcher task for owner type {} starting from height {}",
87 T::ENTITY_NAME,
88 next_height
89 );
90
91 Ok(crate::bitcoin_syncer::FinalizedBlockFetcherTask::new(
92 db,
93 T::FINALIZED_BLOCK_CONSUMER_ID_AUTOMATION.to_string(),
94 paramset,
95 next_height,
96 handler,
97 ))
98 }
99}
100
101#[derive(Debug)]
103pub struct MessageConsumerTask<T: Owner + std::fmt::Debug + 'static> {
104 db: Database,
105 inner: StateManager<T>,
106 queue_name: String,
108}
109
110#[async_trait]
111impl<T: Owner + std::fmt::Debug + 'static> Task for MessageConsumerTask<T> {
112 type Output = bool;
113 const VARIANT: TaskVariant = TaskVariant::StateManager;
114
115 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
116 let new_event_received = async {
117 let mut dbtx = self.db.begin_transaction().await?;
118
119 let Some(Message {
121 msg_id, message, ..
122 }): Option<Message<SystemEvent>> = self
123 .inner
124 .queue
125 .read_with_cxn(&self.queue_name, 0, &mut *dbtx)
128 .await
129 .wrap_err("Reading event from queue")?
130 else {
131 dbtx.commit().await?;
132 return Ok::<_, BridgeError>(false);
133 };
134
135 let arc_dbtx = Arc::new(Mutex::new(dbtx));
136
137 self.inner.handle_event(message, arc_dbtx.clone()).await?;
138
139 let mut dbtx = Arc::into_inner(arc_dbtx)
140 .ok_or_eyre("Expected single reference to DB tx when committing")?
141 .into_inner();
142
143 self.inner
145 .queue
146 .archive_with_cxn(&self.queue_name, msg_id, &mut *dbtx)
147 .await
148 .wrap_err("Deleting event from queue")?;
149
150 dbtx.commit().await?;
151 Ok(true)
152 }
153 .await?;
154
155 Ok(new_event_received)
156 }
157}
158
159#[async_trait]
160impl<T: Owner + std::fmt::Debug + 'static> RecoverableTask for MessageConsumerTask<T> {
161 async fn recover_from_error(&mut self, _error: &BridgeError) -> Result<(), BridgeError> {
162 self.inner.reload_state_manager_from_db().await
164 }
165}
166
167impl<T: Owner + std::fmt::Debug + 'static> IntoTask for StateManager<T> {
168 type Task = WithDelay<BufferedErrors<MessageConsumerTask<T>>>;
169
170 fn into_task(self) -> Self::Task {
172 MessageConsumerTask {
173 db: self.db.clone(),
174 inner: self,
175 queue_name: StateManager::<T>::queue_name(),
176 }
177 .into_buffered_errors(10, 3, Duration::from_secs(10))
178 .with_delay(POLL_DELAY)
179 }
180}
181
182impl<T: Owner + std::fmt::Debug + 'static> StateManager<T> {
183 pub async fn block_fetcher_task(
184 &self,
185 ) -> Result<WithDelay<impl Task<Output = bool> + std::fmt::Debug>, BridgeError> {
186 Ok(BlockFetcherTask::<T>::new_finalized_block_fetcher_task(
187 self.db.clone(),
188 self.config.protocol_paramset,
189 )
190 .await?
191 .into_buffered_errors(20, 3, Duration::from_secs(10))
192 .with_delay(POLL_DELAY))
193 }
194}
195
196#[cfg(test)]
197mod tests {
198 use std::collections::BTreeMap;
199
200 use tokio::{sync::oneshot, task::JoinHandle, time::timeout};
201 use tonic::async_trait;
202
203 use crate::{
204 builder::transaction::{ContractContext, TxHandler},
205 config::BridgeConfig,
206 database::DatabaseTransaction,
207 extended_bitcoin_rpc::ExtendedBitcoinRpc,
208 states::{context::DutyResult, Duty},
209 test::common::{create_regtest_rpc, create_test_config_with_thread_name},
210 utils::NamedEntity,
211 };
212 use clementine_primitives::TransactionType;
213
214 use super::*;
215
216 #[derive(Clone, Debug)]
217 struct MockHandler;
218
219 impl NamedEntity for MockHandler {
220 const ENTITY_NAME: &'static str = "MockHandler";
221 const LCP_SYNCER_CONSUMER_ID: &'static str = "mock_lcp_syncer";
222 const FINALIZED_BLOCK_CONSUMER_ID_AUTOMATION: &'static str =
223 "mock_finalized_block_automation";
224 }
225
226 #[async_trait]
227 impl Owner for MockHandler {
228 async fn handle_duty(
229 &self,
230 _dbtx: DatabaseTransaction<'_>,
231 _: Duty,
232 ) -> Result<DutyResult, BridgeError> {
233 Ok(DutyResult::Handled)
234 }
235
236 async fn create_txhandlers(
237 &self,
238 _dbtx: DatabaseTransaction<'_>,
239 _: TransactionType,
240 _: ContractContext,
241 ) -> Result<BTreeMap<TransactionType, TxHandler>, BridgeError> {
242 Ok(BTreeMap::new())
243 }
244 }
245
246 async fn create_state_manager(
247 config: &mut BridgeConfig,
248 ) -> (JoinHandle<Result<(), BridgeError>>, oneshot::Sender<()>) {
249 let db = Database::new(config).await.unwrap();
250
251 let rpc = ExtendedBitcoinRpc::connect(
252 config.bitcoin_rpc_url.clone(),
253 config.bitcoin_rpc_user.clone(),
254 config.bitcoin_rpc_password.clone(),
255 None,
256 )
257 .await
258 .expect("Failed to connect to Bitcoin RPC");
259
260 let state_manager = StateManager::new(db, MockHandler, rpc, config.clone())
261 .await
262 .unwrap();
263 let (t, shutdown) = state_manager.into_task().cancelable_loop();
264 (t.into_bg(), shutdown)
265 }
266
267 #[tokio::test]
268 async fn test_run_state_manager() {
269 let mut config = create_test_config_with_thread_name().await;
270 let cleanup = create_regtest_rpc(&mut config).await;
271 cleanup
272 .rpc()
273 .mine_blocks(config.protocol_paramset.start_height as u64)
274 .await
275 .unwrap();
276 let (handle, shutdown) = create_state_manager(&mut config).await;
277
278 drop(shutdown);
279
280 timeout(Duration::from_secs(1), handle)
281 .await
282 .expect("state manager should exit after shutdown signal (timed out after 1s)")
283 .expect("state manager should shutdown gracefully (thread panic should not happen)")
284 .expect("state manager should shutdown gracefully");
285 }
286
287 #[tokio::test]
288 async fn test_state_mgr_does_not_shutdown() {
289 let mut config = create_test_config_with_thread_name().await;
290 let cleanup = create_regtest_rpc(&mut config).await;
291 cleanup
292 .rpc()
293 .mine_blocks(config.protocol_paramset.start_height as u64)
294 .await
295 .unwrap();
296 let (handle, shutdown) = create_state_manager(&mut config).await;
297
298 timeout(Duration::from_secs(1), handle).await.expect_err(
299 "state manager should not shutdown while shutdown handle is alive (timed out after 1s)",
300 );
301
302 drop(shutdown);
303 }
304}