clementine_core/task/
mod.rs

1use std::time::Duration;
2use tokio::sync::oneshot;
3use tokio::sync::oneshot::error::TryRecvError;
4use tokio::task::{self, JoinHandle};
5use tokio::time::sleep;
6use tonic::async_trait;
7
8use clementine_errors::BridgeError;
9
10pub mod aggregator_metric_publisher;
11pub mod entity_metric_publisher;
12pub mod manager;
13pub mod payout_checker;
14pub mod status_monitor;
15pub mod tx_sender;
16
/// Identifies a kind of background task.
///
/// The status monitor uses the variant to identify tasks, and `BackgroundTaskManager`
/// uses it to avoid starting the same task twice. Add a new variant for every task
/// that should be tracked by the status monitor.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskVariant {
    PayoutChecker,
    StateManager,
    FinalizedBlockFetcher,
    TxSender,
    BitcoinSyncer,
    TaskStatusMonitor,
    /// Test-only variant (compiled only with `cfg(test)`).
    #[cfg(test)]
    Counter,
    /// Test-only variant (compiled only with `cfg(test)`).
    #[cfg(test)]
    Sleep,
    /// Used to publish metrics to Prometheus periodically.
    MetricPublisher,
}
35
36/// Task trait defining the core behavior for cancelable background tasks
37///
38/// This trait is implemented by any struct that needs to run as a background task.
39/// The run_once method contains the main logic of the task, and returns a bool
40/// indicating whether it did work (true) or needs to wait (false).
41#[async_trait]
42pub trait Task: Send + Sync + 'static {
43    /// The output of the fn run_once
44    type Output: Send + Sync + 'static + Sized;
45    /// The variant of the task
46    const VARIANT: TaskVariant;
47    /// Run the task once, returning whether work was done
48    ///
49    /// Returns:
50    /// - `Ok(true)` if the task did some work and is ready to run again immediately
51    /// - `Ok(false)` if the task did not do work and should wait before running again
52    /// - `Err(...)` if the task encountered an error
53    async fn run_once(&mut self) -> Result<Self::Output, BridgeError>;
54}
55
56/// A trait for objects that can be converted into a Task
57pub trait IntoTask {
58    type Task: Task;
59
60    /// Convert self into a Task
61    fn into_task(self) -> Self::Task;
62}
63
64impl<T: Task> IntoTask for T {
65    type Task = T;
66
67    fn into_task(self) -> Self::Task {
68        self
69    }
70}
71
72/// A task that adds a certain delay after the inner task has run
73/// to reduce polling frequency. When inner returns false, the delay is applied.
74#[derive(Debug)]
75pub struct WithDelay<T: Task>
76where
77    T::Output: Into<bool>,
78{
79    /// The task to poll
80    inner: T,
81    /// The interval between polls when no work is done
82    poll_delay: Duration,
83}
84
85impl<T: Task> WithDelay<T>
86where
87    T::Output: Into<bool>,
88{
89    /// Create a new delayed task
90    pub fn new(inner: T, poll_delay: Duration) -> Self {
91        Self { inner, poll_delay }
92    }
93}
94
95#[async_trait]
96impl<T: Task> Task for WithDelay<T>
97where
98    T::Output: Into<bool>,
99{
100    type Output = bool;
101    const VARIANT: TaskVariant = T::VARIANT;
102    async fn run_once(&mut self) -> Result<bool, BridgeError> {
103        // Run the inner task
104        let did_work = self.inner.run_once().await?.into();
105
106        // If the inner task did not do work, sleep for the poll delay
107        if !did_work {
108            sleep(self.poll_delay).await;
109        }
110
111        // Always return false since we've handled the waiting internally
112        Ok(false)
113    }
114}
115
116/// A task that can be canceled via a oneshot channel
117#[derive(Debug)]
118pub struct CancelableTask<T: Task> {
119    /// The task to run
120    inner: T,
121    /// Receiver for cancellation signal
122    cancel_rx: oneshot::Receiver<()>,
123}
124
125impl<T: Task> CancelableTask<T> {
126    /// Create a new cancelable task with a cancellation channel
127    pub fn new(inner: T, cancel_rx: oneshot::Receiver<()>) -> Self {
128        Self { inner, cancel_rx }
129    }
130}
131
132#[derive(Debug, Clone)]
133pub enum CancelableResult<T> {
134    Running(T),
135    Cancelled,
136}
137
138#[async_trait]
139impl<T: Task> Task for CancelableTask<T> {
140    type Output = CancelableResult<T::Output>;
141    const VARIANT: TaskVariant = T::VARIANT;
142
143    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
144        // Check if we've been canceled
145        if let Err(TryRecvError::Empty) = self.cancel_rx.try_recv() {
146            // Run the inner task
147            Ok(CancelableResult::Running(self.inner.run_once().await?))
148        } else {
149            Ok(CancelableResult::Cancelled)
150        }
151    }
152}
153
154#[derive(Debug)]
155pub struct CancelableLoop<T: Task + Sized> {
156    inner: CancelableTask<T>,
157}
158
159#[async_trait]
160impl<T: Task + Sized> Task for CancelableLoop<T> {
161    type Output = ();
162    const VARIANT: TaskVariant = T::VARIANT;
163
164    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
165        loop {
166            match self.inner.run_once().await {
167                Ok(CancelableResult::Running(_)) => {
168                    tokio::task::yield_now().await;
169                    continue;
170                }
171                Ok(CancelableResult::Cancelled) => return Ok(()),
172                Err(e) => return Err(e),
173            }
174        }
175    }
176}
177
178/// A trait for tasks that can handle errors, required for BufferedErrors.
179/// Tasks that want to use `into_buffered_errors()` must implement this trait
180/// to define how they recover from errors.
181#[async_trait]
182pub trait RecoverableTask: Task + Send + Sync {
183    /// Recover from an error by attempting to handle it.
184    /// If the error is handled, the task will continue running if error overflow limit is not reached.
185    async fn recover_from_error(&mut self, error: &BridgeError) -> Result<(), BridgeError>;
186}
187
188#[derive(Debug)]
189pub struct BufferedErrors<T: RecoverableTask + Sized>
190where
191    T::Output: Default,
192{
193    inner: T,
194    buffer: Vec<BridgeError>,
195    error_overflow_limit: usize,
196    handle_error_attempts: usize,
197    wait_between_recover_attempts: Duration,
198}
199
200impl<T: RecoverableTask + Sized> BufferedErrors<T>
201where
202    T::Output: Default,
203{
204    pub fn new(
205        inner: T,
206        error_overflow_limit: usize,
207        handle_error_attempts: usize,
208        wait_between_recover_attempts: Duration,
209    ) -> Self {
210        Self {
211            inner,
212            buffer: Vec::new(),
213            error_overflow_limit,
214            handle_error_attempts,
215            wait_between_recover_attempts,
216        }
217    }
218}
219
220#[async_trait]
221impl<T: RecoverableTask + Task + Sized + std::fmt::Debug> Task for BufferedErrors<T>
222where
223    T: Send,
224    T::Output: Default,
225{
226    type Output = T::Output;
227    const VARIANT: TaskVariant = T::VARIANT;
228
229    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
230        let result = self.inner.run_once().await;
231
232        match result {
233            Ok(output) => {
234                self.buffer.clear(); // clear buffer on first success
235                Ok(output)
236            }
237            Err(e) => {
238                tracing::error!(
239                    "Task {:?} error, attempting to recover: {e:?}",
240                    Self::VARIANT
241                );
242                // handle the error
243                for attempt in 1..=self.handle_error_attempts {
244                    let result = self.inner.recover_from_error(&e).await;
245                    match result {
246                        Ok(()) => break,
247                        Err(e) => {
248                            tracing::error!(
249                                "Task {:?} error, failed to recover (attempt {attempt}): {e:?}",
250                                Self::VARIANT,
251                            );
252                            if attempt == self.handle_error_attempts {
253                                // this will only close the task thread
254                                return Err(eyre::eyre!(
255                                    "Failed to recover from task {:?} error after {attempt} attempts, aborting...",
256                                    Self::VARIANT
257                                ).into());
258                            }
259                            // wait for the configured duration (self.wait_between_recover_attempts) before trying again
260                            tokio::time::sleep(self.wait_between_recover_attempts).await;
261                        }
262                    }
263                }
264                self.buffer.push(e);
265                if self.buffer.len() >= self.error_overflow_limit {
266                    let mut base_error: eyre::Report =
267                        self.buffer.pop().expect("just inserted above").into();
268
269                    for error in std::mem::take(&mut self.buffer) {
270                        base_error = base_error.wrap_err(error);
271                    }
272
273                    base_error = base_error.wrap_err(format!(
274                        "Exiting due to {} consecutive errors, the following chain is the list of errors.",
275                        self.error_overflow_limit
276                    ));
277
278                    Err(base_error.into())
279                } else {
280                    Ok(Default::default())
281                }
282            }
283        }
284    }
285}
286
287/// A task that ignores errors from the inner task and returns a default value.
288#[derive(Debug)]
289pub struct IgnoreError<T: Task + Sized>
290where
291    T::Output: Default,
292{
293    inner: T,
294}
295
296#[async_trait]
297impl<T: Task + Sized + std::fmt::Debug> Task for IgnoreError<T>
298where
299    T::Output: Default,
300{
301    type Output = T::Output;
302    const VARIANT: TaskVariant = T::VARIANT;
303
304    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
305        Ok(self
306            .inner
307            .run_once()
308            .await
309            .inspect_err(|e| {
310                tracing::error!(task=?self.inner, "Task error, suppressing due to errors ignored: {e:?}");
311            })
312            .ok()
313            .unwrap_or_default())
314    }
315}
316
317pub trait TaskExt: Task + Sized {
318    /// Skips running the task after cancellation using the sender.
319    fn cancelable(self) -> (CancelableTask<Self>, oneshot::Sender<()>);
320
321    /// Runs the task in an infinite loop until cancelled using the sender.
322    fn cancelable_loop(self) -> (CancelableLoop<Self>, oneshot::Sender<()>);
323
324    /// Adds the given delay after a run of the task when the task returns false.
325    fn with_delay(self, poll_delay: Duration) -> WithDelay<Self>
326    where
327        Self::Output: Into<bool>;
328
329    /// Spawns a [`tokio::task`] that runs the task once in the background.
330    fn into_bg(self) -> JoinHandle<Result<Self::Output, BridgeError>>;
331
332    /// Buffers consecutive errors until the task succeeds, emits all errors when there are
333    /// more than `error_overflow_limit` consecutive errors.
334    /// If the task fails, error will be tried to be handled up to `handle_error_attempts` times.
335    /// After each attempt, the task will wait for `wait_between_recover_attempts` before trying again.
336    fn into_buffered_errors(
337        self,
338        error_overflow_limit: usize,
339        handle_error_attempts: usize,
340        wait_between_recover_attempts: Duration,
341    ) -> BufferedErrors<Self>
342    where
343        Self: RecoverableTask,
344        Self::Output: Default;
345
346    /// Ignores errors from the task.
347    fn ignore_error(self) -> IgnoreError<Self>
348    where
349        Self::Output: Default;
350}
351
352impl<T: Task + Sized> TaskExt for T {
353    fn cancelable(self) -> (CancelableTask<Self>, oneshot::Sender<()>) {
354        let (cancel_tx, cancel_rx) = oneshot::channel();
355        (CancelableTask::new(self, cancel_rx), cancel_tx)
356    }
357
358    fn cancelable_loop(self) -> (CancelableLoop<Self>, oneshot::Sender<()>) {
359        let (task, cancel_tx) = self.cancelable();
360        (CancelableLoop { inner: task }, cancel_tx)
361    }
362
363    fn with_delay(self, poll_delay: Duration) -> WithDelay<Self>
364    where
365        Self::Output: Into<bool>,
366    {
367        WithDelay::new(self, poll_delay)
368    }
369
370    fn into_bg(mut self) -> JoinHandle<Result<Self::Output, BridgeError>> {
371        tokio::spawn(async move {
372            tracing::debug!(
373                "Running task {:?} with ID {:?}",
374                Self::VARIANT,
375                task::try_id()
376            );
377            self.run_once().await
378        })
379    }
380
381    fn into_buffered_errors(
382        self,
383        error_overflow_limit: usize,
384        handle_error_attempts: usize,
385        wait_between_recover_attempts: Duration,
386    ) -> BufferedErrors<Self>
387    where
388        Self: RecoverableTask,
389        Self::Output: Default,
390    {
391        BufferedErrors::new(
392            self,
393            error_overflow_limit,
394            handle_error_attempts,
395            wait_between_recover_attempts,
396        )
397    }
398
399    fn ignore_error(self) -> IgnoreError<Self>
400    where
401        Self::Output: Default,
402    {
403        IgnoreError { inner: self }
404    }
405}
406
407#[cfg(test)]
408mod tests;