clementine_core/database/
tx_sender.rs

1//! This module includes database functions which are mainly used by the transaction sender.
2
3use async_trait::async_trait;
4
5use super::{wrapper::TxidDB, Database, DatabaseTransaction};
6use crate::execute_query_with_tx;
7use bitcoin::{
8    consensus::{deserialize, serialize},
9    Amount, FeeRate, Transaction, Txid,
10};
11use clementine_errors::BridgeError;
12use clementine_tx_sender::{ActivatedWithOutpoint, ActivatedWithTxid};
13use clementine_utils::{FeePayingType, RbfSigningInfo, TxMetadata};
14use eyre::{Context, OptionExt};
15use sqlx::Executor;
16
17impl Database {
18    /// Synchronizes transaction confirmations based on canonical block status.
19    /// Confirms transactions in canonical blocks and unconfirms transactions in
20    /// non-canonical blocks. This handles both new confirmations/unconfirmations
21    /// and any previously missed updates due to race conditions.
22    pub async fn sync_transaction_confirmations(
23        &self,
24        tx: Option<DatabaseTransaction<'_>>,
25    ) -> Result<(), BridgeError> {
26        // Do all confirmation/unconfirmation updates in one round-trip.
27        // Postgres writable CTEs are executed in-order, so this preserves the
28        // original semantics while avoiding 14 separate UPDATEs.
29        let query = sqlx::query(
30            r#"
31            WITH
32            u1 AS (
33                UPDATE tx_sender_activate_try_to_send_txids AS tap
34                SET seen_block_id = bs.id
35                FROM bitcoin_syncer_txs bst
36                JOIN bitcoin_syncer bs ON bst.block_id = bs.id
37                WHERE tap.txid = bst.txid
38                  AND tap.seen_block_id IS NULL
39                  AND bs.is_canonical = TRUE
40            ),
41            u2 AS (
42                UPDATE tx_sender_activate_try_to_send_outpoints AS tap
43                SET seen_block_id = bs.id
44                FROM bitcoin_syncer_spent_utxos bsu
45                JOIN bitcoin_syncer bs ON bsu.block_id = bs.id
46                WHERE tap.txid = bsu.txid
47                  AND tap.vout = bsu.vout
48                  AND tap.seen_block_id IS NULL
49                  AND bs.is_canonical = TRUE
50            ),
51            u3 AS (
52                UPDATE tx_sender_cancel_try_to_send_txids AS ctt
53                SET seen_block_id = bs.id
54                FROM bitcoin_syncer_txs bst
55                JOIN bitcoin_syncer bs ON bst.block_id = bs.id
56                WHERE ctt.txid = bst.txid
57                  AND ctt.seen_block_id IS NULL
58                  AND bs.is_canonical = TRUE
59            ),
60            u4 AS (
61                UPDATE tx_sender_cancel_try_to_send_outpoints AS cto
62                SET seen_block_id = bs.id
63                FROM bitcoin_syncer_spent_utxos bsu
64                JOIN bitcoin_syncer bs ON bsu.block_id = bs.id
65                WHERE cto.txid = bsu.txid
66                  AND cto.vout = bsu.vout
67                  AND cto.seen_block_id IS NULL
68                  AND bs.is_canonical = TRUE
69            ),
70            u5 AS (
71                UPDATE tx_sender_fee_payer_utxos AS fpu
72                SET seen_block_id = bs.id
73                FROM bitcoin_syncer_txs bst
74                JOIN bitcoin_syncer bs ON bst.block_id = bs.id
75                WHERE fpu.fee_payer_txid = bst.txid
76                  AND fpu.seen_block_id IS NULL
77                  AND bs.is_canonical = TRUE
78            ),
79            u6 AS (
80                UPDATE tx_sender_try_to_send_txs AS txs
81                SET seen_block_id = bs.id
82                FROM bitcoin_syncer_txs bst
83                JOIN bitcoin_syncer bs ON bst.block_id = bs.id
84                WHERE txs.txid = bst.txid
85                  AND txs.seen_block_id IS NULL
86                  AND bs.is_canonical = TRUE
87            ),
88            u7 AS (
89                -- Handle RBF confirmations: if any RBF txid is confirmed, mark the parent transaction
90                UPDATE tx_sender_try_to_send_txs AS txs
91                SET seen_block_id = bs.id
92                FROM tx_sender_rbf_txids AS rbf
93                JOIN bitcoin_syncer_txs AS bst ON rbf.txid = bst.txid
94                JOIN bitcoin_syncer bs ON bst.block_id = bs.id
95                WHERE txs.id = rbf.id
96                  AND txs.seen_block_id IS NULL
97                  AND bs.is_canonical = TRUE
98            ),
99            u8 AS (
100                -- Unconfirm all transactions that reference non-canonical blocks
101                UPDATE tx_sender_activate_try_to_send_txids AS tap
102                SET seen_block_id = NULL
103                FROM bitcoin_syncer bs
104                WHERE tap.seen_block_id = bs.id
105                  AND bs.is_canonical = FALSE
106            ),
107            u9 AS (
108                UPDATE tx_sender_activate_try_to_send_outpoints AS tap
109                SET seen_block_id = NULL
110                FROM bitcoin_syncer bs
111                WHERE tap.seen_block_id = bs.id
112                  AND bs.is_canonical = FALSE
113            ),
114            u10 AS (
115                UPDATE tx_sender_cancel_try_to_send_txids AS ctt
116                SET seen_block_id = NULL
117                FROM bitcoin_syncer bs
118                WHERE ctt.seen_block_id = bs.id
119                  AND bs.is_canonical = FALSE
120            ),
121            u11 AS (
122                UPDATE tx_sender_cancel_try_to_send_outpoints AS cto
123                SET seen_block_id = NULL
124                FROM bitcoin_syncer bs
125                WHERE cto.seen_block_id = bs.id
126                  AND bs.is_canonical = FALSE
127            ),
128            u12 AS (
129                UPDATE tx_sender_fee_payer_utxos AS fpu
130                SET seen_block_id = NULL
131                FROM bitcoin_syncer bs
132                WHERE fpu.seen_block_id = bs.id
133                  AND bs.is_canonical = FALSE
134            ),
135            u13 AS (
136                UPDATE tx_sender_try_to_send_txs AS txs
137                SET seen_block_id = NULL
138                FROM bitcoin_syncer bs
139                WHERE txs.seen_block_id = bs.id
140                  AND bs.is_canonical = FALSE
141            ),
142            u14 AS (
143                -- Handle RBF unconfirmations: unconfirm the parent transaction if
144                -- it has RBF txids and ALL of them are unconfirmed
145                UPDATE tx_sender_try_to_send_txs AS txs
146                SET seen_block_id = NULL
147                WHERE txs.seen_block_id IS NOT NULL
148                  AND EXISTS (
149                      SELECT 1 FROM tx_sender_rbf_txids AS rbf
150                      WHERE rbf.id = txs.id
151                  )
152                  AND NOT EXISTS (
153                      SELECT 1 FROM tx_sender_rbf_txids AS rbf
154                      JOIN bitcoin_syncer_txs AS bst ON rbf.txid = bst.txid
155                      JOIN bitcoin_syncer bs ON bst.block_id = bs.id
156                      WHERE rbf.id = txs.id
157                        AND bs.is_canonical = TRUE
158                  )
159            )
160            SELECT 1;
161            "#,
162        );
163        execute_query_with_tx!(self.connection, tx, query, execute)?;
164
165        Ok(())
166    }
167
168    /// Saves a fee payer transaction to the database.
169    ///
170    /// # Arguments
171    /// * `bumped_id` - The id of the bumped transaction
172    /// * `fee_payer_txid` - The txid of the fee payer transaction
173    /// * `vout` - The output index of the UTXO
174    /// * `script_pubkey` - The script pubkey of the UTXO
175    /// * `amount` - The amount in satoshis
176    pub async fn save_fee_payer_tx(
177        &self,
178        tx: Option<DatabaseTransaction<'_>>,
179        bumped_id: u32,
180        fee_payer_txid: Txid,
181        vout: u32,
182        amount: Amount,
183        replacement_of_id: Option<u32>,
184    ) -> Result<(), BridgeError> {
185        let query = sqlx::query(
186            "INSERT INTO tx_sender_fee_payer_utxos (bumped_id, fee_payer_txid, vout, amount, replacement_of_id)
187             VALUES ($1, $2, $3, $4, $5)",
188        )
189        .bind(i32::try_from(bumped_id).wrap_err("Failed to convert bumped id to i32")?)
190        .bind(TxidDB(fee_payer_txid))
191        .bind(i32::try_from(vout).wrap_err("Failed to convert vout to i32")?)
192        .bind(i64::try_from(amount.to_sat()).wrap_err("Failed to convert amount to i64")?)
193        .bind(replacement_of_id.map( i32::try_from).transpose().wrap_err("Failed to convert replacement of id to i32")?);
194
195        execute_query_with_tx!(self.connection, tx, query, execute)?;
196
197        Ok(())
198    }
199
200    /// Returns all unconfirmed fee payer transactions for a try-to-send tx.
201    /// Transactions whose replacements are confirmed are not included. But if none of the replacements are confirmed, all replacements are returned.
202    ///
203    /// # Parameters
204    ///
205    /// # Returns
206    ///
207    /// A vector of unconfirmed fee payer transaction details, including:
208    ///
209    /// - [`u32`]: Id of the fee payer transaction.
210    /// - [`u32`]: Id of the bumped transaction.
211    /// - [`Txid`]: Txid of the fee payer transaction.
212    /// - [`u32`]: Output index of the UTXO.
213    /// - [`Amount`]: Amount in satoshis.
214    pub async fn get_all_unconfirmed_fee_payer_txs(
215        &self,
216        tx: Option<DatabaseTransaction<'_>>,
217    ) -> Result<Vec<(u32, u32, Txid, u32, Amount, Option<u32>)>, BridgeError> {
218        let query = sqlx::query_as::<_, (i32, i32, TxidDB, i32, i64, Option<i32>)>(
219            "
220            SELECT fpu.id, fpu.bumped_id, fpu.fee_payer_txid, fpu.vout, fpu.amount, fpu.replacement_of_id
221            FROM tx_sender_fee_payer_utxos fpu
222            WHERE fpu.seen_block_id IS NULL
223              AND fpu.is_evicted = false
224              AND NOT EXISTS (
225                  SELECT 1
226                  FROM tx_sender_fee_payer_utxos x
227                  WHERE (x.replacement_of_id = fpu.replacement_of_id OR x.id = fpu.replacement_of_id)
228                    AND x.seen_block_id IS NOT NULL
229              )
230            ",
231        );
232
233        let results: Vec<(i32, i32, TxidDB, i32, i64, Option<i32>)> =
234            execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
235
236        results
237            .iter()
238            .map(
239                |(id, bumped_id, fee_payer_txid, vout, amount, replacement_of_id)| {
240                    Ok((
241                        u32::try_from(*id).wrap_err("Failed to convert id to u32")?,
242                        u32::try_from(*bumped_id).wrap_err("Failed to convert bumped id to u32")?,
243                        fee_payer_txid.0,
244                        u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
245                        Amount::from_sat(
246                            u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
247                        ),
248                        replacement_of_id
249                            .map(u32::try_from)
250                            .transpose()
251                            .wrap_err("Failed to convert replacement of id to u32")?,
252                    ))
253                },
254            )
255            .collect::<Result<Vec<_>, BridgeError>>()
256    }
257
258    /// Returns all unconfirmed fee payer transactions for a try-to-send tx.
259    /// Transactions whose replacements are confirmed are not included. But if none of the replacements are confirmed, all replacements are returned.
260    ///
261    /// # Parameters
262    ///
263    /// - `bumped_id`: The id of the bumped transaction
264    ///
265    /// # Returns
266    ///
267    /// A vector of unconfirmed fee payer transaction details, including:
268    ///
269    /// - [`u32`]: Id of the fee payer transaction.
270    /// - [`Txid`]: Txid of the fee payer transaction.
271    /// - [`u32`]: Output index of the UTXO.
272    /// - [`Amount`]: Amount in satoshis.
273    pub async fn get_unconfirmed_fee_payer_txs(
274        &self,
275        tx: Option<DatabaseTransaction<'_>>,
276        bumped_id: u32,
277    ) -> Result<Vec<(u32, Txid, u32, Amount)>, BridgeError> {
278        let query = sqlx::query_as::<_, (i32, TxidDB, i32, i64)>(
279            "
280            SELECT fpu.id, fpu.fee_payer_txid, fpu.vout, fpu.amount
281            FROM tx_sender_fee_payer_utxos fpu
282            WHERE fpu.bumped_id = $1
283              AND fpu.seen_block_id IS NULL
284              AND fpu.is_evicted = false
285              AND NOT EXISTS (
286                  SELECT 1
287                  FROM tx_sender_fee_payer_utxos x
288                  WHERE (x.replacement_of_id = fpu.replacement_of_id OR x.id = fpu.replacement_of_id)
289                    AND x.seen_block_id IS NOT NULL
290              )
291            ",
292        )
293        .bind(i32::try_from(bumped_id).wrap_err("Failed to convert bumped id to i32")?);
294        let results: Vec<(i32, TxidDB, i32, i64)> =
295            execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
296
297        results
298            .iter()
299            .map(|(id, fee_payer_txid, vout, amount)| {
300                Ok((
301                    u32::try_from(*id).wrap_err("Failed to convert id to u32")?,
302                    fee_payer_txid.0,
303                    u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
304                    Amount::from_sat(
305                        u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
306                    ),
307                ))
308            })
309            .collect::<Result<Vec<_>, BridgeError>>()
310    }
311
312    /// Marks a fee payer utxo and all its replacements as evicted.
313    /// If it is marked as evicted, it will not be tried to be bumped again. (Because wallet can use same utxos for other txs)
314    pub async fn mark_fee_payer_utxo_as_evicted(
315        &self,
316        tx: Option<DatabaseTransaction<'_>>,
317        id: u32,
318    ) -> Result<(), BridgeError> {
319        let query = sqlx::query(
320            "UPDATE tx_sender_fee_payer_utxos 
321                SET is_evicted = true 
322                WHERE id = $1 
323                OR replacement_of_id = $1",
324        )
325        .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
326
327        execute_query_with_tx!(self.connection, tx, query, execute)?;
328        Ok(())
329    }
330
331    pub async fn get_confirmed_fee_payer_utxos(
332        &self,
333        tx: Option<DatabaseTransaction<'_>>,
334        id: u32,
335    ) -> Result<Vec<(Txid, u32, Amount)>, BridgeError> {
336        let query = sqlx::query_as::<_, (TxidDB, i32, i64)>(
337            "SELECT fee_payer_txid, vout, amount
338             FROM tx_sender_fee_payer_utxos fpu
339             WHERE fpu.bumped_id = $1 AND fpu.seen_block_id IS NOT NULL",
340        )
341        .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
342
343        let results: Vec<(TxidDB, i32, i64)> =
344            execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
345
346        results
347            .iter()
348            .map(|(fee_payer_txid, vout, amount)| {
349                Ok((
350                    fee_payer_txid.0,
351                    u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
352                    Amount::from_sat(
353                        u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
354                    ),
355                ))
356            })
357            .collect::<Result<Vec<_>, BridgeError>>()
358    }
359
360    /// Returns the id of the tx in `tx_sender_try_to_send_txs` if it exists.
361    /// Used to avoid adding duplicate transactions to the txsender.
362    pub async fn check_if_tx_exists_on_txsender(
363        &self,
364        tx: Option<DatabaseTransaction<'_>>,
365        txid: Txid,
366    ) -> Result<Option<u32>, BridgeError> {
367        let query = sqlx::query_as::<_, (i32,)>(
368            "SELECT id FROM tx_sender_try_to_send_txs WHERE txid = $1 LIMIT 1",
369        )
370        .bind(TxidDB(txid));
371
372        let result: Option<(i32,)> =
373            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
374        Ok(match result {
375            Some((id,)) => Some(u32::try_from(id).wrap_err("Failed to convert id to u32")?),
376            None => None,
377        })
378    }
379
380    pub async fn save_tx(
381        &self,
382        tx: Option<DatabaseTransaction<'_>>,
383        tx_metadata: Option<TxMetadata>,
384        raw_tx: &Transaction,
385        fee_paying_type: FeePayingType,
386        txid: Txid,
387        rbf_signing_info: Option<RbfSigningInfo>,
388    ) -> Result<u32, BridgeError> {
389        let query = sqlx::query_scalar(
390            "INSERT INTO tx_sender_try_to_send_txs (raw_tx, fee_paying_type, tx_metadata, txid, rbf_signing_info) VALUES ($1, $2::fee_paying_type, $3, $4, $5) RETURNING id"
391        )
392        .bind(serialize(raw_tx))
393        .bind(fee_paying_type)
394        .bind(serde_json::to_string(&tx_metadata).wrap_err("Failed to encode tx_metadata to JSON")?)
395        .bind(TxidDB(txid))
396        .bind(serde_json::to_string(&rbf_signing_info).wrap_err("Failed to encode tx_metadata to JSON")?);
397
398        let id: i32 = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
399        u32::try_from(id)
400            .wrap_err("Failed to convert id to u32")
401            .map_err(Into::into)
402    }
403
404    pub async fn save_rbf_txid(
405        &self,
406        tx: Option<DatabaseTransaction<'_>>,
407        id: u32,
408        txid: Txid,
409    ) -> Result<(), BridgeError> {
410        let query = sqlx::query("INSERT INTO tx_sender_rbf_txids (id, txid) VALUES ($1, $2)")
411            .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?)
412            .bind(TxidDB(txid));
413
414        execute_query_with_tx!(self.connection, tx, query, execute)?;
415        Ok(())
416    }
417
418    pub async fn get_last_rbf_txid(
419        &self,
420        tx: Option<DatabaseTransaction<'_>>,
421        id: u32,
422    ) -> Result<Option<Txid>, BridgeError> {
423        let query = sqlx::query_as::<_, (TxidDB,)>("SELECT txid FROM tx_sender_rbf_txids WHERE id = $1 ORDER BY insertion_order DESC LIMIT 1")
424            .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
425
426        let result: Option<(TxidDB,)> =
427            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
428        Ok(result.map(|(txid,)| txid.0))
429    }
430
431    pub async fn save_cancelled_outpoint(
432        &self,
433        tx: Option<DatabaseTransaction<'_>>,
434        cancelled_id: u32,
435        outpoint: bitcoin::OutPoint,
436    ) -> Result<(), BridgeError> {
437        let query = sqlx::query(
438            "INSERT INTO tx_sender_cancel_try_to_send_outpoints (cancelled_id, txid, vout) VALUES ($1, $2, $3)"
439        )
440        .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled id to i32")?)
441        .bind(TxidDB(outpoint.txid))
442        .bind(i32::try_from(outpoint.vout).wrap_err("Failed to convert vout to i32")?);
443
444        execute_query_with_tx!(self.connection, tx, query, execute)?;
445        Ok(())
446    }
447
448    pub async fn save_cancelled_txid(
449        &self,
450        tx: Option<DatabaseTransaction<'_>>,
451        cancelled_id: u32,
452        txid: bitcoin::Txid,
453    ) -> Result<(), BridgeError> {
454        let query = sqlx::query(
455            "INSERT INTO tx_sender_cancel_try_to_send_txids (cancelled_id, txid) VALUES ($1, $2)",
456        )
457        .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled id to i32")?)
458        .bind(TxidDB(txid));
459
460        execute_query_with_tx!(self.connection, tx, query, execute)?;
461        Ok(())
462    }
463
464    pub async fn save_activated_txid(
465        &self,
466        tx: Option<DatabaseTransaction<'_>>,
467        activated_id: u32,
468        prerequisite_tx: &ActivatedWithTxid,
469    ) -> Result<(), BridgeError> {
470        let query = sqlx::query(
471            "INSERT INTO tx_sender_activate_try_to_send_txids (activated_id, txid, timelock) VALUES ($1, $2, $3)"
472        )
473        .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated id to i32")?)
474        .bind(TxidDB(prerequisite_tx.txid))
475        .bind(i32::try_from(prerequisite_tx.relative_block_height).wrap_err("Failed to convert relative block height to i32")?);
476
477        execute_query_with_tx!(self.connection, tx, query, execute)?;
478        Ok(())
479    }
480
481    pub async fn save_activated_outpoint(
482        &self,
483        tx: Option<DatabaseTransaction<'_>>,
484        activated_id: u32,
485        activated_outpoint: &ActivatedWithOutpoint,
486    ) -> Result<(), BridgeError> {
487        let query = sqlx::query(
488            "INSERT INTO tx_sender_activate_try_to_send_outpoints (activated_id, txid, vout, timelock) VALUES ($1, $2, $3, $4)"
489        )
490        .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated id to i32")?)
491        .bind(TxidDB(activated_outpoint.outpoint.txid))
492        .bind(i32::try_from(activated_outpoint.outpoint.vout).wrap_err("Failed to convert vout to i32")?)
493        .bind(i32::try_from(activated_outpoint.relative_block_height).wrap_err("Failed to convert relative block height to i32")?);
494
495        execute_query_with_tx!(self.connection, tx, query, execute)?;
496        Ok(())
497    }
498
499    /// Returns unconfirmed try-to-send transactions that satisfy all activation
500    /// conditions for sending:
501    ///
502    /// - Not in the non-active list
503    /// - Not in the cancelled list
504    /// - Transaction itself is not already confirmed
505    /// - Transaction and UTXO timelocks must be passed
506    /// - Fee rate is lower than the provided maximum fee rate (previous sends had a lower fee rate) or null (transaction wasn't sent before) OR the transaction was sent before, but the chain height increased since then, and the transaction is still not confirmed (accomplished by calling this fn with u32::MAX fee rate)
507    ///
508    /// # Parameters
509    ///
510    /// - `tx`: Optional database transaction
511    /// - `fee_rate`: Current fee rate of bitcoin or u32::MAX to retrieve all active txs
512    /// - `current_tip_height`: The current tip height of the Bitcoin blockchain
513    ///   for checking timelocks
514    ///
515    /// # Returns
516    ///
517    /// - [`Vec<u32>`]: A vector of transaction ids (db id) that are sendable.
518    pub async fn get_sendable_txs(
519        &self,
520        tx: Option<DatabaseTransaction<'_>>,
521        fee_rate: FeeRate,
522        current_tip_height: u32,
523    ) -> Result<Vec<u32>, BridgeError> {
524        let select_query = sqlx::query_as::<_, (i32,)>(
525            "WITH
526                -- Find non-active transactions (not seen or timelock not passed)
527                non_active_txs AS (
528                    -- Transactions with txid activations that aren't active yet
529                    SELECT DISTINCT
530                        activate_txid.activated_id AS tx_id
531                    FROM
532                        tx_sender_activate_try_to_send_txids AS activate_txid
533                    LEFT JOIN
534                        bitcoin_syncer AS syncer ON activate_txid.seen_block_id = syncer.id
535                    WHERE
536                        activate_txid.seen_block_id IS NULL
537                        OR (syncer.height + activate_txid.timelock > $2)
538
539                    UNION
540
541                    -- Transactions with outpoint activations that aren't active yet (not seen or timelock not passed)
542                    SELECT DISTINCT
543                        activate_outpoint.activated_id AS tx_id
544                    FROM
545                        tx_sender_activate_try_to_send_outpoints AS activate_outpoint
546                    LEFT JOIN
547                        bitcoin_syncer AS syncer ON activate_outpoint.seen_block_id = syncer.id
548                    WHERE
549                        activate_outpoint.seen_block_id IS NULL
550                        OR (syncer.height + activate_outpoint.timelock > $2)
551                ),
552
553                -- Transactions with cancelled conditions
554                cancelled_txs AS (
555                    -- Transactions with cancelled outpoints (not seen)
556                    SELECT DISTINCT
557                        cancelled_id AS tx_id
558                    FROM
559                        tx_sender_cancel_try_to_send_outpoints
560                    WHERE
561                        seen_block_id IS NOT NULL
562
563                    UNION
564
565                    -- Transactions with cancelled txids (not seen)
566                    SELECT DISTINCT
567                        cancelled_id AS tx_id
568                    FROM
569                        tx_sender_cancel_try_to_send_txids
570                    WHERE
571                        seen_block_id IS NOT NULL
572                )
573
574                -- Final query to get sendable transactions
575                SELECT
576                    txs.id
577                FROM
578                    tx_sender_try_to_send_txs AS txs
579                WHERE
580                    -- Transaction must not be in the non-active list
581                    txs.id NOT IN (SELECT tx_id FROM non_active_txs)
582                    -- Transaction must not be in the cancelled list
583                    AND txs.id NOT IN (SELECT tx_id FROM cancelled_txs)
584                    -- Transaction must not be already confirmed
585                    AND txs.seen_block_id IS NULL
586                    -- Check if fee_rate is lower than the provided fee rate or null
587                    AND (txs.effective_fee_rate IS NULL OR txs.effective_fee_rate < $1);",
588        )
589        .bind(
590            i64::try_from(fee_rate.to_sat_per_kwu())
591                .wrap_err("Failed to convert fee rate to i64")?,
592        )
593        .bind(
594            i32::try_from(current_tip_height)
595                .wrap_err("Failed to convert current tip height to i32")?,
596        );
597
598        let results = execute_query_with_tx!(self.connection, tx, select_query, fetch_all)?;
599
600        let txs = results
601            .into_iter()
602            .map(|(id,)| u32::try_from(id))
603            .collect::<Result<Vec<_>, _>>()
604            .wrap_err("Failed to convert id to u32")?;
605
606        Ok(txs)
607    }
608
609    /// Returns the effective fee rate and the block height when it was set.
610    /// Returns (None, None) if no effective fee rate has been set yet.
611    pub async fn get_effective_fee_rate(
612        &self,
613        tx: Option<DatabaseTransaction<'_>>,
614        id: u32,
615    ) -> Result<(Option<FeeRate>, Option<u32>), BridgeError> {
616        let query = sqlx::query_as::<_, (Option<i64>, Option<i32>)>(
617            "SELECT effective_fee_rate, last_bump_block_height FROM tx_sender_try_to_send_txs WHERE id = $1",
618        )
619        .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
620
621        let result = execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
622
623        match result {
624            Some((Some(rate), block_height)) => Ok((
625                Some(FeeRate::from_sat_per_kwu(
626                    u64::try_from(rate).wrap_err("Failed to convert effective fee rate to u64")?,
627                )),
628                block_height.map(|h| h as u32),
629            )),
630            Some((None, _)) | None => Ok((None, None)),
631        }
632    }
633
634    /// Updates the effective fee rate and last bump block height for a transaction.
635    ///
636    /// This function only updates the row if the fee rate is actually changing (or is NULL).
637    /// If the fee rate hasn't changed, the entire update is skipped to preserve the existing
638    /// `last_bump_block_height`. This ensures the "stuck for 10 blocks" counter continues from
639    /// the last actual fee bump, not from retries with the same fee rate.
640    ///
641    /// # Parameters
642    /// * `tx` - Optional database transaction. If None, uses the connection's transaction.
643    /// * `id` - The transaction ID to update.
644    /// * `effective_fee_rate` - The new effective fee rate to set, the fee rate we sent the tx with.
645    /// * `block_height` - The current block height (only updated if fee rate changes).
646    ///
647    /// # Returns
648    /// Returns `Ok(())` on success, or a `BridgeError` if the update fails.
649    pub async fn update_effective_fee_rate(
650        &self,
651        tx: Option<DatabaseTransaction<'_>>,
652        id: u32,
653        effective_fee_rate: FeeRate,
654        block_height: u32,
655    ) -> Result<(), BridgeError> {
656        let query = sqlx::query(
657            "UPDATE tx_sender_try_to_send_txs 
658             SET effective_fee_rate = $1, last_bump_block_height = $2 
659             WHERE id = $3 AND (effective_fee_rate IS NULL OR effective_fee_rate != $1)",
660        )
661        .bind(
662            i64::try_from(effective_fee_rate.to_sat_per_kwu())
663                .wrap_err("Failed to convert effective fee rate to i64")?,
664        )
665        .bind(i32::try_from(block_height).wrap_err("Failed to convert block_height to i32")?)
666        .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
667
668        execute_query_with_tx!(self.connection, tx, query, execute)?;
669
670        Ok(())
671    }
672
673    pub async fn get_try_to_send_tx(
674        &self,
675        tx: Option<DatabaseTransaction<'_>>,
676        id: u32,
677    ) -> Result<
678        (
679            Option<TxMetadata>,
680            Transaction,
681            FeePayingType,
682            Option<u32>,
683            Option<RbfSigningInfo>,
684        ),
685        BridgeError,
686    > {
687        let query = sqlx::query_as::<
688            _,
689            (
690                Option<String>,
691                Option<Vec<u8>>,
692                FeePayingType,
693                Option<i32>,
694                Option<String>,
695            ),
696        >(
697            "SELECT tx_metadata, raw_tx, fee_paying_type, seen_block_id, rbf_signing_info
698             FROM tx_sender_try_to_send_txs
699             WHERE id = $1 LIMIT 1",
700        )
701        .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
702
703        let result = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
704        Ok((
705            serde_json::from_str(result.0.as_deref().unwrap_or("null"))
706                .wrap_err_with(|| format!("Failed to decode tx_metadata from {:?}", result.0))?,
707            result
708                .1
709                .as_deref()
710                .map(deserialize)
711                .ok_or_eyre("Expected raw_tx to be present")?
712                .wrap_err("Failed to deserialize raw_tx")?,
713            result.2,
714            result
715                .3
716                .map(u32::try_from)
717                .transpose()
718                .wrap_err("Failed to convert seen_block_id to u32")?,
719            serde_json::from_str(result.4.as_deref().unwrap_or("null")).wrap_err_with(|| {
720                format!("Failed to decode rbf_signing_info from {:?}", result.4)
721            })?,
722        ))
723    }
724
725    // Debug Functions
726
727    /// Saves a TX submission error to the debug table
728    pub async fn save_tx_debug_submission_error(
729        &self,
730        tx: Option<DatabaseTransaction<'_>>,
731        tx_id: u32,
732        error_message: &str,
733    ) -> Result<(), BridgeError> {
734        let query = sqlx::query(
735            "INSERT INTO tx_sender_debug_submission_errors (tx_id, error_message) VALUES ($1, $2)",
736        )
737        .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?)
738        .bind(error_message);
739
740        execute_query_with_tx!(self.connection, tx, query, execute)?;
741        Ok(())
742    }
743
744    /// Updates or inserts the TX's sending state in the debug table
745    ///
746    /// Does not support a Transaction because it's for debugging purposes. Make
747    /// sure that tx_id exists (i.e. creation is committed) before use
748    pub async fn update_tx_debug_sending_state(
749        &self,
750        tx_id: u32,
751        state: &str,
752        activated: bool,
753    ) -> Result<(), BridgeError> {
754        let query = sqlx::query(
755            r#"
756            INSERT INTO tx_sender_debug_sending_state
757            (tx_id, state, last_update, activated_timestamp)
758            VALUES ($1, $2, NOW(),
759                CASE
760                    WHEN $3 = TRUE THEN NOW()
761                    ELSE NULL
762                END
763            )
764            ON CONFLICT (tx_id) DO UPDATE SET
765            state = $2,
766            last_update = NOW(),
767            activated_timestamp = COALESCE(tx_sender_debug_sending_state.activated_timestamp,
768                CASE
769                    WHEN $3 = TRUE THEN NOW()
770                    ELSE NULL
771                END
772            )
773            "#,
774        )
775        .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?)
776        .bind(state)
777        .bind(activated);
778
779        self.connection.execute(query).await?;
780        Ok(())
781    }
782
783    /// Gets the current debug state of a TX
784    pub async fn get_tx_debug_info(
785        &self,
786        tx: Option<DatabaseTransaction<'_>>,
787        tx_id: u32,
788    ) -> Result<Option<String>, BridgeError> {
789        let query = sqlx::query_as::<_, (Option<String>,)>(
790            r#"
791            SELECT state
792            FROM tx_sender_debug_sending_state
793            WHERE tx_id = $1
794            "#,
795        )
796        .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
797
798        let result = execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
799        match result {
800            Some((state,)) => Ok(state),
801            None => Ok(None),
802        }
803    }
804
805    /// Gets all TX submission errors
806    pub async fn get_tx_debug_submission_errors(
807        &self,
808        tx: Option<DatabaseTransaction<'_>>,
809        tx_id: u32,
810    ) -> Result<Vec<(String, String)>, BridgeError> {
811        let query = sqlx::query_as::<_, (String, String)>(
812            r#"
813            SELECT error_message, timestamp::TEXT
814            FROM tx_sender_debug_submission_errors
815            WHERE tx_id = $1
816            ORDER BY timestamp ASC
817            "#,
818        )
819        .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
820
821        execute_query_with_tx!(self.connection, tx, query, fetch_all).map_err(Into::into)
822    }
823
824    /// Gets all fee payer UTXOs for a TX with their confirmation status
825    pub async fn get_tx_debug_fee_payer_utxos(
826        &self,
827        tx: Option<DatabaseTransaction<'_>>,
828        tx_id: u32,
829    ) -> Result<Vec<(Txid, u32, Amount, bool)>, BridgeError> {
830        let query = sqlx::query_as::<_, (TxidDB, i32, i64, bool)>(
831            r#"
832            SELECT fee_payer_txid, vout, amount, seen_block_id IS NOT NULL as confirmed
833            FROM tx_sender_fee_payer_utxos
834            WHERE bumped_id = $1
835            "#,
836        )
837        .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
838
839        let results: Vec<(TxidDB, i32, i64, bool)> =
840            execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
841
842        results
843            .iter()
844            .map(|(fee_payer_txid, vout, amount, confirmed)| {
845                Ok((
846                    fee_payer_txid.0,
847                    u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
848                    Amount::from_sat(
849                        u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
850                    ),
851                    *confirmed,
852                ))
853            })
854            .collect::<Result<Vec<_>, BridgeError>>()
855    }
856}
857
858#[async_trait]
859impl clementine_tx_sender::TxSenderDatabase for Database {
860    type Transaction = sqlx::Transaction<'static, sqlx::Postgres>;
861    async fn begin_transaction(
862        &self,
863    ) -> Result<sqlx::Transaction<'static, sqlx::Postgres>, BridgeError> {
864        self.begin_transaction().await
865    }
866
867    async fn commit_transaction(
868        &self,
869        tx: sqlx::Transaction<'static, sqlx::Postgres>,
870    ) -> Result<(), BridgeError> {
871        tx.commit().await.map_err(Into::into)
872    }
873
874    async fn save_tx_debug_submission_error(
875        &self,
876        dbtx: Option<&mut Self::Transaction>,
877        tx_id: u32,
878        error_message: &str,
879    ) -> Result<(), BridgeError> {
880        self.save_tx_debug_submission_error(dbtx, tx_id, error_message)
881            .await
882    }
883
884    async fn get_sendable_txs(
885        &self,
886        fee_rate: FeeRate,
887        current_tip_height: u32,
888    ) -> Result<Vec<u32>, BridgeError> {
889        self.get_sendable_txs(None, fee_rate, current_tip_height)
890            .await
891    }
892
893    async fn get_try_to_send_tx(
894        &self,
895        tx: Option<&mut Self::Transaction>,
896        id: u32,
897    ) -> Result<
898        (
899            Option<TxMetadata>,
900            Transaction,
901            FeePayingType,
902            Option<u32>,
903            Option<RbfSigningInfo>,
904        ),
905        BridgeError,
906    > {
907        self.get_try_to_send_tx(tx, id).await
908    }
909
910    async fn update_tx_debug_sending_state(
911        &self,
912        tx_id: u32,
913        state: &str,
914        activated: bool,
915    ) -> Result<(), BridgeError> {
916        self.update_tx_debug_sending_state(tx_id, state, activated)
917            .await
918    }
919
920    async fn get_all_unconfirmed_fee_payer_txs(
921        &self,
922        tx: Option<&mut Self::Transaction>,
923    ) -> Result<Vec<(u32, u32, Txid, u32, Amount, Option<u32>)>, BridgeError> {
924        self.get_all_unconfirmed_fee_payer_txs(tx).await
925    }
926
927    async fn get_unconfirmed_fee_payer_txs(
928        &self,
929        tx: Option<&mut Self::Transaction>,
930        bumped_id: u32,
931    ) -> Result<Vec<(u32, Txid, u32, Amount)>, BridgeError> {
932        self.get_unconfirmed_fee_payer_txs(tx, bumped_id).await
933    }
934
935    async fn mark_fee_payer_utxo_as_evicted(
936        &self,
937        tx: Option<&mut Self::Transaction>,
938        id: u32,
939    ) -> Result<(), BridgeError> {
940        self.mark_fee_payer_utxo_as_evicted(tx, id).await
941    }
942
943    async fn get_confirmed_fee_payer_utxos(
944        &self,
945        tx: Option<&mut Self::Transaction>,
946        id: u32,
947    ) -> Result<Vec<(Txid, u32, Amount)>, BridgeError> {
948        self.get_confirmed_fee_payer_utxos(tx, id).await
949    }
950
951    async fn save_fee_payer_tx(
952        &self,
953        tx: Option<&mut Self::Transaction>,
954        bumped_id: u32,
955        fee_payer_txid: Txid,
956        vout: u32,
957        amount: Amount,
958        replacement_of_id: Option<u32>,
959    ) -> Result<(), BridgeError> {
960        self.save_fee_payer_tx(
961            tx,
962            bumped_id,
963            fee_payer_txid,
964            vout,
965            amount,
966            replacement_of_id,
967        )
968        .await
969    }
970
971    async fn get_last_rbf_txid(
972        &self,
973        tx: Option<&mut Self::Transaction>,
974        id: u32,
975    ) -> Result<Option<Txid>, BridgeError> {
976        self.get_last_rbf_txid(tx, id).await
977    }
978
979    async fn save_rbf_txid(
980        &self,
981        tx: Option<&mut Self::Transaction>,
982        id: u32,
983        txid: Txid,
984    ) -> Result<(), BridgeError> {
985        self.save_rbf_txid(tx, id, txid).await
986    }
987
988    async fn save_cancelled_outpoint(
989        &self,
990        tx: Option<&mut Self::Transaction>,
991        cancelled_id: u32,
992        outpoint: bitcoin::OutPoint,
993    ) -> Result<(), BridgeError> {
994        self.save_cancelled_outpoint(tx, cancelled_id, outpoint)
995            .await
996    }
997
998    async fn save_cancelled_txid(
999        &self,
1000        tx: Option<&mut Self::Transaction>,
1001        cancelled_id: u32,
1002        txid: bitcoin::Txid,
1003    ) -> Result<(), BridgeError> {
1004        self.save_cancelled_txid(tx, cancelled_id, txid).await
1005    }
1006
1007    async fn save_activated_txid(
1008        &self,
1009        tx: Option<&mut Self::Transaction>,
1010        activated_id: u32,
1011        prerequisite_tx: &ActivatedWithTxid,
1012    ) -> Result<(), BridgeError> {
1013        self.save_activated_txid(tx, activated_id, prerequisite_tx)
1014            .await
1015    }
1016
1017    async fn save_activated_outpoint(
1018        &self,
1019        tx: Option<&mut Self::Transaction>,
1020        activated_id: u32,
1021        activated_outpoint: &ActivatedWithOutpoint,
1022    ) -> Result<(), BridgeError> {
1023        self.save_activated_outpoint(tx, activated_id, activated_outpoint)
1024            .await
1025    }
1026
1027    async fn get_effective_fee_rate(
1028        &self,
1029        tx: Option<&mut Self::Transaction>,
1030        id: u32,
1031    ) -> Result<(Option<FeeRate>, Option<u32>), BridgeError> {
1032        self.get_effective_fee_rate(tx, id).await
1033    }
1034
1035    async fn update_effective_fee_rate(
1036        &self,
1037        tx: Option<&mut Self::Transaction>,
1038        id: u32,
1039        effective_fee_rate: FeeRate,
1040        current_tip_height: u32,
1041    ) -> Result<(), BridgeError> {
1042        self.update_effective_fee_rate(tx, id, effective_fee_rate, current_tip_height)
1043            .await
1044    }
1045
1046    async fn check_if_tx_exists_on_txsender(
1047        &self,
1048        tx: Option<&mut Self::Transaction>,
1049        txid: Txid,
1050    ) -> Result<Option<u32>, BridgeError> {
1051        self.check_if_tx_exists_on_txsender(tx, txid).await
1052    }
1053
1054    async fn save_tx(
1055        &self,
1056        tx: Option<&mut Self::Transaction>,
1057        tx_metadata: Option<TxMetadata>,
1058        raw_tx: &Transaction,
1059        fee_paying_type: FeePayingType,
1060        txid: Txid,
1061        rbf_signing_info: Option<RbfSigningInfo>,
1062    ) -> Result<u32, BridgeError> {
1063        self.save_tx(
1064            tx,
1065            tx_metadata,
1066            raw_tx,
1067            fee_paying_type,
1068            txid,
1069            rbf_signing_info,
1070        )
1071        .await
1072    }
1073
1074    async fn get_tx_debug_info(
1075        &self,
1076        tx: Option<&mut Self::Transaction>,
1077        tx_id: u32,
1078    ) -> Result<Option<String>, BridgeError> {
1079        self.get_tx_debug_info(tx, tx_id).await
1080    }
1081
1082    async fn get_tx_debug_submission_errors(
1083        &self,
1084        tx: Option<&mut Self::Transaction>,
1085        tx_id: u32,
1086    ) -> Result<Vec<(String, String)>, BridgeError> {
1087        self.get_tx_debug_submission_errors(tx, tx_id).await
1088    }
1089
1090    async fn get_tx_debug_fee_payer_utxos(
1091        &self,
1092        tx: Option<&mut Self::Transaction>,
1093        tx_id: u32,
1094    ) -> Result<Vec<(Txid, u32, Amount, bool)>, BridgeError> {
1095        self.get_tx_debug_fee_payer_utxos(tx, tx_id).await
1096    }
1097
1098    #[cfg(all(test, feature = "automation"))]
1099    async fn debug_inactive_txs(&self, fee_rate: FeeRate, current_tip_height: u32) {
1100        Database::debug_inactive_txs(self, fee_rate, current_tip_height).await
1101    }
1102
1103    async fn fetch_next_bitcoin_syncer_evt(
1104        &self,
1105        tx: &mut Self::Transaction,
1106        consumer_handle: &str,
1107    ) -> Result<Option<clementine_primitives::BitcoinSyncerEvent>, BridgeError> {
1108        self.fetch_next_bitcoin_syncer_evt(tx, consumer_handle)
1109            .await
1110    }
1111
1112    async fn sync_transaction_confirmations(
1113        &self,
1114        tx: Option<&mut Self::Transaction>,
1115    ) -> Result<(), BridgeError> {
1116        self.sync_transaction_confirmations(tx).await
1117    }
1118
1119    async fn get_max_height(
1120        &self,
1121        tx: Option<&mut Self::Transaction>,
1122    ) -> Result<Option<u32>, BridgeError> {
1123        self.get_max_height(tx).await
1124    }
1125}
1126
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::Database;
    use crate::test::common::*;
    use bitcoin::absolute::{Height, LockTime};
    use bitcoin::hashes::Hash;
    use bitcoin::transaction::Version;
    use bitcoin::{Block, OutPoint, TapNodeHash, Txid};

    /// Creates a database handle from a fresh test configuration.
    async fn setup_test_db() -> Database {
        let config = create_test_config_with_thread_name().await;
        Database::new(&config).await.unwrap()
    }

    /// Builds the version-2 transaction with no inputs, no outputs and a
    /// zero block-height lock time that every test below stores.
    fn blank_tx() -> Transaction {
        Transaction {
            version: Version::TWO,
            lock_time: LockTime::Blocks(Height::ZERO),
            input: vec![],
            output: vec![],
        }
    }

    /// A saved transaction can be read back with the same fee paying type
    /// and RBF signing info, and with no block recorded for it yet.
    #[tokio::test]
    async fn test_save_and_get_tx() {
        let db = setup_test_db().await;
        let tx = blank_tx();

        // Save the transaction together with RBF signing info.
        let txid = tx.compute_txid();
        let rbf_info = Some(RbfSigningInfo {
            vout: 123,
            tweak_merkle_root: Some(TapNodeHash::all_zeros()),
        });
        let id = db
            .save_tx(None, None, &tx, FeePayingType::CPFP, txid, rbf_info.clone())
            .await
            .unwrap();

        // Read it back and compare what was stored.
        let (_, stored_tx, stored_fee_type, stored_block_id, stored_rbf_info) =
            db.get_try_to_send_tx(None, id).await.unwrap();
        assert_eq!(tx.version, stored_tx.version);
        assert_eq!(stored_fee_type, FeePayingType::CPFP);
        assert_eq!(stored_block_id, None);
        assert_eq!(stored_rbf_info, rbf_info);
    }

    /// A fee payer UTXO can be saved for a previously saved transaction.
    #[tokio::test]
    async fn test_fee_payer_utxo_operations() {
        let db = setup_test_db().await;
        let mut dbtx = db.begin_transaction().await.unwrap();

        // The transaction that will be bumped has to be saved first.
        let tx = blank_tx();
        let bumped_id = db
            .save_tx(
                Some(&mut dbtx),
                None,
                &tx,
                FeePayingType::CPFP,
                Txid::all_zeros(),
                None,
            )
            .await
            .unwrap();

        // Its id can now be referenced as the bumped id of a fee payer UTXO.
        let fee_payer_txid = Txid::hash(&[1u8; 32]);
        db.save_fee_payer_tx(
            Some(&mut dbtx),
            bumped_id,
            fee_payer_txid,
            0,
            Amount::from_sat(50000),
            None,
        )
        .await
        .unwrap();

        dbtx.commit().await.unwrap();
    }

    /// Confirmation syncing succeeds after a fee payer txid has been
    /// recorded in a saved block.
    #[tokio::test]
    async fn test_sync_transaction_confirmations() {
        const BLOCK_HEX: &str = "0200000035ab154183570282ce9afc0b494c9fc6a3cfea05aa8c1add2ecc56490000000038ba3d78e4500a5a7570dbe61960398add4410d278b21cd9708e6d9743f374d544fc055227f1001c29c1ea3b0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff3703a08601000427f1001c046a510100522cfabe6d6d0000000000000000000068692066726f6d20706f6f6c7365727665726aac1eeeed88ffffffff0100f2052a010000001976a914912e2b234f941f30b18afbb4fa46171214bf66c888ac00000000";
        let block_bytes = hex::decode(BLOCK_HEX).unwrap();
        let block: Block = deserialize(&block_bytes).unwrap();

        let db = setup_test_db().await;
        let mut dbtx = db.begin_transaction().await.unwrap();

        // Store the block that the confirmation will point to.
        let block_id = crate::bitcoin_syncer::save_block(&db, &mut dbtx, &block, 100)
            .await
            .unwrap();

        // Store a transaction to be bumped.
        let tx = blank_tx();
        let bumped_id = db
            .save_tx(
                Some(&mut dbtx),
                None,
                &tx,
                FeePayingType::CPFP,
                Txid::all_zeros(),
                None,
            )
            .await
            .unwrap();

        // Store its fee payer UTXO.
        let fee_payer_txid = Txid::hash(&[1u8; 32]);
        db.save_fee_payer_tx(
            Some(&mut dbtx),
            bumped_id,
            fee_payer_txid,
            0,
            Amount::from_sat(50000),
            None,
        )
        .await
        .unwrap();

        // Record the fee payer txid as part of the block.
        db.insert_txid_to_block(&mut dbtx, block_id, &fee_payer_txid)
            .await
            .unwrap();

        // Run the confirmation sync inside the same database transaction.
        db.sync_transaction_confirmations(Some(&mut dbtx))
            .await
            .unwrap();

        dbtx.commit().await.unwrap();
    }

    /// Cancellation conditions can be saved both by outpoint and by txid.
    #[tokio::test]
    async fn test_cancelled_outpoints_and_txids() {
        let db = setup_test_db().await;
        let mut dbtx = db.begin_transaction().await.unwrap();

        // The transaction to cancel has to be saved first.
        let tx = blank_tx();
        let cancelled_id = db
            .save_tx(
                Some(&mut dbtx),
                None,
                &tx,
                FeePayingType::CPFP,
                Txid::all_zeros(),
                None,
            )
            .await
            .unwrap();

        let txid = Txid::hash(&[0u8; 32]);
        let vout = 0;

        // Cancel by outpoint.
        db.save_cancelled_outpoint(Some(&mut dbtx), cancelled_id, OutPoint { txid, vout })
            .await
            .unwrap();

        // Cancel by txid.
        db.save_cancelled_txid(Some(&mut dbtx), cancelled_id, txid)
            .await
            .unwrap();

        dbtx.commit().await.unwrap();
    }

    /// The set of sendable transactions follows the stored effective fee
    /// rate relative to the queried fee rate.
    #[tokio::test]
    async fn test_get_sendable_txs() {
        let db = setup_test_db().await;
        let mut dbtx = db.begin_transaction().await.unwrap();

        // Save two transactions, one per fee paying type.
        let cpfp_tx = blank_tx();
        let rbf_tx = blank_tx();

        let cpfp_id = db
            .save_tx(
                Some(&mut dbtx),
                None,
                &cpfp_tx,
                FeePayingType::CPFP,
                Txid::all_zeros(),
                None,
            )
            .await
            .unwrap();
        let rbf_id = db
            .save_tx(
                Some(&mut dbtx),
                None,
                &rbf_tx,
                FeePayingType::RBF,
                Txid::all_zeros(),
                None,
            )
            .await
            .unwrap();

        let fee_rate = FeeRate::from_sat_per_vb(3).unwrap();
        let current_tip_height = 100;

        // Both transactions should be sendable as they have no prerequisites or cancellations
        let sendable = db
            .get_sendable_txs(Some(&mut dbtx), fee_rate, current_tip_height)
            .await
            .unwrap();
        assert_eq!(sendable.len(), 2);
        assert!(sendable.contains(&cpfp_id));
        assert!(sendable.contains(&rbf_id));

        // Set the effective fee rate of the first transaction equal to the query fee rate.
        // This should make it not sendable since the condition is "effective_fee_rate < fee_rate"
        db.update_effective_fee_rate(Some(&mut dbtx), cpfp_id, fee_rate, 100)
            .await
            .unwrap();

        let sendable = db
            .get_sendable_txs(Some(&mut dbtx), fee_rate, current_tip_height)
            .await
            .unwrap();
        assert_eq!(sendable.len(), 1);
        assert!(sendable.contains(&rbf_id));

        // With a higher query fee rate, all should be sendable again.
        let higher_fee_rate = FeeRate::from_sat_per_vb(4).unwrap();
        let sendable = db
            .get_sendable_txs(Some(&mut dbtx), higher_fee_rate, current_tip_height + 1)
            .await
            .unwrap();
        assert_eq!(sendable.len(), 2);
        assert!(sendable.contains(&cpfp_id));
        assert!(sendable.contains(&rbf_id));

        dbtx.commit().await.unwrap();
    }

    /// Debug sending state and submission errors can be written and read back.
    #[tokio::test]
    async fn test_debug_sending_state() {
        let db = setup_test_db().await;
        let mut dbtx = db.begin_transaction().await.unwrap();

        let tx = blank_tx();

        // Saved without a database transaction so that tx_id is available to
        // the state updates below, which also run without one.
        let tx_id = db
            .save_tx(
                None,
                None,
                &tx,
                FeePayingType::RBF,
                tx.compute_txid(),
                None,
            )
            .await
            .unwrap();

        // Write an initial, non-activated state and read it back.
        let initial_state = "waiting_for_fee_payer_utxos";
        db.update_tx_debug_sending_state(tx_id, initial_state, false)
            .await
            .unwrap();

        let stored_state = db.get_tx_debug_info(Some(&mut dbtx), tx_id).await.unwrap();
        assert_eq!(stored_state, Some(initial_state.to_string()));

        // Overwrite the state, this time with activation, and read it back.
        let active_state = "ready_to_send";
        db.update_tx_debug_sending_state(tx_id, active_state, true)
            .await
            .unwrap();

        let stored_state = db.get_tx_debug_info(Some(&mut dbtx), tx_id).await.unwrap();
        assert_eq!(stored_state, Some(active_state.to_string()));

        // Save a first error message and check it is returned.
        let first_error = "Failed to send transaction: insufficient fee";
        db.save_tx_debug_submission_error(Some(&mut dbtx), tx_id, first_error)
            .await
            .unwrap();

        let stored_errors = db
            .get_tx_debug_submission_errors(Some(&mut dbtx), tx_id)
            .await
            .unwrap();
        assert_eq!(stored_errors.len(), 1);
        assert_eq!(stored_errors[0].0, first_error);

        // Save a second error message.
        let second_error = "Network connection timeout";
        db.save_tx_debug_submission_error(Some(&mut dbtx), tx_id, second_error)
            .await
            .unwrap();

        // Both errors are returned in insertion order.
        let stored_errors = db
            .get_tx_debug_submission_errors(Some(&mut dbtx), tx_id)
            .await
            .unwrap();
        assert_eq!(stored_errors.len(), 2);
        assert_eq!(stored_errors[0].0, first_error);
        assert_eq!(stored_errors[1].0, second_error);

        // Write the final state and read it back.
        let final_state = "sent";
        db.update_tx_debug_sending_state(tx_id, final_state, true)
            .await
            .unwrap();

        let stored_state = db.get_tx_debug_info(Some(&mut dbtx), tx_id).await.unwrap();
        assert_eq!(stored_state, Some(final_state.to_string()));

        dbtx.commit().await.unwrap();
    }
}