clementine_core/task/
mod.rs

1use std::time::Duration;
2use tokio::sync::oneshot;
3use tokio::sync::oneshot::error::TryRecvError;
4use tokio::task::{self, JoinHandle};
5use tokio::time::sleep;
6use tonic::async_trait;
7
8use crate::errors::BridgeError;
9
10pub mod aggregator_metric_publisher;
11pub mod entity_metric_publisher;
12pub mod manager;
13pub mod payout_checker;
14pub mod status_monitor;
15
/// The variant of the task, used for identifying the task in the status monitor.
///
/// Create a new enum variant for each task that you want to track in the status monitor.
/// BackgroundTaskManager will use TaskVariant to identify the tasks, to not start the
/// same task twice.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskVariant {
    PayoutChecker,
    StateManager,
    FinalizedBlockFetcher,
    TxSender,
    BitcoinSyncer,
    TaskStatusMonitor,
    /// Only compiled in test builds.
    #[cfg(test)]
    Counter,
    /// Only compiled in test builds.
    #[cfg(test)]
    Sleep,
    /// Used to publish metrics to Prometheus periodically.
    MetricPublisher,
}
34
/// Task trait defining the core behavior for cancelable background tasks
///
/// This trait is implemented by any struct that needs to run as a background task.
/// The run_once method contains the main logic of the task and returns a value of
/// the associated `Output` type. Some wrappers (e.g. `WithDelay`) require an output
/// that converts into a bool, which they read as "did work" (true) or
/// "needs to wait" (false).
#[async_trait]
pub trait Task: Send + Sync + 'static {
    /// The output of the fn run_once
    type Output: Send + Sync + 'static + Sized;
    /// The variant of the task
    const VARIANT: TaskVariant;
    /// Run the task once.
    ///
    /// Returns:
    /// - `Ok(output)` with the task-specific output of this run; for tasks whose
    ///   output converts into a bool, `true` means the task did some work and
    ///   `false` means it did not and should wait before running again
    /// - `Err(...)` if the task encountered an error
    async fn run_once(&mut self) -> Result<Self::Output, BridgeError>;
}
54
/// A trait for objects that can be converted into a Task
pub trait IntoTask {
    /// The concrete task type produced by the conversion
    type Task: Task;

