clementine_tx_sender/db/
tx_sender.rs

1//! SQLx queries for tx-sender tables.
2
3use super::wrapper::TxidDB;
4use super::{TxSenderDb, TxSenderDbTx};
5use crate::txsender_execute_query_with_tx;
6use bitcoin::consensus::{deserialize, serialize};
7use bitcoin::{Amount, OutPoint, Transaction, Txid};
8use clementine_errors::BridgeError;
9use clementine_primitives::FeeRateKvb;
10use clementine_utils::{FeePayingType, RbfSigningInfo, TxMetadata};
11use eyre::{Context, OptionExt};
12use sqlx::Executor;
13use std::collections::HashMap;
14
15use crate::{ActivatedWithOutpoint, ActivatedWithTxid};
16
17impl TxSenderDb {
18    /// Saves a fee payer transaction to the database.
19    ///
20    /// # Arguments
21    /// * `bumped_id` - The id of the transaction funded by this fee payer.
22    /// * `fee_payer_txid` - The txid of the fee payer transaction.
23    /// * `vout` - The output index of the fee payer UTXO.
24    /// * `amount` - The amount in satoshis.
25    /// * `replacement_of_id` - The fee payer UTXO this row replaces, if any.
26    #[allow(clippy::too_many_arguments)]
27    pub async fn save_fee_payer_tx(
28        &self,
29        tx: Option<TxSenderDbTx<'_>>,
30        bumped_id: u32,
31        fee_payer_txid: Txid,
32        vout: u32,
33        amount: Amount,
34        replacement_of_id: Option<u32>,
35    ) -> Result<(), BridgeError> {
36        let query = sqlx::query(
37            "INSERT INTO tx_sender_fee_payer_utxos (bumped_id, fee_payer_txid, vout, amount, replacement_of_id)
38             VALUES ($1, $2, $3, $4, $5)",
39        )
40        .bind(i32::try_from(bumped_id).wrap_err("Failed to convert bumped id to i32")?)
41        .bind(TxidDB(fee_payer_txid))
42        .bind(i32::try_from(vout).wrap_err("Failed to convert vout to i32")?)
43        .bind(i64::try_from(amount.to_sat()).wrap_err("Failed to convert amount to i64")?)
44        .bind(
45            replacement_of_id
46                .map(i32::try_from)
47                .transpose()
48                .wrap_err("Failed to convert replacement of id to i32")?,
49        );
50
51        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
52        Ok(())
53    }
54
55    /// Returns all unconfirmed fee payer UTXOs.
56    ///
57    /// UTXOs whose replacement chain already has a confirmed member are excluded. If no
58    /// replacement in the chain is confirmed, all unconfirmed replacements are returned.
59    ///
60    /// # Returns
61    ///
62    /// A vector of fee payer UTXO details:
63    /// - [`u32`]: id of the fee payer UTXO row.
64    /// - [`u32`]: id of the bumped transaction.
65    /// - [`Txid`]: txid of the fee payer transaction.
66    /// - [`u32`]: output index of the UTXO.
67    /// - [`Amount`]: amount in satoshis.
68    /// - [`Option<u32>`]: replaced fee payer UTXO id, if this is a replacement.
69    pub async fn get_all_unconfirmed_fee_payer_txs(
70        &self,
71        tx: Option<TxSenderDbTx<'_>>,
72    ) -> Result<Vec<(u32, u32, Txid, u32, Amount, Option<u32>)>, BridgeError> {
73        let query = sqlx::query_as::<_, (i32, i32, TxidDB, i32, i64, Option<i32>)>(
74            "
75            SELECT fpu.id, fpu.bumped_id, fpu.fee_payer_txid, fpu.vout, fpu.amount, fpu.replacement_of_id
76            FROM tx_sender_fee_payer_utxos fpu
77            WHERE fpu.seen_at_height IS NULL
78              AND fpu.is_evicted = false
79              AND NOT EXISTS (
80                  SELECT 1
81                  FROM tx_sender_fee_payer_utxos x
82                  WHERE COALESCE(x.replacement_of_id, x.id)
83                        = COALESCE(fpu.replacement_of_id, fpu.id)
84                    AND x.seen_at_height IS NOT NULL
85              )
86            ",
87        );
88
89        let results: Vec<(i32, i32, TxidDB, i32, i64, Option<i32>)> =
90            txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
91
92        results
93            .iter()
94            .map(
95                |(id, bumped_id, fee_payer_txid, vout, amount, replacement_of_id)| {
96                    Ok((
97                        u32::try_from(*id).wrap_err("Failed to convert id to u32")?,
98                        u32::try_from(*bumped_id).wrap_err("Failed to convert bumped id to u32")?,
99                        fee_payer_txid.0,
100                        u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
101                        Amount::from_sat(
102                            u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
103                        ),
104                        replacement_of_id
105                            .map(u32::try_from)
106                            .transpose()
107                            .wrap_err("Failed to convert replacement of id to u32")?,
108                    ))
109                },
110            )
111            .collect::<Result<Vec<_>, BridgeError>>()
112    }
113
114    /// Returns unconfirmed fee payer UTXOs for one try-to-send transaction.
115    ///
116    /// UTXOs whose replacement chain already has a confirmed member are excluded. If no
117    /// replacement in the chain is confirmed, all unconfirmed replacements are returned.
118    ///
119    /// # Arguments
120    /// * `bumped_id` - The id of the transaction funded by the fee payer UTXOs.
121    ///
122    /// # Returns
123    ///
124    /// A vector of fee payer UTXO details:
125    /// - [`u32`]: id of the fee payer UTXO row.
126    /// - [`Txid`]: txid of the fee payer transaction.
127    /// - [`u32`]: output index of the UTXO.
128    /// - [`Amount`]: amount in satoshis.
129    pub async fn get_unconfirmed_fee_payer_txs(
130        &self,
131        tx: Option<TxSenderDbTx<'_>>,
132        bumped_id: u32,
133    ) -> Result<Vec<(u32, Txid, u32, Amount)>, BridgeError> {
134        let query = sqlx::query_as::<_, (i32, TxidDB, i32, i64)>(
135            "
136            SELECT fpu.id, fpu.fee_payer_txid, fpu.vout, fpu.amount
137            FROM tx_sender_fee_payer_utxos fpu
138            WHERE fpu.bumped_id = $1
139              AND fpu.seen_at_height IS NULL
140              AND fpu.is_evicted = false
141              AND NOT EXISTS (
142                  SELECT 1
143                  FROM tx_sender_fee_payer_utxos x
144                  WHERE COALESCE(x.replacement_of_id, x.id)
145                        = COALESCE(fpu.replacement_of_id, fpu.id)
146                    AND x.seen_at_height IS NOT NULL
147              )
148            ",
149        )
150        .bind(i32::try_from(bumped_id).wrap_err("Failed to convert bumped id to i32")?);
151
152        let results: Vec<(i32, TxidDB, i32, i64)> =
153            txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
154
155        results
156            .iter()
157            .map(|(id, fee_payer_txid, vout, amount)| {
158                Ok((
159                    u32::try_from(*id).wrap_err("Failed to convert id to u32")?,
160                    fee_payer_txid.0,
161                    u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
162                    Amount::from_sat(
163                        u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
164                    ),
165                ))
166            })
167            .collect::<Result<Vec<_>, BridgeError>>()
168    }
169
170    /// Marks a fee payer UTXO and all of its replacements as evicted.
171    ///
172    /// Evicted fee payer UTXOs are no longer selected for bumps, because their wallet
173    /// inputs may already have been reused elsewhere.
174    pub async fn mark_fee_payer_utxo_as_evicted(
175        &self,
176        tx: Option<TxSenderDbTx<'_>>,
177        id: u32,
178    ) -> Result<(), BridgeError> {
179        let query = sqlx::query(
180            "UPDATE tx_sender_fee_payer_utxos
181                SET is_evicted = true
182                WHERE id = $1
183                OR replacement_of_id = $1",
184        )
185        .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
186
187        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
188        Ok(())
189    }
190
191    pub async fn get_confirmed_fee_payer_utxos(
192        &self,
193        tx: Option<TxSenderDbTx<'_>>,
194        id: u32,
195    ) -> Result<Vec<(Txid, u32, Amount)>, BridgeError> {
196        let query = sqlx::query_as::<_, (TxidDB, i32, i64)>(
197            "SELECT fee_payer_txid, vout, amount
198             FROM tx_sender_fee_payer_utxos fpu
199             WHERE fpu.bumped_id = $1 AND fpu.seen_at_height IS NOT NULL",
200        )
201        .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
202
203        let results: Vec<(TxidDB, i32, i64)> =
204            txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
205
206        results
207            .iter()
208            .map(|(fee_payer_txid, vout, amount)| {
209                Ok((
210                    fee_payer_txid.0,
211                    u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
212                    Amount::from_sat(
213                        u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
214                    ),
215                ))
216            })
217            .collect::<Result<Vec<_>, BridgeError>>()
218    }
219
220    /// Returns the tx-sender row id for `txid` if it already exists.
221    ///
222    /// This is used before inserting to avoid adding duplicate transactions to the queue.
223    pub async fn check_if_tx_exists_on_txsender(
224        &self,
225        tx: Option<TxSenderDbTx<'_>>,
226        txid: Txid,
227    ) -> Result<Option<u32>, BridgeError> {
228        let query = sqlx::query_as::<_, (i32,)>(
229            "SELECT id FROM tx_sender_try_to_send_txs WHERE txid = $1 LIMIT 1",
230        )
231        .bind(TxidDB(txid));
232
233        let result: Option<(i32,)> =
234            txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_optional)?;
235
236        Ok(match result {
237            Some((id,)) => Some(u32::try_from(id).wrap_err("Failed to convert id to u32")?),
238            None => None,
239        })
240    }
241
242    pub async fn save_tx(
243        &self,
244        tx: TxSenderDbTx<'_>,
245        tx_metadata: Option<TxMetadata>,
246        raw_tx: &Transaction,
247        fee_paying_type: FeePayingType,
248        txid: Txid,
249        rbf_signing_info: Option<RbfSigningInfo>,
250    ) -> Result<u32, BridgeError> {
251        let query = sqlx::query_scalar(
252            r#"
253            INSERT INTO tx_sender_try_to_send_txs
254            (raw_tx, fee_paying_type, tx_metadata, txid, rbf_signing_info)
255            VALUES ($1, $2::fee_paying_type, $3, $4, $5)
256            ON CONFLICT (txid)
257            DO UPDATE SET txid = EXCLUDED.txid
258            RETURNING id
259            "#,
260        )
261        .bind(serialize(raw_tx))
262        .bind(fee_paying_type)
263        .bind(serde_json::to_string(&tx_metadata).wrap_err("Failed to encode tx_metadata to JSON")?)
264        .bind(TxidDB(txid))
265        .bind(
266            serde_json::to_string(&rbf_signing_info)
267                .wrap_err("Failed to encode rbf_signing_info to JSON")?,
268        );
269
270        let id: i32 = query.fetch_one(&mut **tx).await?;
271        u32::try_from(id)
272            .wrap_err("Failed to convert id to u32")
273            .map_err(Into::into)
274    }
275
276    pub async fn save_rbf_txid(
277        &self,
278        tx: Option<TxSenderDbTx<'_>>,
279        id: u32,
280        txid: Txid,
281    ) -> Result<(), BridgeError> {
282        let query = sqlx::query("INSERT INTO tx_sender_rbf_txids (id, txid) VALUES ($1, $2)")
283            .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?)
284            .bind(TxidDB(txid));
285
286        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
287        Ok(())
288    }
289
290    pub async fn get_last_rbf_txid(
291        &self,
292        tx: Option<TxSenderDbTx<'_>>,
293        id: u32,
294    ) -> Result<Option<Txid>, BridgeError> {
295        let query = sqlx::query_as::<_, (TxidDB,)>(
296            "SELECT txid FROM tx_sender_rbf_txids WHERE id = $1 ORDER BY insertion_order DESC LIMIT 1",
297        )
298        .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
299
300        let result: Option<(TxidDB,)> =
301            txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_optional)?;
302        Ok(result.map(|(txid,)| txid.0))
303    }
304
305    pub async fn list_rbf_txids_for_id(
306        &self,
307        tx: Option<TxSenderDbTx<'_>>,
308        id: u32,
309    ) -> Result<Vec<Txid>, BridgeError> {
310        let query = sqlx::query_as::<_, (TxidDB,)>(
311            "SELECT txid FROM tx_sender_rbf_txids WHERE id = $1 ORDER BY insertion_order DESC",
312        )
313        .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
314
315        let results: Vec<(TxidDB,)> =
316            txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
317        Ok(results.into_iter().map(|(txid,)| txid.0).collect())
318    }
319
320    pub async fn save_cancelled_outpoint(
321        &self,
322        tx: TxSenderDbTx<'_>,
323        cancelled_id: u32,
324        outpoint: OutPoint,
325    ) -> Result<(), BridgeError> {
326        let query = sqlx::query(
327            "INSERT INTO tx_sender_cancel_try_to_send_outpoints (cancelled_id, txid, vout) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
328        )
329        .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled id to i32")?)
330        .bind(TxidDB(outpoint.txid))
331        .bind(i32::try_from(outpoint.vout).wrap_err("Failed to convert vout to i32")?);
332
333        query.execute(&mut **tx).await?;
334        Ok(())
335    }
336
337    pub async fn save_cancelled_txid(
338        &self,
339        tx: TxSenderDbTx<'_>,
340        cancelled_id: u32,
341        txid: Txid,
342    ) -> Result<(), BridgeError> {
343        let query = sqlx::query(
344            "INSERT INTO tx_sender_cancel_try_to_send_txids (cancelled_id, txid) VALUES ($1, $2) ON CONFLICT DO NOTHING",
345        )
346        .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled id to i32")?)
347        .bind(TxidDB(txid));
348
349        query.execute(&mut **tx).await?;
350        Ok(())
351    }
352
353    pub async fn save_activated_txid(
354        &self,
355        tx: TxSenderDbTx<'_>,
356        activated_id: u32,
357        prerequisite_tx: &ActivatedWithTxid,
358    ) -> Result<(), BridgeError> {
359        let query = sqlx::query(
360            "INSERT INTO tx_sender_activate_try_to_send_txids (activated_id, txid, timelock) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
361        )
362        .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated id to i32")?)
363        .bind(TxidDB(prerequisite_tx.txid))
364        .bind(i32::try_from(prerequisite_tx.relative_block_height).wrap_err("Failed to convert relative block height to i32")?);
365
366        query.execute(&mut **tx).await?;
367        Ok(())
368    }
369
370    pub async fn save_activated_outpoint(
371        &self,
372        tx: TxSenderDbTx<'_>,
373        activated_id: u32,
374        activated_outpoint: &ActivatedWithOutpoint,
375    ) -> Result<(), BridgeError> {
376        let query = sqlx::query(
377            "INSERT INTO tx_sender_activate_try_to_send_outpoints (activated_id, txid, vout, timelock) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING",
378        )
379        .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated id to i32")?)
380        .bind(TxidDB(activated_outpoint.outpoint.txid))
381        .bind(i32::try_from(activated_outpoint.outpoint.vout).wrap_err("Failed to convert vout to i32")?)
382        .bind(i32::try_from(activated_outpoint.relative_block_height).wrap_err("Failed to convert relative block height to i32")?);
383
384        query.execute(&mut **tx).await?;
385        Ok(())
386    }
387
388    /// Returns unconfirmed try-to-send transactions that satisfy all queue conditions.
389    ///
390    /// A transaction is sendable when:
391    /// - all activation dependencies have been seen and their relative block timelocks passed;
392    /// - zero-timelock txid activations are either seen on-chain or currently in mempool;
393    /// - no cancellation dependency has been seen;
394    /// - the transaction itself has not been seen on-chain;
395    /// - its previous effective fee rate is lower than `fee_rate`, or it has never been sent.
396    ///
397    /// Passing a very high `fee_rate` is used by callers to retrieve all active transactions
398    /// after a new block, even when the market fee did not increase.
399    ///
400    /// # Returns
401    ///
402    /// A vector of tx-sender database ids that are ready to send or bump.
403    pub async fn get_sendable_txs(
404        &self,
405        tx: Option<TxSenderDbTx<'_>>,
406        fee_rate: FeeRateKvb,
407        current_tip_height: u32,
408    ) -> Result<Vec<u32>, BridgeError> {
409        let select_query = sqlx::query_as::<_, (i32,)>(
410            "WITH
411                non_active_txs AS (
412                    SELECT DISTINCT
413                        activate_txid.activated_id AS tx_id
414                    FROM
415                        tx_sender_activate_try_to_send_txids AS activate_txid
416                    WHERE
417                        (
418                            activate_txid.timelock > 0
419                            AND (
420                                activate_txid.seen_at_height IS NULL
421                                OR (activate_txid.seen_at_height::bigint + activate_txid.timelock > $2::bigint)
422                            )
423                        )
424                        OR (
425                            activate_txid.timelock = 0
426                            AND activate_txid.seen_at_height IS NULL
427                            AND activate_txid.in_mempool IS NOT TRUE
428                        )
429
430                    UNION
431
432                    SELECT DISTINCT
433                        activate_outpoint.activated_id AS tx_id
434                    FROM
435                        tx_sender_activate_try_to_send_outpoints AS activate_outpoint
436                    WHERE
437                        activate_outpoint.seen_at_height IS NULL
438                        OR (activate_outpoint.seen_at_height::bigint + activate_outpoint.timelock > $2::bigint)
439                ),
440
441                cancelled_txs AS (
442                    SELECT DISTINCT
443                        cancelled_id AS tx_id
444                    FROM
445                        tx_sender_cancel_try_to_send_outpoints
446                    WHERE
447                        seen_at_height IS NOT NULL
448
449                    UNION
450
451                    SELECT DISTINCT
452                        cancelled_id AS tx_id
453                    FROM
454                        tx_sender_cancel_try_to_send_txids
455                    WHERE
456                        seen_at_height IS NOT NULL
457                )
458
459                SELECT
460                    txs.id
461                FROM
462                    tx_sender_try_to_send_txs AS txs
463                WHERE
464                    txs.id NOT IN (SELECT tx_id FROM non_active_txs)
465                    AND txs.id NOT IN (SELECT tx_id FROM cancelled_txs)
466                    AND txs.seen_at_height IS NULL
467                    AND (txs.effective_fee_rate IS NULL OR txs.effective_fee_rate < $1);",
468        )
469        .bind(
470            i64::try_from(fee_rate.to_sat_per_kvb()).wrap_err("Failed to convert fee rate to i64")?,
471        )
472        .bind(i32::try_from(current_tip_height).wrap_err("Failed to convert current tip height to i32")?);
473
474        let results = txsender_execute_query_with_tx!(&self.pool, tx, select_query, fetch_all)?;
475
476        let txs = results
477            .into_iter()
478            .map(|(id,)| u32::try_from(id))
479            .collect::<Result<Vec<_>, _>>()
480            .wrap_err("Failed to convert id to u32")?;
481
482        Ok(txs)
483    }
484
485    /// Returns the effective fee rate and block height from the last actual fee bump.
486    ///
487    /// Returns `(None, None)` if no effective fee rate has been recorded yet.
488    pub async fn get_effective_fee_rate(
489        &self,
490        tx: Option<TxSenderDbTx<'_>>,
491        id: u32,
492    ) -> Result<(Option<FeeRateKvb>, Option<u32>), BridgeError> {
493        let query = sqlx::query_as::<_, (Option<i64>, Option<i32>)>(
494            "SELECT effective_fee_rate, last_bump_block_height FROM tx_sender_try_to_send_txs WHERE id = $1",
495        )
496        .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
497
498        let result = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_optional)?;
499
500        match result {
501            Some((Some(rate), block_height)) => Ok((
502                Some(FeeRateKvb::from_sat_per_kvb(
503                    u64::try_from(rate).wrap_err("Failed to convert effective fee rate to u64")?,
504                )),
505                block_height.map(|h| h as u32),
506            )),
507            Some((None, _)) | None => Ok((None, None)),
508        }
509    }
510
511    /// Updates the effective fee rate and last bump block height for a transaction.
512    ///
513    /// The row is updated only when the fee rate changes, or when the previous fee rate
514    /// is `NULL`. This preserves `last_bump_block_height` across retries at the same
515    /// fee rate, so the stuck-for-N-blocks counter continues from the last real bump.
516    ///
517    /// `effective_fee_rate` is stored in sat/kvB.
518    pub async fn update_effective_fee_rate(
519        &self,
520        tx: Option<TxSenderDbTx<'_>>,
521        id: u32,
522        effective_fee_rate: FeeRateKvb,
523        block_height: u32,
524    ) -> Result<(), BridgeError> {
525        let query = sqlx::query(
526            "UPDATE tx_sender_try_to_send_txs
527             SET effective_fee_rate = $1, last_bump_block_height = $2
528             WHERE id = $3 AND (effective_fee_rate IS NULL OR effective_fee_rate != $1)",
529        )
530        .bind(
531            i64::try_from(effective_fee_rate.to_sat_per_kvb())
532                .wrap_err("Failed to convert effective fee rate to i64")?,
533        )
534        .bind(i32::try_from(block_height).wrap_err("Failed to convert block_height to i32")?)
535        .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
536
537        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
538        Ok(())
539    }
540
541    pub async fn get_try_to_send_tx(
542        &self,
543        tx: Option<TxSenderDbTx<'_>>,
544        id: u32,
545    ) -> Result<
546        (
547            Option<TxMetadata>,
548            Transaction,
549            FeePayingType,
550            Option<u32>,
551            Option<RbfSigningInfo>,
552        ),
553        BridgeError,
554    > {
555        let query = sqlx::query_as::<
556            _,
557            (
558                Option<String>,
559                Option<Vec<u8>>,
560                FeePayingType,
561                Option<i32>,
562                Option<String>,
563            ),
564        >(
565            "SELECT tx_metadata, raw_tx, fee_paying_type, seen_at_height, rbf_signing_info
566             FROM tx_sender_try_to_send_txs
567             WHERE id = $1 LIMIT 1",
568        )
569        .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
570
571        let result = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_one)?;
572        Ok((
573            serde_json::from_str(result.0.as_deref().unwrap_or("null"))
574                .wrap_err_with(|| format!("Failed to decode tx_metadata from {:?}", result.0))?,
575            result
576                .1
577                .as_deref()
578                .map(deserialize)
579                .ok_or_eyre("Expected raw_tx to be present")?
580                .wrap_err("Failed to deserialize raw_tx")?,
581            result.2,
582            result
583                .3
584                .map(u32::try_from)
585                .transpose()
586                .wrap_err("Failed to convert seen_at_height to u32")?,
587            serde_json::from_str(result.4.as_deref().unwrap_or("null")).wrap_err_with(|| {
588                format!("Failed to decode rbf_signing_info from {:?}", result.4)
589            })?,
590        ))
591    }
592
593    /// Saves a transaction submission error to the debug table.
594    pub async fn save_tx_debug_submission_error(
595        &self,
596        tx: Option<TxSenderDbTx<'_>>,
597        tx_id: u32,
598        error_message: &str,
599    ) -> Result<(), BridgeError> {
600        let query = sqlx::query(
601            "INSERT INTO tx_sender_debug_submission_errors (tx_id, error_message) VALUES ($1, $2)",
602        )
603        .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?)
604        .bind(error_message);
605
606        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
607        Ok(())
608    }
609
610    /// Updates or inserts the transaction's current debug sending state.
611    ///
612    /// This intentionally does not accept a database transaction. It is debug-only
613    /// metadata and callers should use it after the tx-sender row has been committed.
614    pub async fn update_tx_debug_sending_state(
615        &self,
616        tx_id: u32,
617        state: &str,
618        activated: bool,
619    ) -> Result<(), BridgeError> {
620        let query = sqlx::query(
621            r#"
622            INSERT INTO tx_sender_debug_sending_state
623            (tx_id, state, last_update, activated_timestamp)
624            VALUES ($1, $2, NOW(),
625                CASE
626                    WHEN $3 = TRUE THEN NOW()
627                    ELSE NULL
628                END
629            )
630            ON CONFLICT (tx_id) DO UPDATE SET
631            state = $2,
632            last_update = NOW(),
633            activated_timestamp = COALESCE(tx_sender_debug_sending_state.activated_timestamp,
634                CASE
635                    WHEN $3 = TRUE THEN NOW()
636                    ELSE NULL
637                END
638            )
639            "#,
640        )
641        .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?)
642        .bind(state)
643        .bind(activated);
644
645        self.pool.execute(query).await?;
646        Ok(())
647    }
648
649    /// Returns the current debug sending state for a transaction.
650    pub async fn get_tx_debug_info(
651        &self,
652        tx: Option<TxSenderDbTx<'_>>,
653        tx_id: u32,
654    ) -> Result<Option<String>, BridgeError> {
655        let query = sqlx::query_as::<_, (Option<String>,)>(
656            r#"
657            SELECT state
658            FROM tx_sender_debug_sending_state
659            WHERE tx_id = $1
660            "#,
661        )
662        .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
663
664        let result = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_optional)?;
665        match result {
666            Some((state,)) => Ok(state),
667            None => Ok(None),
668        }
669    }
670
671    /// Returns all recorded submission errors for a transaction, ordered by timestamp.
672    pub async fn get_tx_debug_submission_errors(
673        &self,
674        tx: Option<TxSenderDbTx<'_>>,
675        tx_id: u32,
676    ) -> Result<Vec<(String, String)>, BridgeError> {
677        let query = sqlx::query_as::<_, (String, String)>(
678            r#"
679            SELECT error_message, timestamp::TEXT
680            FROM tx_sender_debug_submission_errors
681            WHERE tx_id = $1
682            ORDER BY timestamp ASC
683            "#,
684        )
685        .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
686
687        txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all).map_err(Into::into)
688    }
689
690    /// Returns all fee payer UTXOs for a transaction with their confirmation status.
691    pub async fn get_tx_debug_fee_payer_utxos(
692        &self,
693        tx: Option<TxSenderDbTx<'_>>,
694        tx_id: u32,
695    ) -> Result<Vec<(Txid, u32, Amount, bool)>, BridgeError> {
696        let query = sqlx::query_as::<_, (TxidDB, i32, i64, bool)>(
697            r#"
698            SELECT fee_payer_txid, vout, amount, seen_at_height IS NOT NULL as confirmed
699            FROM tx_sender_fee_payer_utxos
700            WHERE bumped_id = $1
701            "#,
702        )
703        .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
704
705        let results: Vec<(TxidDB, i32, i64, bool)> =
706            txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
707
708        results
709            .iter()
710            .map(|(fee_payer_txid, vout, amount, confirmed)| {
711                Ok((
712                    fee_payer_txid.0,
713                    u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
714                    Amount::from_sat(
715                        u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
716                    ),
717                    *confirmed,
718                ))
719            })
720            .collect::<Result<Vec<_>, BridgeError>>()
721    }
722
723    /// Debug-only helper: log why some txs are inactive (not sendable).
724    pub async fn debug_inactive_txs(&self, fee_rate: FeeRateKvb, current_tip_height: u32) {
725        tracing::info!("TXSENDER_DBG_INACTIVE_TXS: Checking inactive transactions");
726
727        let unconfirmed_txs = match sqlx::query_as::<_, (i32, TxidDB, Option<String>)>(
728            "SELECT id, txid, tx_metadata FROM tx_sender_try_to_send_txs WHERE seen_at_height IS NULL",
729        )
730        .fetch_all(&self.pool)
731        .await
732        {
733            Ok(txs) => txs,
734            Err(e) => {
735                tracing::error!(
736                    "TXSENDER_DBG_INACTIVE_TXS: Failed to query unconfirmed txs: {}",
737                    e
738                );
739                return;
740            }
741        };
742
743        let sendable_txs = match self
744            .get_sendable_txs(None, fee_rate, current_tip_height)
745            .await
746        {
747            Ok(txs) => txs,
748            Err(e) => {
749                tracing::error!(
750                    "TXSENDER_DBG_INACTIVE_TXS: Failed to get sendable txs: {}",
751                    e
752                );
753                return;
754            }
755        };
756
757        for (tx_id, txid, tx_metadata) in unconfirmed_txs {
758            let tx_metadata: Option<TxMetadata> =
759                serde_json::from_str(tx_metadata.as_deref().unwrap_or("null")).ok();
760
761            let id = match u32::try_from(tx_id) {
762                Ok(id) => id,
763                Err(e) => {
764                    tracing::error!("TXSENDER_DBG_INACTIVE_TXS: Failed to convert id: {}", e);
765                    continue;
766                }
767            };
768
769            if sendable_txs.contains(&id) {
770                tracing::info!(
771                    "TXSENDER_DBG_INACTIVE_TXS: TX {} (txid: {}) is ACTIVE",
772                    id,
773                    txid.0
774                );
775                continue;
776            }
777
778            tracing::info!(
779                "TXSENDER_DBG_INACTIVE_TXS: TX {} (txid: {}, type: {:?}) is inactive, reasons:",
780                id,
781                txid.0,
782                tx_metadata.as_ref().map(|metadata| metadata.tx_type)
783            );
784
785            // txid activations
786            let txid_activations = match sqlx::query_as::<_, (Option<i32>, i64, TxidDB)>(
787                "SELECT seen_at_height, timelock, txid
788                FROM tx_sender_activate_try_to_send_txids
789                WHERE activated_id = $1",
790            )
791            .bind(tx_id)
792            .fetch_all(&self.pool)
793            .await
794            {
795                Ok(activations) => activations,
796                Err(e) => {
797                    tracing::error!(
798                        "TXSENDER_DBG_INACTIVE_TXS: Failed to query txid activations: {}",
799                        e
800                    );
801                    continue;
802                }
803            };
804
805            for (seen_at_height, timelock, txid) in txid_activations {
806                if seen_at_height.is_none() {
807                    tracing::info!("TXSENDER_DBG_INACTIVE_TXS: TX {} is inactive because its txid activation {} has not been seen", id, txid.0);
808                    continue;
809                }
810
811                let seen_at_height = seen_at_height.expect("checked above");
812                if (seen_at_height as i64) + timelock > current_tip_height as i64 {
813                    tracing::info!(
814                        "TXSENDER_DBG_INACTIVE_TXS: TX {} is inactive because its txid activation timelock hasn't expired (seen_at_height: {}, timelock: {}, current_tip_height: {})",
815                        id, seen_at_height, timelock, current_tip_height
816                    );
817                }
818            }
819
820            // outpoint activations
821            let outpoint_activations = match sqlx::query_as::<_, (Option<i32>, i64, TxidDB, i32)>(
822                "SELECT seen_at_height, timelock, txid, vout
823                    FROM tx_sender_activate_try_to_send_outpoints
824                    WHERE activated_id = $1",
825            )
826            .bind(tx_id)
827            .fetch_all(&self.pool)
828            .await
829            {
830                Ok(activations) => activations,
831                Err(e) => {
832                    tracing::error!(
833                        "TXSENDER_DBG_INACTIVE_TXS: Failed to query outpoint activations: {}",
834                        e
835                    );
836                    continue;
837                }
838            };
839
840            for (seen_at_height, timelock, txid, vout) in outpoint_activations {
841                if seen_at_height.is_none() {
842                    tracing::info!("TXSENDER_DBG_INACTIVE_TXS: TX {} is inactive because its outpoint activation {}:{} has not been seen spent", id, txid.0, vout);
843                    continue;
844                }
845
846                let seen_at_height = seen_at_height.expect("checked above");
847                if (seen_at_height as i64) + timelock > current_tip_height as i64 {
848                    tracing::info!(
849                        "TXSENDER_DBG_INACTIVE_TXS: TX {} is inactive because its outpoint activation timelock hasn't expired (seen_at_height: {}, timelock: {}, current_tip_height: {})",
850                        id, seen_at_height, timelock, current_tip_height
851                    );
852                }
853            }
854
855            // cancellations
856            let cancelled_txids = match sqlx::query_as::<_, (TxidDB, Option<i32>)>(
857                "SELECT txid, seen_at_height
858                FROM tx_sender_cancel_try_to_send_txids
859                WHERE cancelled_id = $1",
860            )
861            .bind(tx_id)
862            .fetch_all(&self.pool)
863            .await
864            {
865                Ok(x) => x,
866                Err(e) => {
867                    tracing::error!(
868                        "TXSENDER_DBG_INACTIVE_TXS: Failed to query cancelled txids: {}",
869                        e
870                    );
871                    continue;
872                }
873            };
874
875            for (txid, seen_at_height) in cancelled_txids {
876                if seen_at_height.is_some() {
877                    tracing::info!(
878                        "TXSENDER_DBG_INACTIVE_TXS: TX {} is inactive because it was cancelled by txid {}",
879                        id,
880                        txid.0
881                    );
882                }
883            }
884
885            let cancelled_outpoints = match sqlx::query_as::<_, (TxidDB, i32, Option<i32>)>(
886                "SELECT txid, vout, seen_at_height
887                FROM tx_sender_cancel_try_to_send_outpoints
888                WHERE cancelled_id = $1",
889            )
890            .bind(tx_id)
891            .fetch_all(&self.pool)
892            .await
893            {
894                Ok(x) => x,
895                Err(e) => {
896                    tracing::error!(
897                        "TXSENDER_DBG_INACTIVE_TXS: Failed to query cancelled outpoints: {}",
898                        e
899                    );
900                    continue;
901                }
902            };
903
904            for (txid, vout, seen_at_height) in cancelled_outpoints {
905                if seen_at_height.is_some() {
906                    tracing::info!(
907                        "TXSENDER_DBG_INACTIVE_TXS: TX {} is inactive because it was cancelled by outpoint {}:{}",
908                        id,
909                        txid.0,
910                        vout
911                    );
912                }
913            }
914
915            let effective_fee_rate = match sqlx::query_scalar::<_, Option<i64>>(
916                "SELECT effective_fee_rate FROM tx_sender_try_to_send_txs WHERE id = $1",
917            )
918            .bind(tx_id)
919            .fetch_one(&self.pool)
920            .await
921            {
922                Ok(rate) => rate,
923                Err(e) => {
924                    tracing::error!(
925                        "TXSENDER_DBG_INACTIVE_TXS: Failed to query effective fee rate: {}",
926                        e
927                    );
928                    continue;
929                }
930            };
931
932            if let Some(rate) = effective_fee_rate {
933                if rate >= fee_rate.to_sat_per_kvb() as i64 {
934                    tracing::info!(
935                        "TXSENDER_DBG_INACTIVE_TXS: TX {} is inactive because its effective fee rate ({} sat/kvB) is >= the current fee rate ({} sat/kvB)",
936                        id,
937                        rate,
938                        fee_rate.to_sat_per_kvb()
939                    );
940                }
941            }
942        }
943    }
944
945    /// Lists all unfinalized try-to-send transactions that should be checked for confirmation.
946    ///
947    /// This function excludes transactions that have been permanently cancelled (i.e., any cancellation
948    /// from `tx_sender_cancel_try_to_send_txids` or `tx_sender_cancel_try_to_send_outpoints` is finalized).
949    /// Once a cancellation is finalized, the transaction can never be sent, so there's no point in
950    /// repeatedly checking it via RPC. This prevents unnecessary RPC calls for transactions that are
951    /// permanently cancelled.
952    pub async fn list_unfinalized_try_to_send_txs(
953        &self,
954        tx: Option<TxSenderDbTx<'_>>,
955    ) -> Result<Vec<(u32, FeePayingType, Txid, Option<u32>)>, BridgeError> {
956        let query = sqlx::query_as::<_, (i32, FeePayingType, TxidDB, Option<i32>)>(
957            r#"
958            SELECT id, fee_paying_type, txid, seen_at_height
959            FROM tx_sender_try_to_send_txs
960            WHERE is_finalized = FALSE
961              AND NOT EXISTS (
962                  SELECT 1
963                  FROM tx_sender_cancel_try_to_send_txids
964                  WHERE cancelled_id = tx_sender_try_to_send_txs.id
965                    AND is_finalized = TRUE
966              )
967              AND NOT EXISTS (
968                  SELECT 1
969                  FROM tx_sender_cancel_try_to_send_outpoints
970                  WHERE cancelled_id = tx_sender_try_to_send_txs.id
971                    AND is_finalized = TRUE
972              )
973            "#,
974        );
975
976        let results = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
977        results
978            .into_iter()
979            .map(|(id, fee_paying_type, txid, seen_at_height)| {
980                Ok((
981                    u32::try_from(id).wrap_err("Failed to convert id to u32")?,
982                    fee_paying_type,
983                    txid.0,
984                    seen_at_height
985                        .map(u32::try_from)
986                        .transpose()
987                        .wrap_err("Failed to convert seen_at_height to u32")?,
988                ))
989            })
990            .collect::<Result<Vec<_>, BridgeError>>()
991    }
992
993    pub async fn list_rbf_txids_for_ids(
994        &self,
995        tx: Option<TxSenderDbTx<'_>>,
996        ids: &[u32],
997    ) -> Result<Vec<(u32, Txid)>, BridgeError> {
998        if ids.is_empty() {
999            return Ok(vec![]);
1000        }
1001
1002        let ids_i32: Vec<i32> = ids
1003            .iter()
1004            .copied()
1005            .map(i32::try_from)
1006            .collect::<Result<Vec<_>, _>>()
1007            .wrap_err("Failed to convert ids to i32")?;
1008
1009        let query = sqlx::query_as::<_, (i32, TxidDB)>(
1010            "SELECT id, txid FROM tx_sender_rbf_txids WHERE id = ANY($1) ORDER BY insertion_order DESC",
1011        )
1012        .bind(ids_i32);
1013
1014        let results = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
1015        results
1016            .into_iter()
1017            .map(|(id, txid)| {
1018                Ok((
1019                    u32::try_from(id).wrap_err("Failed to convert id to u32")?,
1020                    txid.0,
1021                ))
1022            })
1023            .collect::<Result<Vec<_>, BridgeError>>()
1024    }
1025
1026    pub async fn set_try_to_send_seen_at_height(
1027        &self,
1028        tx: Option<TxSenderDbTx<'_>>,
1029        id: u32,
1030        seen_at_height: Option<u32>,
1031    ) -> Result<(), BridgeError> {
1032        let query =
1033            sqlx::query("UPDATE tx_sender_try_to_send_txs SET seen_at_height = $2 WHERE id = $1")
1034                .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?)
1035                .bind(
1036                    seen_at_height
1037                        .map(i32::try_from)
1038                        .transpose()
1039                        .wrap_err("Failed to convert seen_at_height to i32")?,
1040                );
1041
1042        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1043        Ok(())
1044    }
1045
1046    /// Returns seen_at_height and is_finalized for a set of try_to_send ids.
1047    pub async fn list_try_to_send_statuses_by_ids(
1048        &self,
1049        tx: Option<TxSenderDbTx<'_>>,
1050        ids: &[u32],
1051    ) -> Result<HashMap<u32, (Option<u32>, bool)>, BridgeError> {
1052        if ids.is_empty() {
1053            return Ok(HashMap::new());
1054        }
1055
1056        let ids_i32: Vec<i32> = ids
1057            .iter()
1058            .copied()
1059            .map(i32::try_from)
1060            .collect::<Result<Vec<_>, _>>()
1061            .wrap_err("Failed to convert ids to i32")?;
1062
1063        let query = sqlx::query_as::<_, (i32, Option<i32>, bool)>(
1064            "SELECT id, seen_at_height, is_finalized
1065             FROM tx_sender_try_to_send_txs
1066             WHERE id = ANY($1)",
1067        )
1068        .bind(ids_i32);
1069
1070        let results = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
1071        let mut map = HashMap::with_capacity(results.len());
1072        for (id, seen_at_height, is_finalized) in results {
1073            let id = u32::try_from(id).wrap_err("Failed to convert id to u32")?;
1074            let seen_at_height = seen_at_height
1075                .map(u32::try_from)
1076                .transpose()
1077                .wrap_err("Failed to convert seen_at_height to u32")?;
1078            map.insert(id, (seen_at_height, is_finalized));
1079        }
1080
1081        Ok(map)
1082    }
1083
1084    pub async fn set_try_to_send_finalized(
1085        &self,
1086        tx: Option<TxSenderDbTx<'_>>,
1087        id: u32,
1088        is_finalized: bool,
1089    ) -> Result<(), BridgeError> {
1090        let query =
1091            sqlx::query("UPDATE tx_sender_try_to_send_txs SET is_finalized = $2 WHERE id = $1")
1092                .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?)
1093                .bind(is_finalized);
1094
1095        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1096        Ok(())
1097    }
1098
1099    /// Lists all unfinalized fee payer UTXOs that should be checked for confirmation.
1100    ///
1101    /// Fee payer UTXOs form replacement chains via `replacement_of_id`:
1102    /// - The first created UTXO in a chain has `replacement_of_id IS NULL` and its `id` is
1103    ///   the canonical parent id for the chain.
1104    /// - All replacements in that chain have `replacement_of_id = <parent id>`.
1105    ///
1106    /// This function excludes fee payer UTXOs where **any** UTXO in the same replacement chain
1107    /// (canonical parent or any of its replacements) is already finalized. Once a chain has a
1108    /// finalized fee payer UTXO, there's no need to check the others, preventing unnecessary
1109    /// RPC calls.
1110    pub async fn list_unfinalized_fee_payer_utxos(
1111        &self,
1112        tx: Option<TxSenderDbTx<'_>>,
1113    ) -> Result<Vec<(u32, Txid, Option<u32>)>, BridgeError> {
1114        let query = sqlx::query_as::<_, (i32, TxidDB, Option<i32>)>(
1115            r#"
1116            SELECT id, fee_payer_txid, seen_at_height
1117            FROM tx_sender_fee_payer_utxos
1118            WHERE is_finalized = FALSE
1119              AND NOT EXISTS (
1120                  SELECT 1
1121                  FROM tx_sender_fee_payer_utxos other
1122                  WHERE COALESCE(other.replacement_of_id, other.id)
1123                        = COALESCE(tx_sender_fee_payer_utxos.replacement_of_id, tx_sender_fee_payer_utxos.id)
1124                    AND other.is_finalized = TRUE
1125              )
1126            "#,
1127        );
1128
1129        let results = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
1130        results
1131            .into_iter()
1132            .map(|(id, txid, seen_at_height)| {
1133                Ok((
1134                    u32::try_from(id).wrap_err("Failed to convert id to u32")?,
1135                    txid.0,
1136                    seen_at_height
1137                        .map(u32::try_from)
1138                        .transpose()
1139                        .wrap_err("Failed to convert seen_at_height to u32")?,
1140                ))
1141            })
1142            .collect::<Result<Vec<_>, BridgeError>>()
1143    }
1144
1145    pub async fn set_fee_payer_seen_at_height(
1146        &self,
1147        tx: Option<TxSenderDbTx<'_>>,
1148        fee_payer_utxo_id: u32,
1149        seen_at_height: Option<u32>,
1150    ) -> Result<(), BridgeError> {
1151        let query =
1152            sqlx::query("UPDATE tx_sender_fee_payer_utxos SET seen_at_height = $2 WHERE id = $1")
1153                .bind(i32::try_from(fee_payer_utxo_id).wrap_err("Failed to convert id to i32")?)
1154                .bind(
1155                    seen_at_height
1156                        .map(i32::try_from)
1157                        .transpose()
1158                        .wrap_err("Failed to convert seen_at_height to i32")?,
1159                );
1160
1161        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1162        Ok(())
1163    }
1164
1165    pub async fn set_fee_payer_finalized(
1166        &self,
1167        tx: Option<TxSenderDbTx<'_>>,
1168        fee_payer_utxo_id: u32,
1169        is_finalized: bool,
1170    ) -> Result<(), BridgeError> {
1171        let query =
1172            sqlx::query("UPDATE tx_sender_fee_payer_utxos SET is_finalized = $2 WHERE id = $1")
1173                .bind(i32::try_from(fee_payer_utxo_id).wrap_err("Failed to convert id to i32")?)
1174                .bind(is_finalized);
1175
1176        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1177        Ok(())
1178    }
1179
1180    pub async fn list_unfinalized_cancel_txids(
1181        &self,
1182        tx: Option<TxSenderDbTx<'_>>,
1183    ) -> Result<Vec<(u32, Txid, Option<u32>)>, BridgeError> {
1184        let query = sqlx::query_as::<_, (i32, TxidDB, Option<i32>)>(
1185            r#"
1186            SELECT cancelled_id, txid, seen_at_height
1187            FROM tx_sender_cancel_try_to_send_txids
1188            WHERE is_finalized = FALSE
1189            "#,
1190        );
1191
1192        let results = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
1193        results
1194            .into_iter()
1195            .map(|(cancelled_id, txid, seen_at_height)| {
1196                Ok((
1197                    u32::try_from(cancelled_id)
1198                        .wrap_err("Failed to convert cancelled_id to u32")?,
1199                    txid.0,
1200                    seen_at_height
1201                        .map(u32::try_from)
1202                        .transpose()
1203                        .wrap_err("Failed to convert seen_at_height to u32")?,
1204                ))
1205            })
1206            .collect::<Result<Vec<_>, BridgeError>>()
1207    }
1208
1209    pub async fn set_cancel_txid_seen_at_height(
1210        &self,
1211        tx: Option<TxSenderDbTx<'_>>,
1212        cancelled_id: u32,
1213        txid: Txid,
1214        seen_at_height: Option<u32>,
1215    ) -> Result<(), BridgeError> {
1216        let query = sqlx::query(
1217            "UPDATE tx_sender_cancel_try_to_send_txids SET seen_at_height = $3 WHERE cancelled_id = $1 AND txid = $2",
1218        )
1219        .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled_id to i32")?)
1220        .bind(TxidDB(txid))
1221        .bind(
1222            seen_at_height
1223                .map(i32::try_from)
1224                .transpose()
1225                .wrap_err("Failed to convert seen_at_height to i32")?,
1226        );
1227
1228        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1229        Ok(())
1230    }
1231
1232    pub async fn set_cancel_txid_finalized(
1233        &self,
1234        tx: Option<TxSenderDbTx<'_>>,
1235        cancelled_id: u32,
1236        txid: Txid,
1237        is_finalized: bool,
1238    ) -> Result<(), BridgeError> {
1239        let query = sqlx::query(
1240            "UPDATE tx_sender_cancel_try_to_send_txids SET is_finalized = $3 WHERE cancelled_id = $1 AND txid = $2",
1241        )
1242        .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled_id to i32")?)
1243        .bind(TxidDB(txid))
1244        .bind(is_finalized);
1245
1246        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1247        Ok(())
1248    }
1249
1250    pub async fn list_unfinalized_activate_txids(
1251        &self,
1252        tx: Option<TxSenderDbTx<'_>>,
1253    ) -> Result<Vec<(u32, Txid, Option<u32>, bool)>, BridgeError> {
1254        let query = sqlx::query_as::<_, (i32, TxidDB, Option<i32>, bool)>(
1255            r#"
1256            SELECT activated_id, txid, seen_at_height, in_mempool
1257            FROM tx_sender_activate_try_to_send_txids
1258            WHERE is_finalized = FALSE
1259            "#,
1260        );
1261
1262        let results = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
1263        results
1264            .into_iter()
1265            .map(|(activated_id, txid, seen_at_height, in_mempool)| {
1266                Ok((
1267                    u32::try_from(activated_id)
1268                        .wrap_err("Failed to convert activated_id to u32")?,
1269                    txid.0,
1270                    seen_at_height
1271                        .map(u32::try_from)
1272                        .transpose()
1273                        .wrap_err("Failed to convert seen_at_height to u32")?,
1274                    in_mempool,
1275                ))
1276            })
1277            .collect::<Result<Vec<_>, BridgeError>>()
1278    }
1279
1280    pub async fn set_activate_txid_seen_at_height(
1281        &self,
1282        tx: Option<TxSenderDbTx<'_>>,
1283        activated_id: u32,
1284        txid: Txid,
1285        seen_at_height: Option<u32>,
1286    ) -> Result<(), BridgeError> {
1287        let query = sqlx::query(
1288            "UPDATE tx_sender_activate_try_to_send_txids SET seen_at_height = $3 WHERE activated_id = $1 AND txid = $2",
1289        )
1290        .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated_id to i32")?)
1291        .bind(TxidDB(txid))
1292        .bind(
1293            seen_at_height
1294                .map(i32::try_from)
1295                .transpose()
1296                .wrap_err("Failed to convert seen_at_height to i32")?,
1297        );
1298
1299        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1300        Ok(())
1301    }
1302
1303    pub async fn set_activate_txid_finalized(
1304        &self,
1305        tx: Option<TxSenderDbTx<'_>>,
1306        activated_id: u32,
1307        txid: Txid,
1308        is_finalized: bool,
1309    ) -> Result<(), BridgeError> {
1310        let query = sqlx::query(
1311            "UPDATE tx_sender_activate_try_to_send_txids SET is_finalized = $3 WHERE activated_id = $1 AND txid = $2",
1312        )
1313        .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated_id to i32")?)
1314        .bind(TxidDB(txid))
1315        .bind(is_finalized);
1316
1317        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1318        Ok(())
1319    }
1320
1321    pub async fn set_activate_txid_mempool_status(
1322        &self,
1323        tx: Option<TxSenderDbTx<'_>>,
1324        activated_id: u32,
1325        txid: Txid,
1326        in_mempool: bool,
1327    ) -> Result<(), BridgeError> {
1328        let query = sqlx::query(
1329            "UPDATE tx_sender_activate_try_to_send_txids SET in_mempool = $3 WHERE activated_id = $1 AND txid = $2",
1330        )
1331        .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated_id to i32")?)
1332        .bind(TxidDB(txid))
1333        .bind(in_mempool);
1334
1335        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1336        Ok(())
1337    }
1338
1339    pub async fn get_activate_txid_status(
1340        &self,
1341        tx: Option<TxSenderDbTx<'_>>,
1342        txid: Txid,
1343    ) -> Result<Option<(bool, Option<u32>)>, BridgeError> {
1344        let query = sqlx::query_as::<_, (Option<bool>, Option<i32>)>(
1345            "SELECT bool_or(in_mempool), max(seen_at_height)
1346             FROM tx_sender_activate_try_to_send_txids
1347             WHERE txid = $1",
1348        )
1349        .bind(TxidDB(txid));
1350
1351        let (any_in_mempool, seen_at_height): (Option<bool>, Option<i32>) =
1352            txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_one)?;
1353
1354        if any_in_mempool.is_none() && seen_at_height.is_none() {
1355            return Ok(None);
1356        }
1357
1358        let any_in_mempool = any_in_mempool.unwrap_or(false);
1359        let seen_at_height = seen_at_height
1360            .map(u32::try_from)
1361            .transpose()
1362            .wrap_err("Failed to convert seen_at_height to u32")?;
1363
1364        Ok(Some((any_in_mempool, seen_at_height)))
1365    }
1366
1367    pub async fn delete_try_to_send_tx(
1368        &self,
1369        mut tx: Option<TxSenderDbTx<'_>>,
1370        id: u32,
1371    ) -> Result<(), BridgeError> {
1372        let id_i32 = i32::try_from(id).wrap_err("Failed to convert id to i32")?;
1373
1374        let queries = [
1375            "DELETE FROM tx_sender_debug_sending_state WHERE tx_id = $1",
1376            "DELETE FROM tx_sender_debug_submission_errors WHERE tx_id = $1",
1377            "DELETE FROM tx_sender_rbf_txids WHERE id = $1",
1378            "DELETE FROM tx_sender_fee_payer_utxos WHERE bumped_id = $1",
1379            "DELETE FROM tx_sender_cancel_try_to_send_outpoints WHERE cancelled_id = $1",
1380            "DELETE FROM tx_sender_cancel_try_to_send_txids WHERE cancelled_id = $1",
1381            "DELETE FROM tx_sender_activate_try_to_send_outpoints WHERE activated_id = $1",
1382            "DELETE FROM tx_sender_activate_try_to_send_txids WHERE activated_id = $1",
1383            "DELETE FROM tx_sender_try_to_send_txs WHERE id = $1",
1384        ];
1385
1386        for sql in queries {
1387            let query = sqlx::query(sql).bind(id_i32);
1388            txsender_execute_query_with_tx!(&self.pool, tx.as_deref_mut(), query, execute)?;
1389        }
1390
1391        Ok(())
1392    }
1393
1394    pub async fn list_unfinalized_cancel_outpoints(
1395        &self,
1396        tx: Option<TxSenderDbTx<'_>>,
1397    ) -> Result<Vec<(u32, OutPoint, Option<u32>)>, BridgeError> {
1398        let query = sqlx::query_as::<_, (i32, TxidDB, i32, Option<i32>)>(
1399            r#"
1400            SELECT cancelled_id, txid, vout, seen_at_height
1401            FROM tx_sender_cancel_try_to_send_outpoints
1402            WHERE is_finalized = FALSE
1403            "#,
1404        );
1405
1406        let results = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
1407        results
1408            .into_iter()
1409            .map(|(cancelled_id, txid, vout, seen_at_height)| {
1410                Ok((
1411                    u32::try_from(cancelled_id)
1412                        .wrap_err("Failed to convert cancelled_id to u32")?,
1413                    OutPoint {
1414                        txid: txid.0,
1415                        vout: u32::try_from(vout).wrap_err("Failed to convert vout to u32")?,
1416                    },
1417                    seen_at_height
1418                        .map(u32::try_from)
1419                        .transpose()
1420                        .wrap_err("Failed to convert seen_at_height to u32")?,
1421                ))
1422            })
1423            .collect::<Result<Vec<_>, BridgeError>>()
1424    }
1425
1426    pub async fn set_cancel_outpoint_seen_at_height(
1427        &self,
1428        tx: Option<TxSenderDbTx<'_>>,
1429        cancelled_id: u32,
1430        outpoint: OutPoint,
1431        seen_at_height: Option<u32>,
1432    ) -> Result<(), BridgeError> {
1433        let query = sqlx::query(
1434            "UPDATE tx_sender_cancel_try_to_send_outpoints SET seen_at_height = $4 WHERE cancelled_id = $1 AND txid = $2 AND vout = $3",
1435        )
1436        .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled_id to i32")?)
1437        .bind(TxidDB(outpoint.txid))
1438        .bind(i32::try_from(outpoint.vout).wrap_err("Failed to convert vout to i32")?)
1439        .bind(
1440            seen_at_height
1441                .map(i32::try_from)
1442                .transpose()
1443                .wrap_err("Failed to convert seen_at_height to i32")?,
1444        );
1445
1446        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1447        Ok(())
1448    }
1449
1450    pub async fn set_cancel_outpoint_finalized(
1451        &self,
1452        tx: Option<TxSenderDbTx<'_>>,
1453        cancelled_id: u32,
1454        outpoint: OutPoint,
1455        is_finalized: bool,
1456    ) -> Result<(), BridgeError> {
1457        let query = sqlx::query(
1458            "UPDATE tx_sender_cancel_try_to_send_outpoints SET is_finalized = $4 WHERE cancelled_id = $1 AND txid = $2 AND vout = $3",
1459        )
1460        .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled_id to i32")?)
1461        .bind(TxidDB(outpoint.txid))
1462        .bind(i32::try_from(outpoint.vout).wrap_err("Failed to convert vout to i32")?)
1463        .bind(is_finalized);
1464
1465        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1466        Ok(())
1467    }
1468
1469    pub async fn list_unfinalized_activate_outpoints(
1470        &self,
1471        tx: Option<TxSenderDbTx<'_>>,
1472    ) -> Result<Vec<(u32, OutPoint, Option<u32>)>, BridgeError> {
1473        let query = sqlx::query_as::<_, (i32, TxidDB, i32, Option<i32>)>(
1474            r#"
1475            SELECT activated_id, txid, vout, seen_at_height
1476            FROM tx_sender_activate_try_to_send_outpoints
1477            WHERE is_finalized = FALSE
1478            "#,
1479        );
1480
1481        let results = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
1482        results
1483            .into_iter()
1484            .map(|(activated_id, txid, vout, seen_at_height)| {
1485                Ok((
1486                    u32::try_from(activated_id)
1487                        .wrap_err("Failed to convert activated_id to u32")?,
1488                    OutPoint {
1489                        txid: txid.0,
1490                        vout: u32::try_from(vout).wrap_err("Failed to convert vout to u32")?,
1491                    },
1492                    seen_at_height
1493                        .map(u32::try_from)
1494                        .transpose()
1495                        .wrap_err("Failed to convert seen_at_height to u32")?,
1496                ))
1497            })
1498            .collect::<Result<Vec<_>, BridgeError>>()
1499    }
1500
1501    pub async fn set_activate_outpoint_seen_at_height(
1502        &self,
1503        tx: Option<TxSenderDbTx<'_>>,
1504        activated_id: u32,
1505        outpoint: OutPoint,
1506        seen_at_height: Option<u32>,
1507    ) -> Result<(), BridgeError> {
1508        let query = sqlx::query(
1509            "UPDATE tx_sender_activate_try_to_send_outpoints SET seen_at_height = $4 WHERE activated_id = $1 AND txid = $2 AND vout = $3",
1510        )
1511        .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated_id to i32")?)
1512        .bind(TxidDB(outpoint.txid))
1513        .bind(i32::try_from(outpoint.vout).wrap_err("Failed to convert vout to i32")?)
1514        .bind(
1515            seen_at_height
1516                .map(i32::try_from)
1517                .transpose()
1518                .wrap_err("Failed to convert seen_at_height to i32")?,
1519        );
1520
1521        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1522        Ok(())
1523    }
1524
1525    pub async fn set_activate_outpoint_finalized(
1526        &self,
1527        tx: Option<TxSenderDbTx<'_>>,
1528        activated_id: u32,
1529        outpoint: OutPoint,
1530        is_finalized: bool,
1531    ) -> Result<(), BridgeError> {
1532        let query = sqlx::query(
1533            "UPDATE tx_sender_activate_try_to_send_outpoints SET is_finalized = $4 WHERE activated_id = $1 AND txid = $2 AND vout = $3",
1534        )
1535        .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated_id to i32")?)
1536        .bind(TxidDB(outpoint.txid))
1537        .bind(i32::try_from(outpoint.vout).wrap_err("Failed to convert vout to i32")?)
1538        .bind(is_finalized);
1539
1540        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1541        Ok(())
1542    }
1543
1544    pub async fn update_synced_height(&self, height: u32) -> Result<(), BridgeError> {
1545        sqlx::query(
1546            "INSERT INTO tx_sender_sync_state (id, synced_height, updated_at)
1547             VALUES (1, $1, NOW())
1548             ON CONFLICT (id) DO UPDATE SET synced_height = EXCLUDED.synced_height, updated_at = NOW()",
1549        )
1550        .bind(i32::try_from(height).wrap_err("Failed to convert height to i32")?)
1551        .execute(&self.pool)
1552        .await
1553        .map_err(BridgeError::DatabaseError)?;
1554        Ok(())
1555    }
1556
1557    pub async fn get_synced_height(&self) -> Result<Option<u32>, BridgeError> {
1558        let result: Option<i32> =
1559            sqlx::query_scalar("SELECT synced_height FROM tx_sender_sync_state WHERE id = 1")
1560                .fetch_optional(&self.pool)
1561                .await
1562                .map_err(BridgeError::DatabaseError)?;
1563
1564        Ok(result
1565            .map(|h| u32::try_from(h).wrap_err("Failed to convert height from DB"))
1566            .transpose()?)
1567    }
1568}
1569
1570#[cfg(all(test, feature = "testing"))]
1571mod tests {
1572    use super::*;
1573    use crate::test_utils::create_test_environment;
1574    use bitcoin::hashes::Hash as _;
1575    use bitcoin::transaction::Version;
1576    use bitcoin::{absolute, Transaction, Txid};
1577
1578    fn txid(byte: u8) -> Txid {
1579        Txid::from_byte_array([byte; 32])
1580    }
1581
1582    fn empty_tx() -> Transaction {
1583        Transaction {
1584            version: Version::TWO,
1585            lock_time: absolute::LockTime::ZERO,
1586            input: vec![],
1587            output: vec![],
1588        }
1589    }
1590
1591    async fn save_fee_payer_chain(db: &TxSenderDb, txid_prefix: u8) -> (u32, u32, u32) {
1592        let mut dbtx = db.begin_transaction().await.unwrap();
1593        let bumped_id = db
1594            .save_tx(
1595                &mut dbtx,
1596                None,
1597                &empty_tx(),
1598                FeePayingType::CPFP,
1599                txid(txid_prefix),
1600                None,
1601            )
1602            .await
1603            .unwrap();
1604        db.commit_transaction(dbtx).await.unwrap();
1605
1606        let root_txid = txid(txid_prefix + 1);
1607        db.save_fee_payer_tx(
1608            None,
1609            bumped_id,
1610            root_txid,
1611            0,
1612            Amount::from_sat(10_000),
1613            None,
1614        )
1615        .await
1616        .unwrap();
1617
1618        let initial: Vec<(u32, u32, Txid, u32, Amount, Option<u32>)> =
1619            db.get_all_unconfirmed_fee_payer_txs(None).await.unwrap();
1620        let root_id = initial
1621            .iter()
1622            .find_map(|(id, chain_bumped_id, txid, _, _, _)| {
1623                (*chain_bumped_id == bumped_id && *txid == root_txid).then_some(*id)
1624            })
1625            .unwrap();
1626
1627        let replacement_txid = txid(txid_prefix + 2);
1628        db.save_fee_payer_tx(
1629            None,
1630            bumped_id,
1631            replacement_txid,
1632            0,
1633            Amount::from_sat(10_000),
1634            Some(root_id),
1635        )
1636        .await
1637        .unwrap();
1638
1639        let replacement_id: i32 = sqlx::query_scalar(
1640            "SELECT id FROM tx_sender_fee_payer_utxos WHERE fee_payer_txid = $1",
1641        )
1642        .bind(TxidDB(replacement_txid))
1643        .fetch_one(db.pool())
1644        .await
1645        .unwrap();
1646
1647        let unconfirmed = db
1648            .get_unconfirmed_fee_payer_txs(None, bumped_id)
1649            .await
1650            .unwrap();
1651        assert_eq!(unconfirmed.len(), 2);
1652
1653        (bumped_id, root_id, replacement_id as u32)
1654    }
1655
1656    async fn assert_no_unconfirmed_fee_payers(db: &TxSenderDb, bumped_id: u32) {
1657        assert!(db
1658            .get_unconfirmed_fee_payer_txs(None, bumped_id)
1659            .await
1660            .unwrap()
1661            .is_empty());
1662    }
1663
1664    #[tokio::test]
1665    async fn confirmed_fee_payer_chain_has_no_unconfirmed_txs() {
1666        let db = create_test_environment(true, false).await.1.unwrap();
1667
1668        let (root_confirmed_bumped_id, root_id, _) = save_fee_payer_chain(&db, 10).await;
1669        db.set_fee_payer_seen_at_height(None, root_id, Some(100))
1670            .await
1671            .unwrap();
1672        assert_no_unconfirmed_fee_payers(&db, root_confirmed_bumped_id).await;
1673
1674        let (replacement_confirmed_bumped_id, _, replacement_id) =
1675            save_fee_payer_chain(&db, 20).await;
1676        db.set_fee_payer_seen_at_height(None, replacement_id, Some(101))
1677            .await
1678            .unwrap();
1679        assert_no_unconfirmed_fee_payers(&db, replacement_confirmed_bumped_id).await;
1680
1681        assert!(db
1682            .get_all_unconfirmed_fee_payer_txs(None)
1683            .await
1684            .unwrap()
1685            .is_empty());
1686    }
1687}