// clementine_core/task/mod.rs

1use std::time::Duration;
2use tokio::sync::oneshot;
3use tokio::sync::oneshot::error::TryRecvError;
4use tokio::task::{self, JoinHandle};
5use tokio::time::sleep;
6use tonic::async_trait;
7
8use clementine_errors::BridgeError;
9
10pub mod aggregator_metric_publisher;
11pub mod entity_metric_publisher;
12pub mod lcp_syncer;
13pub mod manager;
14pub mod payout_checker;
15pub mod status_monitor;
16#[cfg(feature = "automation")]
17pub mod tx_sender;
18
/// Identifies a kind of background task.
///
/// Used to identify tasks in the status monitor. `BackgroundTaskManager` also uses it to
/// avoid starting the same task twice. Add a new variant for every task that should be
/// tracked by the status monitor.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskVariant {
    PayoutChecker,
    StateManager,
    FinalizedBlockFetcher,
    LcpSyncer,
    TxSender,
    BitcoinSyncer,
    TaskStatusMonitor,
    /// Test-only variant.
    #[cfg(test)]
    Counter,
    /// Test-only variant.
    #[cfg(test)]
    Sleep,
    /// Used to publish metrics to Prometheus periodically.
    MetricPublisher,
}
38
39/// Task trait defining the core behavior for cancelable background tasks
40///
41/// This trait is implemented by any struct that needs to run as a background task.
42/// The run_once method contains the main logic of the task, and returns a bool
43/// indicating whether it did work (true) or needs to wait (false).
44#[async_trait]
45pub trait Task: Send + Sync + 'static {
46    /// The output of the fn run_once
47    type Output: Send + Sync + 'static + Sized;
48    /// The variant of the task
49    const VARIANT: TaskVariant;
50    /// Run the task once, returning whether work was done
51    ///
52    /// Returns:
53    /// - `Ok(true)` if the task did some work and is ready to run again immediately
54    /// - `Ok(false)` if the task did not do work and should wait before running again
55    /// - `Err(...)` if the task encountered an error
56    async fn run_once(&mut self) -> Result<Self::Output, BridgeError>;
57}
58
59/// A trait for objects that can be converted into a Task
60pub trait IntoTask {
61    type Task: Task;
62
63    /// Convert self into a Task
64    fn into_task(self) -> Self::Task;
65}
66
67impl<T: Task> IntoTask for T {
68    type Task = T;
69
70    fn into_task(self) -> Self::Task {
71        self
72    }
73}
74
75/// A task that adds a certain delay after the inner task has run
76/// to reduce polling frequency. When inner returns false, the delay is applied.
77#[derive(Debug)]
78pub struct WithDelay<T: Task>
79where
80    T::Output: Into<bool>,
81{
82    /// The task to poll
83    inner: T,
84    /// The interval between polls when no work is done
85    poll_delay: Duration,
86}
87
88impl<T: Task> WithDelay<T>
89where
90    T::Output: Into<bool>,
91{
92    /// Create a new delayed task
93    pub fn new(inner: T, poll_delay: Duration) -> Self {
94        Self { inner, poll_delay }
95    }
96}
97
98#[async_trait]
99impl<T: Task> Task for WithDelay<T>
100where
101    T::Output: Into<bool>,
102{
103    type Output = bool;
104    const VARIANT: TaskVariant = T::VARIANT;
105    async fn run_once(&mut self) -> Result<bool, BridgeError> {
106        // Run the inner task
107        let did_work = self.inner.run_once().await?.into();
108
109        // If the inner task did not do work, sleep for the poll delay
110        if !did_work {
111            sleep(self.poll_delay).await;
112        }
113
114        // Always return false since we've handled the waiting internally
115        Ok(false)
116    }
117}
118
119/// A task that can be canceled via a oneshot channel
120#[derive(Debug)]
121pub struct CancelableTask<T: Task> {
122    /// The task to run
123    inner: T,
124    /// Receiver for cancellation signal
125    cancel_rx: oneshot::Receiver<()>,
126}
127
128impl<T: Task> CancelableTask<T> {
129    /// Create a new cancelable task with a cancellation channel
130    pub fn new(inner: T, cancel_rx: oneshot::Receiver<()>) -> Self {
131        Self { inner, cancel_rx }
132    }
133}
134
135#[derive(Debug, Clone)]
136pub enum CancelableResult<T> {
137    Running(T),
138    Cancelled,
139}
140
141#[async_trait]
142impl<T: Task> Task for CancelableTask<T> {
143    type Output = CancelableResult<T::Output>;
144    const VARIANT: TaskVariant = T::VARIANT;
145
146    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
147        // Check if we've been canceled
148        if let Err(TryRecvError::Empty) = self.cancel_rx.try_recv() {
149            // Run the inner task
150            Ok(CancelableResult::Running(self.inner.run_once().await?))
151        } else {
152            Ok(CancelableResult::Cancelled)
153        }
154    }
155}
156
157#[derive(Debug)]
158pub struct CancelableLoop<T: Task + Sized> {
159    inner: CancelableTask<T>,
160}
161
162#[async_trait]
163impl<T: Task + Sized> Task for CancelableLoop<T> {
164    type Output = ();
165    const VARIANT: TaskVariant = T::VARIANT;
166
167    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
168        loop {
169            match self.inner.run_once().await {
170                Ok(CancelableResult::Running(_)) => {
171                    tokio::task::yield_now().await;
172                    continue;
173                }
174                Ok(CancelableResult::Cancelled) => return Ok(()),
175                Err(e) => return Err(e),
176            }
177        }
178    }
179}
180
181/// A trait for tasks that can handle errors, required for BufferedErrors.
182/// Tasks that want to use `into_buffered_errors()` must implement this trait
183/// to define how they recover from errors.
184#[async_trait]
185pub trait RecoverableTask: Task + Send + Sync {
186    /// Recover from an error by attempting to handle it.
187    /// If the error is handled, the task will continue running if error overflow limit is not reached.
188    async fn recover_from_error(&mut self, error: &BridgeError) -> Result<(), BridgeError>;
189}
190
191#[derive(Debug)]
192pub struct BufferedErrors<T: RecoverableTask + Sized>
193where
194    T::Output: Default,
195{
196    inner: T,
197    buffer: Vec<BridgeError>,
198    error_overflow_limit: usize,
199    handle_error_attempts: usize,
200    wait_between_recover_attempts: Duration,
201}
202
203impl<T: RecoverableTask + Sized> BufferedErrors<T>
204where
205    T::Output: Default,
206{
207    pub fn new(
208        inner: T,
209        error_overflow_limit: usize,
210        handle_error_attempts: usize,
211        wait_between_recover_attempts: Duration,
212    ) -> Self {
213        Self {
214            inner,
215            buffer: Vec::new(),
216            error_overflow_limit,
217            handle_error_attempts,
218            wait_between_recover_attempts,
219        }
220    }
221}
222
223#[async_trait]
224impl<T: RecoverableTask + Task + Sized + std::fmt::Debug> Task for BufferedErrors<T>
225where
226    T: Send,
227    T::Output: Default,
228{
229    type Output = T::Output;
230    const VARIANT: TaskVariant = T::VARIANT;
231
232    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
233        let result = self.inner.run_once().await;
234
235        match result {
236            Ok(output) => {
237                self.buffer.clear(); // clear buffer on first success
238                Ok(output)
239            }
240            Err(e) => {
241                tracing::error!(
242                    "Task {:?} error, attempting to recover: {e:?}",
243                    Self::VARIANT
244                );
245                // handle the error
246                for attempt in 1..=self.handle_error_attempts {
247                    let result = self.inner.recover_from_error(&e).await;
248                    match result {
249                        Ok(()) => break,
250                        Err(e) => {
251                            tracing::error!(
252                                "Task {:?} error, failed to recover (attempt {attempt}): {e:?}",
253                                Self::VARIANT,
254                            );
255                            if attempt == self.handle_error_attempts {
256                                // this will only close the task thread
257                                return Err(eyre::eyre!(
258                                    "Failed to recover from task {:?} error after {attempt} attempts, aborting...",
259                                    Self::VARIANT
260                                ).into());
261                            }
262                            // wait for the configured duration (self.wait_between_recover_attempts) before trying again
263                            tokio::time::sleep(self.wait_between_recover_attempts).await;
264                        }
265                    }
266                }
267                self.buffer.push(e);
268                if self.buffer.len() >= self.error_overflow_limit {
269                    let mut base_error: eyre::Report =
270                        self.buffer.pop().expect("just inserted above").into();
271
272                    for error in std::mem::take(&mut self.buffer) {
273                        base_error = base_error.wrap_err(error);
274                    }
275
276                    base_error = base_error.wrap_err(format!(
277                        "Exiting due to {} consecutive errors, the following chain is the list of errors.",
278                        self.error_overflow_limit
279                    ));
280
281                    Err(base_error.into())
282                } else {
283                    Ok(Default::default())
284                }
285            }
286        }
287    }
288}
289
290/// A task that ignores errors from the inner task and returns a default value.
291#[derive(Debug)]
292pub struct IgnoreError<T: Task + Sized>
293where
294    T::Output: Default,
295{
296    inner: T,
297}
298
299#[async_trait]
300impl<T: Task + Sized + std::fmt::Debug> Task for IgnoreError<T>
301where
302    T::Output: Default,
303{
304    type Output = T::Output;
305    const VARIANT: TaskVariant = T::VARIANT;
306
307    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
308        Ok(self
309            .inner
310            .run_once()
311            .await
312            .inspect_err(|e| {
313                tracing::error!(task=?self.inner, "Task error, suppressing due to errors ignored: {e:?}");
314            })
315            .ok()
316            .unwrap_or_default())
317    }
318}
319
320pub trait TaskExt: Task + Sized {
321    /// Skips running the task after cancellation using the sender.
322    fn cancelable(self) -> (CancelableTask<Self>, oneshot::Sender<()>);
323
324    /// Runs the task in an infinite loop until cancelled using the sender.
325    fn cancelable_loop(self) -> (CancelableLoop<Self>, oneshot::Sender<()>);
326
327    /// Adds the given delay after a run of the task when the task returns false.
328    fn with_delay(self, poll_delay: Duration) -> WithDelay<Self>
329    where
330        Self::Output: Into<bool>;
331
332    /// Spawns a [`tokio::task`] that runs the task once in the background.
333    fn into_bg(self) -> JoinHandle<Result<Self::Output, BridgeError>>;
334
335    /// Buffers consecutive errors until the task succeeds, emits all errors when there are
336    /// more than `error_overflow_limit` consecutive errors.
337    /// If the task fails, error will be tried to be handled up to `handle_error_attempts` times.
338    /// After each attempt, the task will wait for `wait_between_recover_attempts` before trying again.
339    fn into_buffered_errors(
340        self,
341        error_overflow_limit: usize,
342        handle_error_attempts: usize,
343        wait_between_recover_attempts: Duration,
344    ) -> BufferedErrors<Self>
345    where
346        Self: RecoverableTask,
347        Self::Output: Default;
348
349    /// Ignores errors from the task.
350    fn ignore_error(self) -> IgnoreError<Self>
351    where
352        Self::Output: Default;
353}
354
355impl<T: Task + Sized> TaskExt for T {
356    fn cancelable(self) -> (CancelableTask<Self>, oneshot::Sender<()>) {
357        let (cancel_tx, cancel_rx) = oneshot::channel();
358        (CancelableTask::new(self, cancel_rx), cancel_tx)
359    }
360
361    fn cancelable_loop(self) -> (CancelableLoop<Self>, oneshot::Sender<()>) {
362        let (task, cancel_tx) = self.cancelable();
363        (CancelableLoop { inner: task }, cancel_tx)
364    }
365
366    fn with_delay(self, poll_delay: Duration) -> WithDelay<Self>
367    where
368        Self::Output: Into<bool>,
369    {
370        WithDelay::new(self, poll_delay)
371    }
372
373    fn into_bg(mut self) -> JoinHandle<Result<Self::Output, BridgeError>> {
374        tokio::spawn(async move {
375            tracing::debug!(
376                "Running task {:?} with ID {:?}",
377                Self::VARIANT,
378                task::try_id()
379            );
380            self.run_once().await
381        })
382    }
383
384    fn into_buffered_errors(
385        self,
386        error_overflow_limit: usize,
387        handle_error_attempts: usize,
388        wait_between_recover_attempts: Duration,
389    ) -> BufferedErrors<Self>
390    where
391        Self: RecoverableTask,
392        Self::Output: Default,
393    {
394        BufferedErrors::new(
395            self,
396            error_overflow_limit,
397            handle_error_attempts,
398            wait_between_recover_attempts,
399        )
400    }
401
402    fn ignore_error(self) -> IgnoreError<Self>
403    where
404        Self::Output: Default,
405    {
406        IgnoreError { inner: self }
407    }
408}
409
410#[cfg(test)]
411mod tests;