    /// Convert self into a Task, consuming self
    fn into_task(self) -> Self::Task;
}
62
63impl<T: Task> IntoTask for T {
64    type Task = T;
65
66    fn into_task(self) -> Self::Task {
67        self
68    }
69}
70
/// A task that adds a certain delay after the inner task has run
/// to reduce polling frequency. The delay is applied only when the inner
/// task's output converts to `false`.
#[derive(Debug)]
pub struct WithDelay<T: Task>
where
    T::Output: Into<bool>,
{
    /// The task to poll
    inner: T,
    /// The interval between polls when no work is done
    poll_delay: Duration,
}
83
84impl<T: Task> WithDelay<T>
85where
86    T::Output: Into<bool>,
87{
88    /// Create a new delayed task
89    pub fn new(inner: T, poll_delay: Duration) -> Self {
90        Self { inner, poll_delay }
91    }
92}
93
94#[async_trait]
95impl<T: Task> Task for WithDelay<T>
96where
97    T::Output: Into<bool>,
98{
99    type Output = bool;
100    const VARIANT: TaskVariant = T::VARIANT;
101    async fn run_once(&mut self) -> Result<bool, BridgeError> {
102        // Run the inner task
103        let did_work = self.inner.run_once().await?.into();
104
105        // If the inner task did not do work, sleep for the poll delay
106        if !did_work {
107            sleep(self.poll_delay).await;
108        }
109
110        // Always return false since we've handled the waiting internally
111        Ok(false)
112    }
113}
114
/// A task that can be canceled via a oneshot channel
///
/// The cancellation channel is checked before each run of the inner task.
#[derive(Debug)]
pub struct CancelableTask<T: Task> {
    /// The task to run
    inner: T,
    /// Receiver for cancellation signal
    cancel_rx: oneshot::Receiver<()>,
}
123
124impl<T: Task> CancelableTask<T> {
125    /// Create a new cancelable task with a cancellation channel
126    pub fn new(inner: T, cancel_rx: oneshot::Receiver<()>) -> Self {
127        Self { inner, cancel_rx }
128    }
129}
130
/// The outcome of one run of a [`CancelableTask`].
#[derive(Debug, Clone)]
pub enum CancelableResult<T> {
    /// The task was not cancelled; holds the output of the inner task's run.
    Running(T),
    /// The inner task was not run because polling the cancellation channel
    /// reported something other than "empty".
    Cancelled,
}
136
137#[async_trait]
138impl<T: Task> Task for CancelableTask<T> {
139    type Output = CancelableResult<T::Output>;
140    const VARIANT: TaskVariant = T::VARIANT;
141
142    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
143        // Check if we've been canceled
144        if let Err(TryRecvError::Empty) = self.cancel_rx.try_recv() {
145            // Run the inner task
146            Ok(CancelableResult::Running(self.inner.run_once().await?))
147        } else {
148            Ok(CancelableResult::Cancelled)
149        }
150    }
151}
152
/// A task that keeps re-running a [`CancelableTask`] until it reports
/// cancellation or returns an error.
#[derive(Debug)]
pub struct CancelableLoop<T: Task + Sized> {
    /// The cancelable task that is run repeatedly
    inner: CancelableTask<T>,
}
157
158#[async_trait]
159impl<T: Task + Sized> Task for CancelableLoop<T> {
160    type Output = ();
161    const VARIANT: TaskVariant = T::VARIANT;
162
163    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
164        loop {
165            match self.inner.run_once().await {
166                Ok(CancelableResult::Running(_)) => {
167                    tokio::task::yield_now().await;
168                    continue;
169                }
170                Ok(CancelableResult::Cancelled) => return Ok(()),
171                Err(e) => return Err(e),
172            }
173        }
174    }
175}
176
/// A trait for tasks that can handle errors, required for BufferedErrors.
/// Tasks that want to use `into_buffered_errors()` must implement this trait
/// to define how they recover from errors.
#[async_trait]
pub trait RecoverableTask: Task + Send + Sync {
    /// Recover from an error by attempting to handle it.
    ///
    /// `error` is the error returned by the failed `run_once` call.
    /// Return `Ok(())` when the error was handled; the task then continues running
    /// as long as the error overflow limit is not reached. Returning `Err(...)`
    /// makes `BufferedErrors` retry the recovery until its attempt limit is hit.
    async fn recover_from_error(&mut self, error: &BridgeError) -> Result<(), BridgeError>;
}
186
/// A task wrapper that tolerates a bounded number of consecutive failures of
/// the inner task, attempting recovery after each failure.
#[derive(Debug)]
pub struct BufferedErrors<T: RecoverableTask + Sized>
where
    T::Output: Default,
{
    /// The wrapped task
    inner: T,
    /// Errors from consecutive failed runs; cleared when a run succeeds
    buffer: Vec<BridgeError>,
    /// Number of buffered consecutive errors at which the wrapper gives up
    /// and returns the collected errors
    error_overflow_limit: usize,
    /// Maximum number of recovery attempts made after a single failed run
    handle_error_attempts: usize,
    /// Time to sleep after a failed recovery attempt before the next one
    wait_between_recover_attempts: Duration,
}
198
199impl<T: RecoverableTask + Sized> BufferedErrors<T>
200where
201    T::Output: Default,
202{
203    pub fn new(
204        inner: T,
205        error_overflow_limit: usize,
206        handle_error_attempts: usize,
207        wait_between_recover_attempts: Duration,
208    ) -> Self {
209        Self {
210            inner,
211            buffer: Vec::new(),
212            error_overflow_limit,
213            handle_error_attempts,
214            wait_between_recover_attempts,
215        }
216    }
217}
218
219#[async_trait]
220impl<T: RecoverableTask + Task + Sized + std::fmt::Debug> Task for BufferedErrors<T>
221where
222    T: Send,
223    T::Output: Default,
224{
225    type Output = T::Output;
226    const VARIANT: TaskVariant = T::VARIANT;
227
228    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
229        let result = self.inner.run_once().await;
230
231        match result {
232            Ok(output) => {
233                self.buffer.clear(); // clear buffer on first success
234                Ok(output)
235            }
236            Err(e) => {
237                tracing::error!(
238                    "Task {:?} error, attempting to recover: {e:?}",
239                    Self::VARIANT
240                );
241                // handle the error
242                for attempt in 1..=self.handle_error_attempts {
243                    let result = self.inner.recover_from_error(&e).await;
244                    match result {
245                        Ok(()) => break,
246                        Err(e) => {
247                            tracing::error!(
248                                "Task {:?} error, failed to recover (attempt {attempt}): {e:?}",
249                                Self::VARIANT,
250                            );
251                            if attempt == self.handle_error_attempts {
252                                // this will only close the task thread
253                                return Err(eyre::eyre!(
254                                    "Failed to recover from task {:?} error after {attempt} attempts, aborting...",
255                                    Self::VARIANT
256                                ).into());
257                            }
258                            // wait for the configured duration (self.wait_between_recover_attempts) before trying again
259                            tokio::time::sleep(self.wait_between_recover_attempts).await;
260                        }
261                    }
262                }
263                self.buffer.push(e);
264                if self.buffer.len() >= self.error_overflow_limit {
265                    let mut base_error: eyre::Report =
266                        self.buffer.pop().expect("just inserted above").into();
267
268                    for error in std::mem::take(&mut self.buffer) {
269                        base_error = base_error.wrap_err(error);
270                    }
271
272                    base_error = base_error.wrap_err(format!(
273                        "Exiting due to {} consecutive errors, the following chain is the list of errors.",
274                        self.error_overflow_limit
275                    ));
276
277                    Err(base_error.into())
278                } else {
279                    Ok(Default::default())
280                }
281            }
282        }
283    }
284}
285
/// A task that post-processes the successful output of an inner task with a
/// function; errors from the inner task are passed through unchanged.
#[derive(Debug)]
pub struct Map<T: Task + Sized, F: Fn(T::Output) -> T::Output + Send + Sync + 'static> {
    /// The task whose output is mapped
    inner: T,
    /// The function applied to the inner task's `Ok` output
    map: F,
}
291
292#[async_trait]
293impl<T: Task + Sized, F: Fn(T::Output) -> T::Output + Send + Sync + 'static> Task for Map<T, F> {
294    type Output = T::Output;
295    const VARIANT: TaskVariant = T::VARIANT;
296
297    #[track_caller]
298    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
299        let result = self.inner.run_once().await;
300        let output = match result {
301            Ok(output) => (self.map)(output),
302            Err(e) => return Err(e),
303        };
304        Ok(output)
305    }
306}
307
/// A task that ignores errors from the inner task and returns a default value.
///
/// Errors are logged before being suppressed.
#[derive(Debug)]
pub struct IgnoreError<T: Task + Sized>
where
    T::Output: Default,
{
    /// The task whose errors are suppressed
    inner: T,
}
316
317#[async_trait]
318impl<T: Task + Sized + std::fmt::Debug> Task for IgnoreError<T>
319where
320    T::Output: Default,
321{
322    type Output = T::Output;
323    const VARIANT: TaskVariant = T::VARIANT;
324
325    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
326        Ok(self
327            .inner
328            .run_once()
329            .await
330            .inspect_err(|e| {
331                tracing::error!(task=?self.inner, "Task error, suppressing due to errors ignored: {e:?}");
332            })
333            .ok()
334            .unwrap_or_default())
335    }
336}
337
/// Extension methods for composing tasks out of wrappers (cancellation, delays,
/// error handling, output mapping) and for spawning them in the background.
pub trait TaskExt: Task + Sized {
    /// Skips running the task after cancellation using the sender.
    ///
    /// Returns the wrapped task together with the sender used to cancel it.
    fn cancelable(self) -> (CancelableTask<Self>, oneshot::Sender<()>);

