clementine_core/task/
mod.rsuse std::time::Duration;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use tonic::async_trait;
use crate::errors::BridgeError;
pub mod manager;
pub mod payout_checker;
#[async_trait]
pub trait Task: Send + Sync + 'static {
type Output: Send + Sync + 'static + Sized;
async fn run_once(&mut self) -> Result<Self::Output, BridgeError>;
}
pub trait IntoTask {
type Task: Task;
fn into_task(self) -> Self::Task;
}
impl<T: Task> IntoTask for T {
type Task = T;
fn into_task(self) -> Self::Task {
self
}
}
#[derive(Debug)]
pub struct WithDelay<T: Task>
where
T::Output: Into<bool>,
{
inner: T,
poll_delay: Duration,
}
impl<T: Task> WithDelay<T>
where
T::Output: Into<bool>,
{
pub fn new(inner: T, poll_delay: Duration) -> Self {
Self { inner, poll_delay }
}
}
#[async_trait]
impl<T: Task> Task for WithDelay<T>
where
T::Output: Into<bool>,
{
type Output = bool;
async fn run_once(&mut self) -> Result<bool, BridgeError> {
let did_work = self.inner.run_once().await?.into();
if !did_work {
sleep(self.poll_delay).await;
}
Ok(false)
}
}
#[derive(Debug)]
pub struct CancelableTask<T: Task> {
inner: T,
cancel_rx: oneshot::Receiver<()>,
}
impl<T: Task> CancelableTask<T> {
pub fn new(inner: T, cancel_rx: oneshot::Receiver<()>) -> Self {
Self { inner, cancel_rx }
}
}
#[derive(Debug, Clone)]
pub enum CancelableResult<T> {
Running(T),
Cancelled,
}
#[async_trait]
impl<T: Task> Task for CancelableTask<T> {
type Output = CancelableResult<T::Output>;
async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
if let Err(TryRecvError::Empty) = self.cancel_rx.try_recv() {
Ok(CancelableResult::Running(self.inner.run_once().await?))
} else {
Ok(CancelableResult::Cancelled)
}
}
}
#[derive(Debug)]
pub struct CancelableLoop<T: Task + Sized> {
inner: CancelableTask<T>,
}
#[async_trait]
impl<T: Task + Sized> Task for CancelableLoop<T> {
type Output = ();
async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
loop {
match self.inner.run_once().await {
Ok(CancelableResult::Running(_)) => {
tokio::task::yield_now().await;
continue;
}
Ok(CancelableResult::Cancelled) => return Ok(()),
Err(e) => return Err(e),
}
}
}
}
#[derive(Debug)]
pub struct BufferedErrors<T: Task + Sized>
where
T::Output: Default,
{
inner: T,
buffer: Vec<BridgeError>,
error_overflow_limit: usize,
}
impl<T: Task + Sized> BufferedErrors<T>
where
T::Output: Default,
{
pub fn new(inner: T, error_overflow_limit: usize) -> Self {
Self {
inner,
buffer: Vec::new(),
error_overflow_limit,
}
}
}
#[async_trait]
impl<T: Task + Sized + std::fmt::Debug> Task for BufferedErrors<T>
where
T::Output: Default,
{
type Output = T::Output;
async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
let result = self.inner.run_once().await;
match result {
Ok(output) => {
self.buffer.clear(); Ok(output)
}
Err(e) => {
tracing::error!("Task error, suppressing due to buffer: {e:?}");
self.buffer.push(e);
if self.buffer.len() >= self.error_overflow_limit {
let mut base_error: eyre::Report =
self.buffer.pop().expect("just inserted above").into();
for error in std::mem::take(&mut self.buffer) {
base_error = base_error.wrap_err(error);
}
base_error = base_error.wrap_err(format!(
"Exiting due to {} consecutive errors, the following chain is the list of errors.",
self.error_overflow_limit
));
Err(base_error.into())
} else {
Ok(Default::default())
}
}
}
}
}
#[derive(Debug)]
pub struct Map<T: Task + Sized, F: Fn(T::Output) -> T::Output + Send + Sync + 'static> {
inner: T,
map: F,
}
#[async_trait]
impl<T: Task + Sized, F: Fn(T::Output) -> T::Output + Send + Sync + 'static> Task for Map<T, F> {
type Output = T::Output;
#[track_caller]
async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
let result = self.inner.run_once().await;
let output = match result {
Ok(output) => (self.map)(output),
Err(e) => return Err(e),
};
Ok(output)
}
}
#[derive(Debug)]
pub struct IgnoreError<T: Task + Sized>
where
T::Output: Default,
{
inner: T,
}
#[async_trait]
impl<T: Task + Sized + std::fmt::Debug> Task for IgnoreError<T>
where
T::Output: Default,
{
type Output = T::Output;
async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
Ok(self
.inner
.run_once()
.await
.inspect_err(|e| {
tracing::error!(task=?self.inner, "Task error, suppressing due to errors ignored: {e:?}");
})
.ok()
.unwrap_or_default())
}
}
pub trait TaskExt: Task + Sized {
fn cancelable(self) -> (CancelableTask<Self>, oneshot::Sender<()>);
fn cancelable_loop(self) -> (CancelableLoop<Self>, oneshot::Sender<()>);
fn with_delay(self, poll_delay: Duration) -> WithDelay<Self>
where
Self::Output: Into<bool>;
fn into_bg(self) -> JoinHandle<Result<Self::Output, BridgeError>>;
fn into_buffered_errors(self, error_overflow_limit: usize) -> BufferedErrors<Self>
where
Self::Output: Default;
fn map<F: Fn(Self::Output) -> Self::Output + Send + Sync + 'static>(
self,
map: F,
) -> Map<Self, F>;
fn ignore_error(self) -> IgnoreError<Self>
where
Self::Output: Default;
}
impl<T: Task + Sized> TaskExt for T {
fn cancelable(self) -> (CancelableTask<Self>, oneshot::Sender<()>) {
let (cancel_tx, cancel_rx) = oneshot::channel();
(CancelableTask::new(self, cancel_rx), cancel_tx)
}
fn cancelable_loop(self) -> (CancelableLoop<Self>, oneshot::Sender<()>) {
let (task, cancel_tx) = self.cancelable();
(CancelableLoop { inner: task }, cancel_tx)
}
fn with_delay(self, poll_delay: Duration) -> WithDelay<Self>
where
Self::Output: Into<bool>,
{
WithDelay::new(self, poll_delay)
}
fn into_bg(mut self) -> JoinHandle<Result<Self::Output, BridgeError>> {
tokio::spawn(async move { self.run_once().await })
}
fn into_buffered_errors(self, error_overflow_limit: usize) -> BufferedErrors<Self>
where
Self::Output: Default,
{
BufferedErrors::new(self, error_overflow_limit)
}
fn map<F: Fn(Self::Output) -> Self::Output + Send + Sync + 'static>(
self,
map: F,
) -> Map<Self, F> {
Map { inner: self, map }
}
fn ignore_error(self) -> IgnoreError<Self>
where
Self::Output: Default,
{
IgnoreError { inner: self }
}
}
#[cfg(test)]
mod tests;