1use std::time::Duration;
2use tokio::sync::oneshot;
3use tokio::sync::oneshot::error::TryRecvError;
4use tokio::task::{self, JoinHandle};
5use tokio::time::sleep;
6use tonic::async_trait;
7
8use crate::errors::BridgeError;
9
10pub mod aggregator_metric_publisher;
11pub mod entity_metric_publisher;
12pub mod manager;
13pub mod payout_checker;
14pub mod status_monitor;
15
/// Identifies which kind of task a [`Task`] implementation is.
///
/// Each task type declares exactly one variant through [`Task::VARIANT`].
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum TaskVariant {
    PayoutChecker,
    StateManager,
    FinalizedBlockFetcher,
    TxSender,
    BitcoinSyncer,
    TaskStatusMonitor,
    /// Only compiled in test builds.
    #[cfg(test)]
    Counter,
    /// Only compiled in test builds.
    #[cfg(test)]
    Sleep,
    MetricPublisher,
}
34
35#[async_trait]
41pub trait Task: Send + Sync + 'static {
42 type Output: Send + Sync + 'static + Sized;
44 const VARIANT: TaskVariant;
46 async fn run_once(&mut self) -> Result<Self::Output, BridgeError>;
53}
54
55pub trait IntoTask {
57 type Task: Task;
58
59 fn into_task(self) -> Self::Task;
61}
62
63impl<T: Task> IntoTask for T {
64 type Task = T;
65
66 fn into_task(self) -> Self::Task {
67 self
68 }
69}
70
71#[derive(Debug)]
74pub struct WithDelay<T: Task>
75where
76 T::Output: Into<bool>,
77{
78 inner: T,
80 poll_delay: Duration,
82}
83
84impl<T: Task> WithDelay<T>
85where
86 T::Output: Into<bool>,
87{
88 pub fn new(inner: T, poll_delay: Duration) -> Self {
90 Self { inner, poll_delay }
91 }
92}
93
94#[async_trait]
95impl<T: Task> Task for WithDelay<T>
96where
97 T::Output: Into<bool>,
98{
99 type Output = bool;
100 const VARIANT: TaskVariant = T::VARIANT;
101 async fn run_once(&mut self) -> Result<bool, BridgeError> {
102 let did_work = self.inner.run_once().await?.into();
104
105 if !did_work {
107 sleep(self.poll_delay).await;
108 }
109
110 Ok(false)
112 }
113}
114
115#[derive(Debug)]
117pub struct CancelableTask<T: Task> {
118 inner: T,
120 cancel_rx: oneshot::Receiver<()>,
122}
123
124impl<T: Task> CancelableTask<T> {
125 pub fn new(inner: T, cancel_rx: oneshot::Receiver<()>) -> Self {
127 Self { inner, cancel_rx }
128 }
129}
130
/// Outcome of a single run of a cancelable task.
#[derive(Clone, Debug)]
pub enum CancelableResult<T> {
    /// The task ran; carries the value it produced.
    Running(T),
    /// The task did not run because it was cancelled.
    Cancelled,
}
136
137#[async_trait]
138impl<T: Task> Task for CancelableTask<T> {
139 type Output = CancelableResult<T::Output>;
140 const VARIANT: TaskVariant = T::VARIANT;
141
142 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
143 if let Err(TryRecvError::Empty) = self.cancel_rx.try_recv() {
145 Ok(CancelableResult::Running(self.inner.run_once().await?))
147 } else {
148 Ok(CancelableResult::Cancelled)
149 }
150 }
151}
152
153#[derive(Debug)]
154pub struct CancelableLoop<T: Task + Sized> {
155 inner: CancelableTask<T>,
156}
157
158#[async_trait]
159impl<T: Task + Sized> Task for CancelableLoop<T> {
160 type Output = ();
161 const VARIANT: TaskVariant = T::VARIANT;
162
163 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
164 loop {
165 match self.inner.run_once().await {
166 Ok(CancelableResult::Running(_)) => {
167 tokio::task::yield_now().await;
168 continue;
169 }
170 Ok(CancelableResult::Cancelled) => return Ok(()),
171 Err(e) => return Err(e),
172 }
173 }
174 }
175}
176
177#[async_trait]
181pub trait RecoverableTask: Task + Send + Sync {
182 async fn recover_from_error(&mut self, error: &BridgeError) -> Result<(), BridgeError>;
185}
186
187#[derive(Debug)]
188pub struct BufferedErrors<T: RecoverableTask + Sized>
189where
190 T::Output: Default,
191{
192 inner: T,
193 buffer: Vec<BridgeError>,
194 error_overflow_limit: usize,
195 handle_error_attempts: usize,
196 wait_between_recover_attempts: Duration,
197}
198
199impl<T: RecoverableTask + Sized> BufferedErrors<T>
200where
201 T::Output: Default,
202{
203 pub fn new(
204 inner: T,
205 error_overflow_limit: usize,
206 handle_error_attempts: usize,
207 wait_between_recover_attempts: Duration,
208 ) -> Self {
209 Self {
210 inner,
211 buffer: Vec::new(),
212 error_overflow_limit,
213 handle_error_attempts,
214 wait_between_recover_attempts,
215 }
216 }
217}
218
219#[async_trait]
220impl<T: RecoverableTask + Task + Sized + std::fmt::Debug> Task for BufferedErrors<T>
221where
222 T: Send,
223 T::Output: Default,
224{
225 type Output = T::Output;
226 const VARIANT: TaskVariant = T::VARIANT;
227
228 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
229 let result = self.inner.run_once().await;
230
231 match result {
232 Ok(output) => {
233 self.buffer.clear(); Ok(output)
235 }
236 Err(e) => {
237 tracing::error!(
238 "Task {:?} error, attempting to recover: {e:?}",
239 Self::VARIANT
240 );
241 for attempt in 1..=self.handle_error_attempts {
243 let result = self.inner.recover_from_error(&e).await;
244 match result {
245 Ok(()) => break,
246 Err(e) => {
247 tracing::error!(
248 "Task {:?} error, failed to recover (attempt {attempt}): {e:?}",
249 Self::VARIANT,
250 );
251 if attempt == self.handle_error_attempts {
252 return Err(eyre::eyre!(
254 "Failed to recover from task {:?} error after {attempt} attempts, aborting...",
255 Self::VARIANT
256 ).into());
257 }
258 tokio::time::sleep(self.wait_between_recover_attempts).await;
260 }
261 }
262 }
263 self.buffer.push(e);
264 if self.buffer.len() >= self.error_overflow_limit {
265 let mut base_error: eyre::Report =
266 self.buffer.pop().expect("just inserted above").into();
267
268 for error in std::mem::take(&mut self.buffer) {
269 base_error = base_error.wrap_err(error);
270 }
271
272 base_error = base_error.wrap_err(format!(
273 "Exiting due to {} consecutive errors, the following chain is the list of errors.",
274 self.error_overflow_limit
275 ));
276
277 Err(base_error.into())
278 } else {
279 Ok(Default::default())
280 }
281 }
282 }
283 }
284}
285
286#[derive(Debug)]
287pub struct Map<T: Task + Sized, F: Fn(T::Output) -> T::Output + Send + Sync + 'static> {
288 inner: T,
289 map: F,
290}
291
292#[async_trait]
293impl<T: Task + Sized, F: Fn(T::Output) -> T::Output + Send + Sync + 'static> Task for Map<T, F> {
294 type Output = T::Output;
295 const VARIANT: TaskVariant = T::VARIANT;
296
297 #[track_caller]
298 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
299 let result = self.inner.run_once().await;
300 let output = match result {
301 Ok(output) => (self.map)(output),
302 Err(e) => return Err(e),
303 };
304 Ok(output)
305 }
306}
307
308#[derive(Debug)]
310pub struct IgnoreError<T: Task + Sized>
311where
312 T::Output: Default,
313{
314 inner: T,
315}
316
317#[async_trait]
318impl<T: Task + Sized + std::fmt::Debug> Task for IgnoreError<T>
319where
320 T::Output: Default,
321{
322 type Output = T::Output;
323 const VARIANT: TaskVariant = T::VARIANT;
324
325 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
326 Ok(self
327 .inner
328 .run_once()
329 .await
330 .inspect_err(|e| {
331 tracing::error!(task=?self.inner, "Task error, suppressing due to errors ignored: {e:?}");
332 })
333 .ok()
334 .unwrap_or_default())
335 }
336}
337
338pub trait TaskExt: Task + Sized {
339 fn cancelable(self) -> (CancelableTask<Self>, oneshot::Sender<()>);
341
342 fn cancelable_loop(self) -> (CancelableLoop<Self>, oneshot::Sender<()>);
344
345 fn with_delay(self, poll_delay: Duration) -> WithDelay<Self>
347 where
348 Self::Output: Into<bool>;
349
350 fn into_bg(self) -> JoinHandle<Result<Self::Output, BridgeError>>;
352
353 fn into_buffered_errors(
358 self,
359 error_overflow_limit: usize,
360 handle_error_attempts: usize,
361 wait_between_recover_attempts: Duration,
362 ) -> BufferedErrors<Self>
363 where
364 Self: RecoverableTask,
365 Self::Output: Default;
366
367 fn map<F: Fn(Self::Output) -> Self::Output + Send + Sync + 'static>(
369 self,
370 map: F,
371 ) -> Map<Self, F>;
372
373 fn ignore_error(self) -> IgnoreError<Self>
375 where
376 Self::Output: Default;
377}
378
379impl<T: Task + Sized> TaskExt for T {
380 fn cancelable(self) -> (CancelableTask<Self>, oneshot::Sender<()>) {
381 let (cancel_tx, cancel_rx) = oneshot::channel();
382 (CancelableTask::new(self, cancel_rx), cancel_tx)
383 }
384
385 fn cancelable_loop(self) -> (CancelableLoop<Self>, oneshot::Sender<()>) {
386 let (task, cancel_tx) = self.cancelable();
387 (CancelableLoop { inner: task }, cancel_tx)
388 }
389
390 fn with_delay(self, poll_delay: Duration) -> WithDelay<Self>
391 where
392 Self::Output: Into<bool>,
393 {
394 WithDelay::new(self, poll_delay)
395 }
396
397 fn into_bg(mut self) -> JoinHandle<Result<Self::Output, BridgeError>> {
398 tokio::spawn(async move {
399 tracing::debug!(
400 "Running task {:?} with ID {:?}",
401 Self::VARIANT,
402 task::try_id()
403 );
404 self.run_once().await
405 })
406 }
407
408 fn into_buffered_errors(
409 self,
410 error_overflow_limit: usize,
411 handle_error_attempts: usize,
412 wait_between_recover_attempts: Duration,
413 ) -> BufferedErrors<Self>
414 where
415 Self: RecoverableTask,
416 Self::Output: Default,
417 {
418 BufferedErrors::new(
419 self,
420 error_overflow_limit,
421 handle_error_attempts,
422 wait_between_recover_attempts,
423 )
424 }
425
426 fn map<F: Fn(Self::Output) -> Self::Output + Send + Sync + 'static>(
427 self,
428 map: F,
429 ) -> Map<Self, F> {
430 Map { inner: self, map }
431 }
432
433 fn ignore_error(self) -> IgnoreError<Self>
434 where
435 Self::Output: Default,
436 {
437 IgnoreError { inner: self }
438 }
439}
440
441#[cfg(test)]
442mod tests;