1use std::time::Duration;
2use tokio::sync::oneshot;
3use tokio::sync::oneshot::error::TryRecvError;
4use tokio::task::{self, JoinHandle};
5use tokio::time::sleep;
6use tonic::async_trait;
7
8use clementine_errors::BridgeError;
9
10pub mod aggregator_metric_publisher;
11pub mod entity_metric_publisher;
12pub mod lcp_syncer;
13pub mod manager;
14pub mod payout_checker;
15pub mod status_monitor;
16#[cfg(feature = "automation")]
17pub mod tx_sender;
18
/// Identifies which kind of task a [`Task`] implementation is.
///
/// Each [`Task`] exposes its variant through the associated constant
/// `Task::VARIANT`. The wrapper tasks in this module forward the variant of
/// the task they wrap, and several of them print it in their log messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskVariant {
    PayoutChecker,
    StateManager,
    FinalizedBlockFetcher,
    LcpSyncer,
    TxSender,
    BitcoinSyncer,
    TaskStatusMonitor,
    /// Only present when compiled with `cfg(test)`.
    #[cfg(test)]
    Counter,
    /// Only present when compiled with `cfg(test)`.
    #[cfg(test)]
    Sleep,
    MetricPublisher,
}
38
39#[async_trait]
45pub trait Task: Send + Sync + 'static {
46 type Output: Send + Sync + 'static + Sized;
48 const VARIANT: TaskVariant;
50 async fn run_once(&mut self) -> Result<Self::Output, BridgeError>;
57}
58
59pub trait IntoTask {
61 type Task: Task;
62
63 fn into_task(self) -> Self::Task;
65}
66
67impl<T: Task> IntoTask for T {
68 type Task = T;
69
70 fn into_task(self) -> Self::Task {
71 self
72 }
73}
74
75#[derive(Debug)]
78pub struct WithDelay<T: Task>
79where
80 T::Output: Into<bool>,
81{
82 inner: T,
84 poll_delay: Duration,
86}
87
88impl<T: Task> WithDelay<T>
89where
90 T::Output: Into<bool>,
91{
92 pub fn new(inner: T, poll_delay: Duration) -> Self {
94 Self { inner, poll_delay }
95 }
96}
97
98#[async_trait]
99impl<T: Task> Task for WithDelay<T>
100where
101 T::Output: Into<bool>,
102{
103 type Output = bool;
104 const VARIANT: TaskVariant = T::VARIANT;
105 async fn run_once(&mut self) -> Result<bool, BridgeError> {
106 let did_work = self.inner.run_once().await?.into();
108
109 if !did_work {
111 sleep(self.poll_delay).await;
112 }
113
114 Ok(false)
116 }
117}
118
119#[derive(Debug)]
121pub struct CancelableTask<T: Task> {
122 inner: T,
124 cancel_rx: oneshot::Receiver<()>,
126}
127
128impl<T: Task> CancelableTask<T> {
129 pub fn new(inner: T, cancel_rx: oneshot::Receiver<()>) -> Self {
131 Self { inner, cancel_rx }
132 }
133}
134
/// Outcome of one step of a [`CancelableTask`].
#[derive(Debug, Clone)]
pub enum CancelableResult<T> {
    /// The wrapped task ran; carries the value it produced.
    Running(T),
    /// The wrapped task was not run because cancellation was detected.
    Cancelled,
}
140
141#[async_trait]
142impl<T: Task> Task for CancelableTask<T> {
143 type Output = CancelableResult<T::Output>;
144 const VARIANT: TaskVariant = T::VARIANT;
145
146 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
147 if let Err(TryRecvError::Empty) = self.cancel_rx.try_recv() {
149 Ok(CancelableResult::Running(self.inner.run_once().await?))
151 } else {
152 Ok(CancelableResult::Cancelled)
153 }
154 }
155}
156
157#[derive(Debug)]
158pub struct CancelableLoop<T: Task + Sized> {
159 inner: CancelableTask<T>,
160}
161
162#[async_trait]
163impl<T: Task + Sized> Task for CancelableLoop<T> {
164 type Output = ();
165 const VARIANT: TaskVariant = T::VARIANT;
166
167 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
168 loop {
169 match self.inner.run_once().await {
170 Ok(CancelableResult::Running(_)) => {
171 tokio::task::yield_now().await;
172 continue;
173 }
174 Ok(CancelableResult::Cancelled) => return Ok(()),
175 Err(e) => return Err(e),
176 }
177 }
178 }
179}
180
181#[async_trait]
185pub trait RecoverableTask: Task + Send + Sync {
186 async fn recover_from_error(&mut self, error: &BridgeError) -> Result<(), BridgeError>;
189}
190
191#[derive(Debug)]
192pub struct BufferedErrors<T: RecoverableTask + Sized>
193where
194 T::Output: Default,
195{
196 inner: T,
197 buffer: Vec<BridgeError>,
198 error_overflow_limit: usize,
199 handle_error_attempts: usize,
200 wait_between_recover_attempts: Duration,
201}
202
203impl<T: RecoverableTask + Sized> BufferedErrors<T>
204where
205 T::Output: Default,
206{
207 pub fn new(
208 inner: T,
209 error_overflow_limit: usize,
210 handle_error_attempts: usize,
211 wait_between_recover_attempts: Duration,
212 ) -> Self {
213 Self {
214 inner,
215 buffer: Vec::new(),
216 error_overflow_limit,
217 handle_error_attempts,
218 wait_between_recover_attempts,
219 }
220 }
221}
222
223#[async_trait]
224impl<T: RecoverableTask + Task + Sized + std::fmt::Debug> Task for BufferedErrors<T>
225where
226 T: Send,
227 T::Output: Default,
228{
229 type Output = T::Output;
230 const VARIANT: TaskVariant = T::VARIANT;
231
232 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
233 let result = self.inner.run_once().await;
234
235 match result {
236 Ok(output) => {
237 self.buffer.clear(); Ok(output)
239 }
240 Err(e) => {
241 tracing::error!(
242 "Task {:?} error, attempting to recover: {e:?}",
243 Self::VARIANT
244 );
245 for attempt in 1..=self.handle_error_attempts {
247 let result = self.inner.recover_from_error(&e).await;
248 match result {
249 Ok(()) => break,
250 Err(e) => {
251 tracing::error!(
252 "Task {:?} error, failed to recover (attempt {attempt}): {e:?}",
253 Self::VARIANT,
254 );
255 if attempt == self.handle_error_attempts {
256 return Err(eyre::eyre!(
258 "Failed to recover from task {:?} error after {attempt} attempts, aborting...",
259 Self::VARIANT
260 ).into());
261 }
262 tokio::time::sleep(self.wait_between_recover_attempts).await;
264 }
265 }
266 }
267 self.buffer.push(e);
268 if self.buffer.len() >= self.error_overflow_limit {
269 let mut base_error: eyre::Report =
270 self.buffer.pop().expect("just inserted above").into();
271
272 for error in std::mem::take(&mut self.buffer) {
273 base_error = base_error.wrap_err(error);
274 }
275
276 base_error = base_error.wrap_err(format!(
277 "Exiting due to {} consecutive errors, the following chain is the list of errors.",
278 self.error_overflow_limit
279 ));
280
281 Err(base_error.into())
282 } else {
283 Ok(Default::default())
284 }
285 }
286 }
287 }
288}
289
290#[derive(Debug)]
292pub struct IgnoreError<T: Task + Sized>
293where
294 T::Output: Default,
295{
296 inner: T,
297}
298
299#[async_trait]
300impl<T: Task + Sized + std::fmt::Debug> Task for IgnoreError<T>
301where
302 T::Output: Default,
303{
304 type Output = T::Output;
305 const VARIANT: TaskVariant = T::VARIANT;
306
307 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
308 Ok(self
309 .inner
310 .run_once()
311 .await
312 .inspect_err(|e| {
313 tracing::error!(task=?self.inner, "Task error, suppressing due to errors ignored: {e:?}");
314 })
315 .ok()
316 .unwrap_or_default())
317 }
318}
319
320pub trait TaskExt: Task + Sized {
321 fn cancelable(self) -> (CancelableTask<Self>, oneshot::Sender<()>);
323
324 fn cancelable_loop(self) -> (CancelableLoop<Self>, oneshot::Sender<()>);
326
327 fn with_delay(self, poll_delay: Duration) -> WithDelay<Self>
329 where
330 Self::Output: Into<bool>;
331
332 fn into_bg(self) -> JoinHandle<Result<Self::Output, BridgeError>>;
334
335 fn into_buffered_errors(
340 self,
341 error_overflow_limit: usize,
342 handle_error_attempts: usize,
343 wait_between_recover_attempts: Duration,
344 ) -> BufferedErrors<Self>
345 where
346 Self: RecoverableTask,
347 Self::Output: Default;
348
349 fn ignore_error(self) -> IgnoreError<Self>
351 where
352 Self::Output: Default;
353}
354
355impl<T: Task + Sized> TaskExt for T {
356 fn cancelable(self) -> (CancelableTask<Self>, oneshot::Sender<()>) {
357 let (cancel_tx, cancel_rx) = oneshot::channel();
358 (CancelableTask::new(self, cancel_rx), cancel_tx)
359 }
360
361 fn cancelable_loop(self) -> (CancelableLoop<Self>, oneshot::Sender<()>) {
362 let (task, cancel_tx) = self.cancelable();
363 (CancelableLoop { inner: task }, cancel_tx)
364 }
365
366 fn with_delay(self, poll_delay: Duration) -> WithDelay<Self>
367 where
368 Self::Output: Into<bool>,
369 {
370 WithDelay::new(self, poll_delay)
371 }
372
373 fn into_bg(mut self) -> JoinHandle<Result<Self::Output, BridgeError>> {
374 tokio::spawn(async move {
375 tracing::debug!(
376 "Running task {:?} with ID {:?}",
377 Self::VARIANT,
378 task::try_id()
379 );
380 self.run_once().await
381 })
382 }
383
384 fn into_buffered_errors(
385 self,
386 error_overflow_limit: usize,
387 handle_error_attempts: usize,
388 wait_between_recover_attempts: Duration,
389 ) -> BufferedErrors<Self>
390 where
391 Self: RecoverableTask,
392 Self::Output: Default,
393 {
394 BufferedErrors::new(
395 self,
396 error_overflow_limit,
397 handle_error_attempts,
398 wait_between_recover_attempts,
399 )
400 }
401
402 fn ignore_error(self) -> IgnoreError<Self>
403 where
404 Self::Output: Default,
405 {
406 IgnoreError { inner: self }
407 }
408}
409
410#[cfg(test)]
411mod tests;