clementine_core/tx_sender/
task.rs

1//! # Transaction Sender Task
2//!
3//! This module provides the [`Task`] implementation for the [`TxSender`].
4//!
5//! This task will fetch block events from [`Bitcoin Syncer`](crate::bitcoin_syncer)
6//! and confirms or unconfirms transaction based on the event. Finally, it will
7//! try to send transactions that are in the queue. Transactions are picked from
8//! the database and sent to the Bitcoin network if a transaction is in queue
9//! and not in the [`Bitcoin Syncer`](crate::bitcoin_syncer) database.
10
11use super::TxSender;
12use crate::errors::ResultExt;
13use crate::task::{IgnoreError, TaskVariant, WithDelay};
14use crate::{
15    bitcoin_syncer::BitcoinSyncerEvent,
16    database::Database,
17    errors::BridgeError,
18    task::{IntoTask, Task, TaskExt},
19};
20use std::time::Duration;
21use tonic::async_trait;
22
23const POLL_DELAY: Duration = if cfg!(test) {
24    Duration::from_millis(250)
25} else {
26    Duration::from_secs(30)
27};
28
29#[derive(Debug)]
30pub struct TxSenderTask {
31    db: Database,
32    current_tip_height: u32,
33    last_processed_tip_height: u32,
34    inner: TxSender,
35}
36
37#[async_trait]
38impl Task for TxSenderTask {
39    type Output = bool;
40    const VARIANT: TaskVariant = TaskVariant::TxSender;
41
42    #[tracing::instrument(skip(self), name = "tx_sender_task")]
43    async fn run_once(&mut self) -> std::result::Result<Self::Output, BridgeError> {
44        let mut dbtx = self.db.begin_transaction().await.map_to_eyre()?;
45
46        let is_block_update = async {
47            let Some(event) = self
48                .db
49                .fetch_next_bitcoin_syncer_evt(&mut dbtx, &self.inner.btc_syncer_consumer_id)
50                .await?
51            else {
52                return Ok(false);
53            };
54            tracing::info!("Received Bitcoin syncer event: {:?}", event);
55
56            tracing::debug!("TXSENDER: Event: {:?}", event);
57            Ok::<_, BridgeError>(match event {
58                BitcoinSyncerEvent::NewBlock(block_id) => {
59                    let block_height = self
60                        .db
61                        .get_block_info_from_id(Some(&mut dbtx), block_id)
62                        .await?
63                        .ok_or(eyre::eyre!("Block not found in TxSenderTask"))?
64                        .1;
65                    tracing::info!(
66                        height = self.current_tip_height,
67                        block_id = %block_id,
68                        "Block mined, confirming transactions..."
69                    );
70
71                    self.db.confirm_transactions(&mut dbtx, block_id).await?;
72
73                    dbtx.commit().await?;
74                    // update after db commit
75                    self.current_tip_height = block_height;
76                    true
77                }
78                BitcoinSyncerEvent::ReorgedBlock(block_id) => {
79                    let height = self
80                        .db
81                        .get_block_info_from_id(Some(&mut dbtx), block_id)
82                        .await?
83                        .ok_or(eyre::eyre!("Block not found in TxSenderTask"))?
84                        .1;
85                    tracing::info!(
86                        height = height,
87                        block_id = %block_id,
88                        "Reorged happened, unconfirming transactions..."
89                    );
90
91                    self.db.unconfirm_transactions(&mut dbtx, block_id).await?;
92
93                    dbtx.commit().await?;
94                    true
95                }
96            })
97        }
98        .await?;
99
100        // If there is a block update, it is possible that there are more.
101        // Before sending, fetch all events and process them without waiting.
102        if is_block_update {
103            return Ok(true);
104        }
105
106        tracing::debug!("TXSENDER: Getting fee rate");
107        let fee_rate_result = self.inner.get_fee_rate().await;
108        tracing::debug!("TXSENDER: Fee rate result: {:?}", fee_rate_result);
109        let fee_rate = fee_rate_result?;
110
111        self.inner
112            .try_to_send_unconfirmed_txs(
113                fee_rate,
114                self.current_tip_height,
115                self.last_processed_tip_height != self.current_tip_height,
116            )
117            .await?;
118        self.last_processed_tip_height = self.current_tip_height;
119
120        Ok(false)
121    }
122}
123
124impl IntoTask for TxSender {
125    type Task = WithDelay<IgnoreError<TxSenderTask>>;
126
127    fn into_task(self) -> Self::Task {
128        TxSenderTask {
129            db: self.db.clone(),
130            current_tip_height: 0,
131            last_processed_tip_height: 0,
132            inner: self,
133        }
134        .ignore_error()
135        .with_delay(POLL_DELAY)
136    }
137}