clementine_core/tx_sender/
task.rs1use super::TxSender;
12use crate::errors::ResultExt;
13use crate::task::{IgnoreError, TaskVariant, WithDelay};
14use crate::{
15 bitcoin_syncer::BitcoinSyncerEvent,
16 database::Database,
17 errors::BridgeError,
18 task::{IntoTask, Task, TaskExt},
19};
20use std::time::Duration;
21use tonic::async_trait;
22
/// Delay handed to `with_delay` when a `TxSender` is turned into a task.
///
/// Test builds use a short interval.
#[cfg(test)]
const POLL_DELAY: Duration = Duration::from_millis(250);

/// Delay handed to `with_delay` when a `TxSender` is turned into a task.
///
/// Non-test builds use a 30 second interval.
#[cfg(not(test))]
const POLL_DELAY: Duration = Duration::from_secs(30);
28
29#[derive(Debug)]
30pub struct TxSenderTask {
31 db: Database,
32 current_tip_height: u32,
33 last_processed_tip_height: u32,
34 inner: TxSender,
35}
36
37#[async_trait]
38impl Task for TxSenderTask {
39 type Output = bool;
40 const VARIANT: TaskVariant = TaskVariant::TxSender;
41
42 #[tracing::instrument(skip(self), name = "tx_sender_task")]
43 async fn run_once(&mut self) -> std::result::Result<Self::Output, BridgeError> {
44 let mut dbtx = self.db.begin_transaction().await.map_to_eyre()?;
45
46 let is_block_update = async {
47 let Some(event) = self
48 .db
49 .fetch_next_bitcoin_syncer_evt(&mut dbtx, &self.inner.btc_syncer_consumer_id)
50 .await?
51 else {
52 return Ok(false);
53 };
54 tracing::info!("Received Bitcoin syncer event: {:?}", event);
55
56 tracing::debug!("TXSENDER: Event: {:?}", event);
57 Ok::<_, BridgeError>(match event {
58 BitcoinSyncerEvent::NewBlock(block_id) => {
59 let block_height = self
60 .db
61 .get_block_info_from_id(Some(&mut dbtx), block_id)
62 .await?
63 .ok_or(eyre::eyre!("Block not found in TxSenderTask"))?
64 .1;
65 tracing::info!(
66 height = self.current_tip_height,
67 block_id = %block_id,
68 "Block mined, confirming transactions..."
69 );
70
71 self.db.confirm_transactions(&mut dbtx, block_id).await?;
72
73 dbtx.commit().await?;
74 self.current_tip_height = block_height;
76 true
77 }
78 BitcoinSyncerEvent::ReorgedBlock(block_id) => {
79 let height = self
80 .db
81 .get_block_info_from_id(Some(&mut dbtx), block_id)
82 .await?
83 .ok_or(eyre::eyre!("Block not found in TxSenderTask"))?
84 .1;
85 tracing::info!(
86 height = height,
87 block_id = %block_id,
88 "Reorged happened, unconfirming transactions..."
89 );
90
91 self.db.unconfirm_transactions(&mut dbtx, block_id).await?;
92
93 dbtx.commit().await?;
94 true
95 }
96 })
97 }
98 .await?;
99
100 if is_block_update {
103 return Ok(true);
104 }
105
106 tracing::debug!("TXSENDER: Getting fee rate");
107 let fee_rate_result = self.inner.get_fee_rate().await;
108 tracing::debug!("TXSENDER: Fee rate result: {:?}", fee_rate_result);
109 let fee_rate = fee_rate_result?;
110
111 self.inner
112 .try_to_send_unconfirmed_txs(
113 fee_rate,
114 self.current_tip_height,
115 self.last_processed_tip_height != self.current_tip_height,
116 )
117 .await?;
118 self.last_processed_tip_height = self.current_tip_height;
119
120 Ok(false)
121 }
122}
123
124impl IntoTask for TxSender {
125 type Task = WithDelay<IgnoreError<TxSenderTask>>;
126
127 fn into_task(self) -> Self::Task {
128 TxSenderTask {
129 db: self.db.clone(),
130 current_tip_height: 0,
131 last_processed_tip_height: 0,
132 inner: self,
133 }
134 .ignore_error()
135 .with_delay(POLL_DELAY)
136 }
137}