    /// Runs the task in an infinite loop until cancelled using the sender.
    ///
    /// The loop also stops when a run of the task returns an error.
    fn cancelable_loop(self) -> (CancelableLoop<Self>, oneshot::Sender<()>);

    /// Adds the given delay after a run of the task when the task returns false.
    fn with_delay(self, poll_delay: Duration) -> WithDelay<Self>
    where
        Self::Output: Into<bool>;

    /// Spawns a [`tokio::task`] that runs the task once in the background.
    fn into_bg(self) -> JoinHandle<Result<Self::Output, BridgeError>>;

    /// Buffers consecutive errors until the task succeeds, emits all errors once
    /// `error_overflow_limit` consecutive errors have accumulated.
    /// If the task fails, error will be tried to be handled up to `handle_error_attempts` times.
    /// After each failed attempt, the task will wait for `wait_between_recover_attempts`
    /// before trying again.
    fn into_buffered_errors(
        self,
        error_overflow_limit: usize,
        handle_error_attempts: usize,
        wait_between_recover_attempts: Duration,
    ) -> BufferedErrors<Self>
    where
        Self: RecoverableTask,
        Self::Output: Default;

    /// Maps the task's `Ok()` output using the given function.
    fn map<F: Fn(Self::Output) -> Self::Output + Send + Sync + 'static>(
        self,
        map: F,
    ) -> Map<Self, F>;

    /// Ignores errors from the task, returning the default output instead.
    fn ignore_error(self) -> IgnoreError<Self>
    where
        Self::Output: Default;
}
378
379impl<T: Task + Sized> TaskExt for T {
380    fn cancelable(self) -> (CancelableTask<Self>, oneshot::Sender<()>) {
381        let (cancel_tx, cancel_rx) = oneshot::channel();
382        (CancelableTask::new(self, cancel_rx), cancel_tx)
383    }
384
385    fn cancelable_loop(self) -> (CancelableLoop<Self>, oneshot::Sender<()>) {
386        let (task, cancel_tx) = self.cancelable();
387        (CancelableLoop { inner: task }, cancel_tx)
388    }
389
390    fn with_delay(self, poll_delay: Duration) -> WithDelay<Self>
391    where
392        Self::Output: Into<bool>,
393    {
394        WithDelay::new(self, poll_delay)
395    }
396
397    fn into_bg(mut self) -> JoinHandle<Result<Self::Output, BridgeError>> {
398        tokio::spawn(async move {
399            tracing::debug!(
400                "Running task {:?} with ID {:?}",
401                Self::VARIANT,
402                task::try_id()
403            );
404            self.run_once().await
405        })
406    }
407
408    fn into_buffered_errors(
409        self,
410        error_overflow_limit: usize,
411        handle_error_attempts: usize,
412        wait_between_recover_attempts: Duration,
413    ) -> BufferedErrors<Self>
414    where
415        Self: RecoverableTask,
416        Self::Output: Default,
417    {
418        BufferedErrors::new(
419            self,
420            error_overflow_limit,
421            handle_error_attempts,
422            wait_between_recover_attempts,
423        )
424    }
425
426    fn map<F: Fn(Self::Output) -> Self::Output + Send + Sync + 'static>(
427        self,
428        map: F,
429    ) -> Map<Self, F> {
430        Map { inner: self, map }
431    }
432
433    fn ignore_error(self) -> IgnoreError<Self>
434    where
435        Self::Output: Default,
436    {
437        IgnoreError { inner: self }
438    }
439}
440
441#[cfg(test)]
442mod tests;