clementine_tx_sender/
task.rs1use crate::config::TxSenderConfig;
2use crate::TxSender;
3use clementine_errors::BridgeError;
4use std::time::Duration;
5
6#[derive(Debug)]
7pub struct TxSenderTaskInternal {
8 pub current_tip_height: u32,
9 pub last_processed_tip_height: u32,
10 pub inner: TxSender,
11}
12
13impl TxSenderTaskInternal {
14 pub fn new(inner: TxSender) -> Self {
15 Self {
16 current_tip_height: 0,
17 last_processed_tip_height: 0,
18 inner,
19 }
20 }
21
22 #[tracing::instrument(skip(self), name = "tx_sender_task")]
23 pub async fn run_once(&mut self) -> Result<bool, BridgeError> {
24 self.current_tip_height = self
26 .inner
27 .rpc
28 .get_current_chain_height()
29 .await
30 .map_err(|e| BridgeError::Eyre(eyre::eyre!(e)))?;
31
32 tracing::debug!("TXSENDER: Getting fee rate");
33 let fee_rate = self.inner.get_fee_rate().await?;
34 tracing::debug!("TXSENDER: Fee rate result: {fee_rate:?}");
35
36 #[cfg(feature = "citrea")]
37 self.inner.sync_citrea_txs(fee_rate).await?;
38 self.inner
40 .sync_transaction_confirmations_via_rpc(None, self.current_tip_height)
41 .await?;
42
43 self.inner
44 .try_to_send_unconfirmed_txs(
45 fee_rate,
46 self.current_tip_height,
47 self.last_processed_tip_height != self.current_tip_height,
48 )
49 .await?;
50 self.last_processed_tip_height = self.current_tip_height;
51
52 self.inner
53 .db
54 .update_synced_height(self.current_tip_height)
55 .await?;
56
57 Ok(false)
58 }
59}
60
61pub fn spawn_txsender_loop(config: TxSenderConfig) -> tokio::task::JoinHandle<()> {
66 tokio::spawn(async move {
67 let poll_delay = Duration::from_millis(config.poll_delay_ms);
68 #[cfg(feature = "json-rpc")]
69 let mut jsonrpc_handle: Option<crate::jsonrpc::server::TxSenderJsonRpcServer> = None;
70
71 loop {
72 let init_res: Result<TxSender, BridgeError> = async {
73 let tx_sender = TxSender::new(config.clone()).await?;
74
75 #[cfg(feature = "json-rpc")]
76 if let Some(rpc_cfg) = config.jsonrpc.clone() {
77 if let Some(old) = jsonrpc_handle.take() {
79 let handle = old.stop();
80 let _ = handle.stop();
81 }
82
83 let bind: std::net::IpAddr = rpc_cfg.bind.parse().map_err(|e| {
84 BridgeError::ConfigError(format!("Invalid TX_SENDER_JSONRPC_BIND: {e}"))
85 })?;
86 let addr = std::net::SocketAddr::new(bind, rpc_cfg.port);
87
88 let server =
89 crate::jsonrpc::server::start_jsonrpc_server(tx_sender.client(), addr)
90 .await?;
91 jsonrpc_handle = Some(server);
92 }
93
94 Ok(tx_sender)
95 }
96 .await;
97
98 let tx_sender = match init_res {
99 Ok(x) => x,
100 Err(e) => {
101 tracing::error!("txsender init failed (will retry): {e:?}");
102 tokio::time::sleep(Duration::from_secs(5)).await;
103 continue;
104 }
105 };
106
107 let mut internal = TxSenderTaskInternal::new(tx_sender);
108 loop {
109 if let Err(e) = internal.run_once().await {
110 tracing::error!("txsender loop iteration failed: {e:?}");
111 }
112 tokio::time::sleep(poll_delay).await;
113 }
114 }
115 })
116}
117
118#[cfg(all(feature = "testing", feature = "json-rpc"))]
122pub fn spawn_txsender_loop_with_free_localhost_jsonrpc_port(
123 mut config: TxSenderConfig,
124) -> (std::net::SocketAddr, tokio::task::JoinHandle<()>) {
125 use crate::test_utils::get_available_port;
126 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
127
128 let port = get_available_port();
129
130 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port);
131 config.jsonrpc = Some(crate::config::TxSenderJsonRpcConfig {
132 bind: "127.0.0.1".to_string(),
133 port,
134 });
135
136 let handle = spawn_txsender_loop(config);
137 (addr, handle)
138}