1use std::time::Duration;
2use tokio::sync::oneshot;
3use tokio::sync::oneshot::error::TryRecvError;
4use tokio::task::{self, JoinHandle};
5use tokio::time::sleep;
6use tonic::async_trait;
7
8use clementine_errors::BridgeError;
9
10pub mod aggregator_metric_publisher;
11pub mod entity_metric_publisher;
12pub mod manager;
13pub mod payout_checker;
14pub mod status_monitor;
15pub mod tx_sender;
16
/// Identifies the kind of a task.
///
/// Every [`Task`] implementation exposes one of these values through its
/// `VARIANT` associated constant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TaskVariant {
    PayoutChecker,
    StateManager,
    FinalizedBlockFetcher,
    TxSender,
    BitcoinSyncer,
    TaskStatusMonitor,
    /// Only compiled in test builds.
    #[cfg(test)]
    Counter,
    /// Only compiled in test builds.
    #[cfg(test)]
    Sleep,
    MetricPublisher,
}
35
36#[async_trait]
42pub trait Task: Send + Sync + 'static {
43 type Output: Send + Sync + 'static + Sized;
45 const VARIANT: TaskVariant;
47 async fn run_once(&mut self) -> Result<Self::Output, BridgeError>;
54}
55
56pub trait IntoTask {
58 type Task: Task;
59
60 fn into_task(self) -> Self::Task;
62}
63
64impl<T: Task> IntoTask for T {
65 type Task = T;
66
67 fn into_task(self) -> Self::Task {
68 self
69 }
70}
71
72#[derive(Debug)]
75pub struct WithDelay<T: Task>
76where
77 T::Output: Into<bool>,
78{
79 inner: T,
81 poll_delay: Duration,
83}
84
85impl<T: Task> WithDelay<T>
86where
87 T::Output: Into<bool>,
88{
89 pub fn new(inner: T, poll_delay: Duration) -> Self {
91 Self { inner, poll_delay }
92 }
93}
94
95#[async_trait]
96impl<T: Task> Task for WithDelay<T>
97where
98 T::Output: Into<bool>,
99{
100 type Output = bool;
101 const VARIANT: TaskVariant = T::VARIANT;
102 async fn run_once(&mut self) -> Result<bool, BridgeError> {
103 let did_work = self.inner.run_once().await?.into();
105
106 if !did_work {
108 sleep(self.poll_delay).await;
109 }
110
111 Ok(false)
113 }
114}
115
116#[derive(Debug)]
118pub struct CancelableTask<T: Task> {
119 inner: T,
121 cancel_rx: oneshot::Receiver<()>,
123}
124
125impl<T: Task> CancelableTask<T> {
126 pub fn new(inner: T, cancel_rx: oneshot::Receiver<()>) -> Self {
128 Self { inner, cancel_rx }
129 }
130}
131
/// Outcome of one invocation of a cancelable task.
#[derive(Debug, Clone)]
pub enum CancelableResult<T> {
    /// The inner task ran; carries its output.
    Running(T),
    /// The task was cancelled and the inner task was not run.
    Cancelled,
}
137
138#[async_trait]
139impl<T: Task> Task for CancelableTask<T> {
140 type Output = CancelableResult<T::Output>;
141 const VARIANT: TaskVariant = T::VARIANT;
142
143 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
144 if let Err(TryRecvError::Empty) = self.cancel_rx.try_recv() {
146 Ok(CancelableResult::Running(self.inner.run_once().await?))
148 } else {
149 Ok(CancelableResult::Cancelled)
150 }
151 }
152}
153
154#[derive(Debug)]
155pub struct CancelableLoop<T: Task + Sized> {
156 inner: CancelableTask<T>,
157}
158
159#[async_trait]
160impl<T: Task + Sized> Task for CancelableLoop<T> {
161 type Output = ();
162 const VARIANT: TaskVariant = T::VARIANT;
163
164 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
165 loop {
166 match self.inner.run_once().await {
167 Ok(CancelableResult::Running(_)) => {
168 tokio::task::yield_now().await;
169 continue;
170 }
171 Ok(CancelableResult::Cancelled) => return Ok(()),
172 Err(e) => return Err(e),
173 }
174 }
175 }
176}
177
178#[async_trait]
182pub trait RecoverableTask: Task + Send + Sync {
183 async fn recover_from_error(&mut self, error: &BridgeError) -> Result<(), BridgeError>;
186}
187
188#[derive(Debug)]
189pub struct BufferedErrors<T: RecoverableTask + Sized>
190where
191 T::Output: Default,
192{
193 inner: T,
194 buffer: Vec<BridgeError>,
195 error_overflow_limit: usize,
196 handle_error_attempts: usize,
197 wait_between_recover_attempts: Duration,
198}
199
200impl<T: RecoverableTask + Sized> BufferedErrors<T>
201where
202 T::Output: Default,
203{
204 pub fn new(
205 inner: T,
206 error_overflow_limit: usize,
207 handle_error_attempts: usize,
208 wait_between_recover_attempts: Duration,
209 ) -> Self {
210 Self {
211 inner,
212 buffer: Vec::new(),
213 error_overflow_limit,
214 handle_error_attempts,
215 wait_between_recover_attempts,
216 }
217 }
218}
219
220#[async_trait]
221impl<T: RecoverableTask + Task + Sized + std::fmt::Debug> Task for BufferedErrors<T>
222where
223 T: Send,
224 T::Output: Default,
225{
226 type Output = T::Output;
227 const VARIANT: TaskVariant = T::VARIANT;
228
229 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
230 let result = self.inner.run_once().await;
231
232 match result {
233 Ok(output) => {
234 self.buffer.clear(); Ok(output)
236 }
237 Err(e) => {
238 tracing::error!(
239 "Task {:?} error, attempting to recover: {e:?}",
240 Self::VARIANT
241 );
242 for attempt in 1..=self.handle_error_attempts {
244 let result = self.inner.recover_from_error(&e).await;
245 match result {
246 Ok(()) => break,
247 Err(e) => {
248 tracing::error!(
249 "Task {:?} error, failed to recover (attempt {attempt}): {e:?}",
250 Self::VARIANT,
251 );
252 if attempt == self.handle_error_attempts {
253 return Err(eyre::eyre!(
255 "Failed to recover from task {:?} error after {attempt} attempts, aborting...",
256 Self::VARIANT
257 ).into());
258 }
259 tokio::time::sleep(self.wait_between_recover_attempts).await;
261 }
262 }
263 }
264 self.buffer.push(e);
265 if self.buffer.len() >= self.error_overflow_limit {
266 let mut base_error: eyre::Report =
267 self.buffer.pop().expect("just inserted above").into();
268
269 for error in std::mem::take(&mut self.buffer) {
270 base_error = base_error.wrap_err(error);
271 }
272
273 base_error = base_error.wrap_err(format!(
274 "Exiting due to {} consecutive errors, the following chain is the list of errors.",
275 self.error_overflow_limit
276 ));
277
278 Err(base_error.into())
279 } else {
280 Ok(Default::default())
281 }
282 }
283 }
284 }
285}
286
287#[derive(Debug)]
289pub struct IgnoreError<T: Task + Sized>
290where
291 T::Output: Default,
292{
293 inner: T,
294}
295
296#[async_trait]
297impl<T: Task + Sized + std::fmt::Debug> Task for IgnoreError<T>
298where
299 T::Output: Default,
300{
301 type Output = T::Output;
302 const VARIANT: TaskVariant = T::VARIANT;
303
304 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
305 Ok(self
306 .inner
307 .run_once()
308 .await
309 .inspect_err(|e| {
310 tracing::error!(task=?self.inner, "Task error, suppressing due to errors ignored: {e:?}");
311 })
312 .ok()
313 .unwrap_or_default())
314 }
315}
316
317pub trait TaskExt: Task + Sized {
318 fn cancelable(self) -> (CancelableTask<Self>, oneshot::Sender<()>);
320
321 fn cancelable_loop(self) -> (CancelableLoop<Self>, oneshot::Sender<()>);
323
324 fn with_delay(self, poll_delay: Duration) -> WithDelay<Self>
326 where
327 Self::Output: Into<bool>;
328
329 fn into_bg(self) -> JoinHandle<Result<Self::Output, BridgeError>>;
331
332 fn into_buffered_errors(
337 self,
338 error_overflow_limit: usize,
339 handle_error_attempts: usize,
340 wait_between_recover_attempts: Duration,
341 ) -> BufferedErrors<Self>
342 where
343 Self: RecoverableTask,
344 Self::Output: Default;
345
346 fn ignore_error(self) -> IgnoreError<Self>
348 where
349 Self::Output: Default;
350}
351
352impl<T: Task + Sized> TaskExt for T {
353 fn cancelable(self) -> (CancelableTask<Self>, oneshot::Sender<()>) {
354 let (cancel_tx, cancel_rx) = oneshot::channel();
355 (CancelableTask::new(self, cancel_rx), cancel_tx)
356 }
357
358 fn cancelable_loop(self) -> (CancelableLoop<Self>, oneshot::Sender<()>) {
359 let (task, cancel_tx) = self.cancelable();
360 (CancelableLoop { inner: task }, cancel_tx)
361 }
362
363 fn with_delay(self, poll_delay: Duration) -> WithDelay<Self>
364 where
365 Self::Output: Into<bool>,
366 {
367 WithDelay::new(self, poll_delay)
368 }
369
370 fn into_bg(mut self) -> JoinHandle<Result<Self::Output, BridgeError>> {
371 tokio::spawn(async move {
372 tracing::debug!(
373 "Running task {:?} with ID {:?}",
374 Self::VARIANT,
375 task::try_id()
376 );
377 self.run_once().await
378 })
379 }
380
381 fn into_buffered_errors(
382 self,
383 error_overflow_limit: usize,
384 handle_error_attempts: usize,
385 wait_between_recover_attempts: Duration,
386 ) -> BufferedErrors<Self>
387 where
388 Self: RecoverableTask,
389 Self::Output: Default,
390 {
391 BufferedErrors::new(
392 self,
393 error_overflow_limit,
394 handle_error_attempts,
395 wait_between_recover_attempts,
396 )
397 }
398
399 fn ignore_error(self) -> IgnoreError<Self>
400 where
401 Self::Output: Default,
402 {
403 IgnoreError { inner: self }
404 }
405}
406
407#[cfg(test)]
408mod tests;