clementine_tx_sender/
task.rs

1use crate::config::TxSenderConfig;
2use crate::TxSender;
3use clementine_errors::BridgeError;
4use std::time::Duration;
5
6#[derive(Debug)]
7pub struct TxSenderTaskInternal {
8    pub current_tip_height: u32,
9    pub last_processed_tip_height: u32,
10    pub inner: TxSender,
11}
12
13impl TxSenderTaskInternal {
14    pub fn new(inner: TxSender) -> Self {
15        Self {
16            current_tip_height: 0,
17            last_processed_tip_height: 0,
18            inner,
19        }
20    }
21
22    #[tracing::instrument(skip(self), name = "tx_sender_task")]
23    pub async fn run_once(&mut self) -> Result<bool, BridgeError> {
24        // Get current tip height from Bitcoin RPC, then sync confirmations/spent tracking.
25        self.current_tip_height = self
26            .inner
27            .rpc
28            .get_current_chain_height()
29            .await
30            .map_err(|e| BridgeError::Eyre(eyre::eyre!(e)))?;
31
32        tracing::debug!("TXSENDER: Getting fee rate");
33        let fee_rate = self.inner.get_fee_rate().await?;
34        tracing::debug!("TXSENDER: Fee rate result: {fee_rate:?}");
35
36        #[cfg(feature = "citrea")]
37        self.inner.sync_citrea_txs(fee_rate).await?;
38        // No need for db transaction as it doesn't matter if it fails midway, we resync from rpc continuously
39        self.inner
40            .sync_transaction_confirmations_via_rpc(None, self.current_tip_height)
41            .await?;
42
43        self.inner
44            .try_to_send_unconfirmed_txs(
45                fee_rate,
46                self.current_tip_height,
47                self.last_processed_tip_height != self.current_tip_height,
48            )
49            .await?;
50        self.last_processed_tip_height = self.current_tip_height;
51
52        self.inner
53            .db
54            .update_synced_height(self.current_tip_height)
55            .await?;
56
57        Ok(false)
58    }
59}
60
61/// Spawns a tokio task that runs txsender indefinitely.
62///
63/// This is a standalone loop helper with no dependency on the `clementine-core` task framework.
64/// Callers that own the txsender schema should run migrations before spawning it.
65pub fn spawn_txsender_loop(config: TxSenderConfig) -> tokio::task::JoinHandle<()> {
66    tokio::spawn(async move {
67        let poll_delay = Duration::from_millis(config.poll_delay_ms);
68        #[cfg(feature = "json-rpc")]
69        let mut jsonrpc_handle: Option<crate::jsonrpc::server::TxSenderJsonRpcServer> = None;
70
71        loop {
72            let init_res: Result<TxSender, BridgeError> = async {
73                let tx_sender = TxSender::new(config.clone()).await?;
74
75                #[cfg(feature = "json-rpc")]
76                if let Some(rpc_cfg) = config.jsonrpc.clone() {
77                    // If we previously had a server, stop it before re-binding.
78                    if let Some(old) = jsonrpc_handle.take() {
79                        let handle = old.stop();
80                        let _ = handle.stop();
81                    }
82
83                    let bind: std::net::IpAddr = rpc_cfg.bind.parse().map_err(|e| {
84                        BridgeError::ConfigError(format!("Invalid TX_SENDER_JSONRPC_BIND: {e}"))
85                    })?;
86                    let addr = std::net::SocketAddr::new(bind, rpc_cfg.port);
87
88                    let server =
89                        crate::jsonrpc::server::start_jsonrpc_server(tx_sender.client(), addr)
90                            .await?;
91                    jsonrpc_handle = Some(server);
92                }
93
94                Ok(tx_sender)
95            }
96            .await;
97
98            let tx_sender = match init_res {
99                Ok(x) => x,
100                Err(e) => {
101                    tracing::error!("txsender init failed (will retry): {e:?}");
102                    tokio::time::sleep(Duration::from_secs(5)).await;
103                    continue;
104                }
105            };
106
107            let mut internal = TxSenderTaskInternal::new(tx_sender);
108            loop {
109                if let Err(e) = internal.run_once().await {
110                    tracing::error!("txsender loop iteration failed: {e:?}");
111                }
112                tokio::time::sleep(poll_delay).await;
113            }
114        }
115    })
116}
117
/// Test utility: enables JSON-RPC on a localhost port obtained from `get_available_port`
/// and spawns the txsender loop with the adjusted configuration.
///
/// Any `jsonrpc` section already present in `config` is overwritten with a `127.0.0.1`
/// bind address and the chosen port.
///
/// Returns `(jsonrpc_addr, join_handle)`.
#[cfg(all(feature = "testing", feature = "json-rpc"))]
pub fn spawn_txsender_loop_with_free_localhost_jsonrpc_port(
    mut config: TxSenderConfig,
) -> (std::net::SocketAddr, tokio::task::JoinHandle<()>) {
    use crate::config::TxSenderJsonRpcConfig;
    use crate::test_utils::get_available_port;
    use std::net::{Ipv4Addr, SocketAddr};

    let free_port = get_available_port();

    // The address handed back to the caller mirrors the bind string written into the config.
    let jsonrpc_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, free_port));
    config.jsonrpc = Some(TxSenderJsonRpcConfig {
        bind: String::from("127.0.0.1"),
        port: free_port,
    });

    (jsonrpc_addr, spawn_txsender_loop(config))
}