clementine_tx_sender/
task.rs1use crate::{TxSender, TxSenderDatabase, TxSenderSigner, TxSenderTxBuilder};
2use clementine_errors::BridgeError;
3use clementine_primitives::BitcoinSyncerEvent;
4use std::time::Duration;
5
/// Polling delay for the tx sender task: 250 ms when compiled for tests,
/// 30 s otherwise.
///
/// NOTE(review): the consumer of this constant is not in this file; presumably
/// it is the pause between `run_once` iterations — confirm at the call site.
pub const POLL_DELAY: Duration = Duration::from_millis(if cfg!(test) { 250 } else { 30_000 });
11
12#[derive(Debug)]
13pub struct TxSenderTaskInternal<S, D, B>
14where
15 S: TxSenderSigner + 'static,
16 D: TxSenderDatabase + Clone + 'static,
17 B: TxSenderTxBuilder + 'static,
18{
19 pub current_tip_height: u32,
20 pub last_processed_tip_height: u32,
21 pub inner: TxSender<S, D, B>,
22}
23
24impl<S, D, B> TxSenderTaskInternal<S, D, B>
25where
26 S: TxSenderSigner + 'static,
27 D: TxSenderDatabase + Clone + 'static,
28 B: TxSenderTxBuilder + 'static,
29{
30 pub fn new(inner: TxSender<S, D, B>) -> Self {
31 Self {
32 current_tip_height: 0,
33 last_processed_tip_height: 0,
34 inner,
35 }
36 }
37
38 #[tracing::instrument(skip(self), name = "tx_sender_task")]
39 pub async fn run_once(&mut self) -> Result<bool, BridgeError> {
40 let mut dbtx = self.inner.db.begin_transaction().await?;
41
42 let is_block_update = match self
43 .inner
44 .db
45 .fetch_next_bitcoin_syncer_evt(&mut dbtx, &self.inner.btc_syncer_consumer_id)
46 .await?
47 {
48 Some(event) => {
49 tracing::info!("Received Bitcoin syncer event: {:?}", event);
50
51 tracing::debug!("TXSENDER: Event: {:?}", event);
52 match event {
53 BitcoinSyncerEvent::NewBlock(block_id) => {
54 let block_height = self
55 .inner
56 .db
57 .get_block_info_from_id(Some(&mut dbtx), block_id)
58 .await?
59 .ok_or_else(|| {
60 BridgeError::Eyre(eyre::eyre!("Block not found in TxSenderTask"))
61 })?
62 .1;
63 tracing::info!(
64 height = self.current_tip_height,
65 block_id = %block_id,
66 "Block mined, confirming transactions..."
67 );
68
69 self.inner
70 .db
71 .confirm_transactions(&mut dbtx, block_id)
72 .await?;
73
74 self.inner.db.commit_transaction(dbtx).await?;
75 self.current_tip_height = block_height;
77 true
78 }
79 BitcoinSyncerEvent::ReorgedBlock(block_id) => {
80 let height = self
81 .inner
82 .db
83 .get_block_info_from_id(Some(&mut dbtx), block_id)
84 .await?
85 .ok_or_else(|| {
86 BridgeError::Eyre(eyre::eyre!("Block not found in TxSenderTask"))
87 })?
88 .1;
89 tracing::info!(
90 height = height,
91 block_id = %block_id,
92 "Reorged happened, unconfirming transactions..."
93 );
94
95 self.inner
96 .db
97 .unconfirm_transactions(&mut dbtx, block_id)
98 .await?;
99
100 self.inner.db.commit_transaction(dbtx).await?;
101 true
102 }
103 }
104 }
105 None => false,
106 };
107
108 if is_block_update {
111 return Ok(true);
112 }
113
114 tracing::debug!("TXSENDER: Getting fee rate");
115 let fee_rate_result = self.inner.get_fee_rate().await;
116 tracing::debug!("TXSENDER: Fee rate result: {:?}", fee_rate_result);
117 let fee_rate = fee_rate_result?;
118
119 self.inner
120 .try_to_send_unconfirmed_txs(
121 fee_rate,
122 self.current_tip_height,
123 self.last_processed_tip_height != self.current_tip_height,
124 )
125 .await?;
126 self.last_processed_tip_height = self.current_tip_height;
127
128 Ok(false)
129 }
130}