clementine_tx_sender/
task.rs

1use crate::{TxSender, TxSenderDatabase, TxSenderSigner, TxSenderTxBuilder};
2use clementine_errors::BridgeError;
3use clementine_primitives::BitcoinSyncerEvent;
4use std::time::Duration;
5
6pub const POLL_DELAY: Duration = if cfg!(test) {
7    Duration::from_millis(250)
8} else {
9    Duration::from_secs(30)
10};
11
12#[derive(Debug)]
13pub struct TxSenderTaskInternal<S, D, B>
14where
15    S: TxSenderSigner + 'static,
16    D: TxSenderDatabase + Clone + 'static,
17    B: TxSenderTxBuilder + 'static,
18{
19    pub current_tip_height: u32,
20    pub last_processed_tip_height: u32,
21    pub inner: TxSender<S, D, B>,
22}
23
24impl<S, D, B> TxSenderTaskInternal<S, D, B>
25where
26    S: TxSenderSigner + 'static,
27    D: TxSenderDatabase + Clone + 'static,
28    B: TxSenderTxBuilder + 'static,
29{
30    pub fn new(inner: TxSender<S, D, B>) -> Self {
31        Self {
32            current_tip_height: 0,
33            last_processed_tip_height: 0,
34            inner,
35        }
36    }
37
38    #[tracing::instrument(skip(self), name = "tx_sender_task")]
39    pub async fn run_once(&mut self) -> Result<bool, BridgeError> {
40        let mut dbtx = self.inner.db.begin_transaction().await?;
41
42        let is_block_update = match self
43            .inner
44            .db
45            .fetch_next_bitcoin_syncer_evt(&mut dbtx, &self.inner.btc_syncer_consumer_id)
46            .await?
47        {
48            Some(event) => {
49                tracing::info!("Received Bitcoin syncer event: {:?}", event);
50
51                tracing::debug!("TXSENDER: Event: {:?}", event);
52                match event {
53                    BitcoinSyncerEvent::NewBlock(block_id) => {
54                        let block_height = self
55                            .inner
56                            .db
57                            .get_block_info_from_id(Some(&mut dbtx), block_id)
58                            .await?
59                            .ok_or_else(|| {
60                                BridgeError::Eyre(eyre::eyre!("Block not found in TxSenderTask"))
61                            })?
62                            .1;
63                        tracing::info!(
64                            height = self.current_tip_height,
65                            block_id = %block_id,
66                            "Block mined, confirming transactions..."
67                        );
68
69                        self.inner
70                            .db
71                            .confirm_transactions(&mut dbtx, block_id)
72                            .await?;
73
74                        self.inner.db.commit_transaction(dbtx).await?;
75                        // update after db commit
76                        self.current_tip_height = block_height;
77                        true
78                    }
79                    BitcoinSyncerEvent::ReorgedBlock(block_id) => {
80                        let height = self
81                            .inner
82                            .db
83                            .get_block_info_from_id(Some(&mut dbtx), block_id)
84                            .await?
85                            .ok_or_else(|| {
86                                BridgeError::Eyre(eyre::eyre!("Block not found in TxSenderTask"))
87                            })?
88                            .1;
89                        tracing::info!(
90                            height = height,
91                            block_id = %block_id,
92                            "Reorged happened, unconfirming transactions..."
93                        );
94
95                        self.inner
96                            .db
97                            .unconfirm_transactions(&mut dbtx, block_id)
98                            .await?;
99
100                        self.inner.db.commit_transaction(dbtx).await?;
101                        true
102                    }
103                }
104            }
105            None => false,
106        };
107
108        // If there is a block update, it is possible that there are more.
109        // Before sending, fetch all events and process them without waiting.
110        if is_block_update {
111            return Ok(true);
112        }
113
114        tracing::debug!("TXSENDER: Getting fee rate");
115        let fee_rate_result = self.inner.get_fee_rate().await;
116        tracing::debug!("TXSENDER: Fee rate result: {:?}", fee_rate_result);
117        let fee_rate = fee_rate_result?;
118
119        self.inner
120            .try_to_send_unconfirmed_txs(
121                fee_rate,
122                self.current_tip_height,
123                self.last_processed_tip_height != self.current_tip_height,
124            )
125            .await?;
126        self.last_processed_tip_height = self.current_tip_height;
127
128        Ok(false)
129    }
130}