clementine_core/database/
tx_sender.rs

1//! This module includes database functions which are mainly used by the transaction sender.
2
3use async_trait::async_trait;
4
5use super::{wrapper::TxidDB, Database, DatabaseTransaction};
6use crate::execute_query_with_tx;
7use bitcoin::{
8    consensus::{deserialize, serialize},
9    Amount, FeeRate, Transaction, Txid,
10};
11use clementine_errors::BridgeError;
12use clementine_tx_sender::{ActivatedWithOutpoint, ActivatedWithTxid};
13use clementine_utils::{FeePayingType, RbfSigningInfo, TxMetadata};
14use eyre::{Context, OptionExt};
15use sqlx::Executor;
16use std::ops::DerefMut;
17
18impl Database {
    /// Set all transactions' `seen_block_id` to the given block id. This will
    /// be called once a block is confirmed on the Bitcoin side.
    ///
    /// Rows of the tx sender tables are matched against the contents of
    /// `bitcoin_syncer_txs` and `bitcoin_syncer_spent_utxos` for the given
    /// block. Only rows whose `seen_block_id` is still NULL are updated.
    ///
    /// After the updates are issued, a detached background task records
    /// "confirmed" debug state for the affected try-to-send transactions.
    ///
    /// # Parameters
    ///
    /// - `tx`: Database transaction on which every `seen_block_id` update is
    ///   executed.
    /// - `block_id`: Id that is compared with the `block_id` columns of the
    ///   syncer tables and written into `seen_block_id`.
    ///
    /// # Errors
    ///
    /// Returns an error if `block_id` does not fit into an `i32` or if any of
    /// the update queries fails. Failures inside the background debug task are
    /// only logged or ignored and never reach the caller.
    pub async fn confirm_transactions(
        &self,
        tx: DatabaseTransaction<'_>,
        block_id: u32,
    ) -> Result<(), BridgeError> {
        // The value is bound as an `i32` parameter in every query below.
        let block_id = i32::try_from(block_id).wrap_err("Failed to convert block id to i32")?;

        // CTEs for collecting a block's transactions, spent UTXOs and confirmed
        // RBF transactions. The same prefix is prepended to every statement
        // below, so each statement binds the block id as `$1`.
        let common_ctes = r#"
            WITH relevant_txs AS (
                SELECT txid
                FROM bitcoin_syncer_txs
                WHERE block_id = $1
            ),
            relevant_spent_utxos AS (
                SELECT txid, vout
                FROM bitcoin_syncer_spent_utxos
                WHERE block_id = $1
            ),
            confirmed_rbf_ids AS (
                SELECT rbf.id
                FROM tx_sender_rbf_txids AS rbf
                JOIN bitcoin_syncer_txs AS syncer ON rbf.txid = syncer.txid
                WHERE syncer.block_id = $1
            )
        "#;

        // Update tx_sender_activate_try_to_send_txids: activation txids that
        // appear in this block.
        sqlx::query(&format!(
            "{common_ctes}
            UPDATE tx_sender_activate_try_to_send_txids AS tap
            SET seen_block_id = $1
            WHERE tap.txid IN (SELECT txid FROM relevant_txs)
            AND tap.seen_block_id IS NULL"
        ))
        .bind(block_id)
        .execute(tx.deref_mut())
        .await?;

        // Update tx_sender_activate_try_to_send_outpoints: activation
        // outpoints that were spent in this block.
        sqlx::query(&format!(
            "{common_ctes}
            UPDATE tx_sender_activate_try_to_send_outpoints AS tap
            SET seen_block_id = $1
            WHERE (tap.txid, tap.vout) IN (SELECT txid, vout FROM relevant_spent_utxos)
            AND tap.seen_block_id IS NULL"
        ))
        .bind(block_id)
        .execute(tx.deref_mut())
        .await?;

        // Update tx_sender_cancel_try_to_send_txids: cancelling txids that
        // appear in this block.
        sqlx::query(&format!(
            "{common_ctes}
            UPDATE tx_sender_cancel_try_to_send_txids AS ctt
            SET seen_block_id = $1
            WHERE ctt.txid IN (SELECT txid FROM relevant_txs)
            AND ctt.seen_block_id IS NULL"
        ))
        .bind(block_id)
        .execute(tx.deref_mut())
        .await?;

        // Update tx_sender_cancel_try_to_send_outpoints: cancelling outpoints
        // that were spent in this block.
        sqlx::query(&format!(
            "{common_ctes}
            UPDATE tx_sender_cancel_try_to_send_outpoints AS cto
            SET seen_block_id = $1
            WHERE (cto.txid, cto.vout) IN (SELECT txid, vout FROM relevant_spent_utxos)
            AND cto.seen_block_id IS NULL"
        ))
        .bind(block_id)
        .execute(tx.deref_mut())
        .await?;

        // Update tx_sender_fee_payer_utxos: fee payer transactions that appear
        // in this block.
        sqlx::query(&format!(
            "{common_ctes}
            UPDATE tx_sender_fee_payer_utxos AS fpu
            SET seen_block_id = $1
            WHERE fpu.fee_payer_txid IN (SELECT txid FROM relevant_txs)
            AND fpu.seen_block_id IS NULL"
        ))
        .bind(block_id)
        .execute(tx.deref_mut())
        .await?;

        // Update tx_sender_try_to_send_txs for CPFP txid confirmation
        // (the stored txid itself appears in this block).
        sqlx::query(&format!(
            "{common_ctes}
            UPDATE tx_sender_try_to_send_txs AS txs
            SET seen_block_id = $1
            WHERE txs.txid IN (SELECT txid FROM relevant_txs)
            AND txs.seen_block_id IS NULL"
        ))
        .bind(block_id)
        .execute(tx.deref_mut())
        .await?;

        // Update tx_sender_try_to_send_txs for RBF txid confirmation
        // (one of the txids recorded in tx_sender_rbf_txids appears in this block).
        sqlx::query(&format!(
            "{common_ctes}
            UPDATE tx_sender_try_to_send_txs AS txs
            SET seen_block_id = $1
            WHERE txs.id IN (SELECT id FROM confirmed_rbf_ids)
            AND txs.seen_block_id IS NULL"
        ))
        .bind(block_id)
        .execute(tx.deref_mut())
        .await?;

        let bg_db = self.clone();
        // Update debug information in the background to not block core behavior.
        // NOTE(review): the queries inside this task run on `bg_db.connection`,
        // not on `tx`, and the task is detached from the caller's transaction.
        // Presumably it only sees what that connection can see at that moment;
        // confirm the block's syncer rows are visible by then.
        tokio::spawn(async move {
            // Get confirmed direct transactions for debugging
            let Ok(confirmed_direct_txs): Result<Vec<(i32, TxidDB)>, _> = sqlx::query_as(&format!(
                "{common_ctes}
            SELECT txs.id, txs.txid
            FROM tx_sender_try_to_send_txs AS txs
            WHERE txs.txid IN (SELECT txid FROM relevant_txs)",
            ))
            .bind(block_id)
            .fetch_all(&bg_db.connection)
            .await
            else {
                // The underlying query error is dropped; only this message is logged.
                tracing::error!("Failed to update debug info for confirmed txs");
                return;
            };

            // Get confirmed RBF transactions for debugging
            let Ok(confirmed_rbf_txs): Result<Vec<(i32,)>, _> = sqlx::query_as(&format!(
                "{common_ctes}
            SELECT txs.id
            FROM tx_sender_try_to_send_txs AS txs
            WHERE txs.id IN (SELECT id FROM confirmed_rbf_ids)",
            ))
            .bind(block_id)
            .fetch_all(&bg_db.connection)
            .await
            else {
                // Same message as above; the underlying query error is dropped.
                tracing::error!("Failed to update debug info for confirmed txs");
                return;
            };

            // Record debug info for confirmed transactions
            for (tx_id, txid) in confirmed_direct_txs {
                // Add debug state change
                tracing::debug!(try_to_send_id=?tx_id,  "Transaction confirmed in block {}: direct confirmation of txid {}",
            block_id, txid.0);

                // Update sending state; the result is deliberately ignored.
                // NOTE(review): `as u32` would wrap for a negative id — presumably
                // ids are never negative; confirm against the schema.
                let _ = bg_db
                    .update_tx_debug_sending_state(tx_id as u32, "confirmed", true)
                    .await;
            }

            // Record debug info for confirmed RBF transactions
            for (tx_id,) in confirmed_rbf_txs {
                // Add debug state change
                tracing::debug!(try_to_send_id=?tx_id,  "Transaction confirmed in block {}: RBF confirmation",
            block_id);

                // Update sending state; the result is deliberately ignored.
                let _ = bg_db
                    .update_tx_debug_sending_state(tx_id as u32, "confirmed", true)
                    .await;
            }
        });

        Ok(())
    }
193
    /// Unassigns `seen_block_id` from all transactions in the given block id.
    /// By default, all transactions' `seen_block_id` is set to NULL. And they
    /// get assigned a block id when they are confirmed on Bitcoin side. If a
    /// reorg happens, block ids must be unassigned from all transactions.
    ///
    /// Before the rows are reset, the ids of the try-to-send transactions
    /// currently assigned to the block are read so that a detached background
    /// task can record an "unconfirmed" debug state for each of them.
    ///
    /// # Parameters
    ///
    /// - `tx`: Database transaction on which the lookup and all updates run.
    /// - `block_id`: Block id whose assignments are reset to NULL.
    ///
    /// # Errors
    ///
    /// Returns an error if `block_id` does not fit into an `i32` or if any of
    /// the update queries fails. A failure of the initial lookup or of the
    /// debug bookkeeping is only logged or ignored.
    pub async fn unconfirm_transactions(
        &self,
        tx: DatabaseTransaction<'_>,
        block_id: u32,
    ) -> Result<(), BridgeError> {
        // The value is bound as an `i32` parameter in every query below.
        let block_id = i32::try_from(block_id).wrap_err("Failed to convert block id to i32")?;

        // Need to get these before they're unconfirmed below, so that we can update the debug info
        // The `Result` is kept unevaluated here and inspected inside the
        // background task, so a failing lookup does not affect production behavior.
        let previously_confirmed_txs = sqlx::query_as::<_, (i32,)>(
            "SELECT id FROM tx_sender_try_to_send_txs WHERE seen_block_id = $1",
        )
        .bind(block_id)
        .fetch_all(tx.deref_mut())
        .await;

        let bg_db = self.clone();
        // Debug bookkeeping runs detached from the caller; it is spawned before
        // the updates below are executed.
        tokio::spawn(async move {
            let previously_confirmed_txs = match previously_confirmed_txs {
                Ok(txs) => txs,
                Err(e) => {
                    tracing::error!(error=?e, "Failed to get previously confirmed txs from database");
                    return;
                }
            };

            for (tx_id,) in previously_confirmed_txs {
                tracing::debug!(try_to_send_id=?tx_id, "Transaction unconfirmed in block {}: unconfirming", block_id);
                // The result is deliberately ignored.
                // NOTE(review): `as u32` would wrap for a negative id — presumably
                // ids are never negative; confirm against the schema.
                let _ = bg_db
                    .update_tx_debug_sending_state(tx_id as u32, "unconfirmed", false)
                    .await;
            }
        });

        // Each statement below resets `seen_block_id` to NULL for the rows of
        // one table that are currently assigned to this block.

        // Update tx_sender_activate_try_to_send_txids
        sqlx::query(
            "UPDATE tx_sender_activate_try_to_send_txids AS tap
             SET seen_block_id = NULL
             WHERE tap.seen_block_id = $1",
        )
        .bind(block_id)
        .execute(tx.deref_mut())
        .await?;

        // Update tx_sender_activate_try_to_send_outpoints
        sqlx::query(
            "UPDATE tx_sender_activate_try_to_send_outpoints AS tap
             SET seen_block_id = NULL
             WHERE tap.seen_block_id = $1",
        )
        .bind(block_id)
        .execute(tx.deref_mut())
        .await?;

        // Update tx_sender_cancel_try_to_send_txids
        sqlx::query(
            "UPDATE tx_sender_cancel_try_to_send_txids AS ctt
             SET seen_block_id = NULL
             WHERE ctt.seen_block_id = $1",
        )
        .bind(block_id)
        .execute(tx.deref_mut())
        .await?;

        // Update tx_sender_cancel_try_to_send_outpoints
        sqlx::query(
            "UPDATE tx_sender_cancel_try_to_send_outpoints AS cto
             SET seen_block_id = NULL
             WHERE cto.seen_block_id = $1",
        )
        .bind(block_id)
        .execute(tx.deref_mut())
        .await?;

        // Update tx_sender_fee_payer_utxos
        sqlx::query(
            "UPDATE tx_sender_fee_payer_utxos AS fpu
             SET seen_block_id = NULL
             WHERE fpu.seen_block_id = $1",
        )
        .bind(block_id)
        .execute(tx.deref_mut())
        .await?;

        // Update tx_sender_try_to_send_txs
        sqlx::query(
            "UPDATE tx_sender_try_to_send_txs AS txs
             SET seen_block_id = NULL
             WHERE txs.seen_block_id = $1",
        )
        .bind(block_id)
        .execute(tx.deref_mut())
        .await?;

        Ok(())
    }
295
296    /// Saves a fee payer transaction to the database.
297    ///
298    /// # Arguments
299    /// * `bumped_id` - The id of the bumped transaction
300    /// * `fee_payer_txid` - The txid of the fee payer transaction
301    /// * `vout` - The output index of the UTXO
302    /// * `script_pubkey` - The script pubkey of the UTXO
303    /// * `amount` - The amount in satoshis
304    pub async fn save_fee_payer_tx(
305        &self,
306        tx: Option<DatabaseTransaction<'_>>,
307        bumped_id: u32,
308        fee_payer_txid: Txid,
309        vout: u32,
310        amount: Amount,
311        replacement_of_id: Option<u32>,
312    ) -> Result<(), BridgeError> {
313        let query = sqlx::query(
314            "INSERT INTO tx_sender_fee_payer_utxos (bumped_id, fee_payer_txid, vout, amount, replacement_of_id)
315             VALUES ($1, $2, $3, $4, $5)",
316        )
317        .bind(i32::try_from(bumped_id).wrap_err("Failed to convert bumped id to i32")?)
318        .bind(TxidDB(fee_payer_txid))
319        .bind(i32::try_from(vout).wrap_err("Failed to convert vout to i32")?)
320        .bind(i64::try_from(amount.to_sat()).wrap_err("Failed to convert amount to i64")?)
321        .bind(replacement_of_id.map( i32::try_from).transpose().wrap_err("Failed to convert replacement of id to i32")?);
322
323        execute_query_with_tx!(self.connection, tx, query, execute)?;
324
325        Ok(())
326    }
327
328    /// Returns all unconfirmed fee payer transactions for a try-to-send tx.
329    /// Transactions whose replacements are confirmed are not included. But if none of the replacements are confirmed, all replacements are returned.
330    ///
331    /// # Parameters
332    ///
333    /// # Returns
334    ///
335    /// A vector of unconfirmed fee payer transaction details, including:
336    ///
337    /// - [`u32`]: Id of the fee payer transaction.
338    /// - [`u32`]: Id of the bumped transaction.
339    /// - [`Txid`]: Txid of the fee payer transaction.
340    /// - [`u32`]: Output index of the UTXO.
341    /// - [`Amount`]: Amount in satoshis.
342    pub async fn get_all_unconfirmed_fee_payer_txs(
343        &self,
344        tx: Option<DatabaseTransaction<'_>>,
345    ) -> Result<Vec<(u32, u32, Txid, u32, Amount, Option<u32>)>, BridgeError> {
346        let query = sqlx::query_as::<_, (i32, i32, TxidDB, i32, i64, Option<i32>)>(
347            "
348            SELECT fpu.id, fpu.bumped_id, fpu.fee_payer_txid, fpu.vout, fpu.amount, fpu.replacement_of_id
349            FROM tx_sender_fee_payer_utxos fpu
350            WHERE fpu.seen_block_id IS NULL
351              AND fpu.is_evicted = false
352              AND NOT EXISTS (
353                  SELECT 1
354                  FROM tx_sender_fee_payer_utxos x
355                  WHERE (x.replacement_of_id = fpu.replacement_of_id OR x.id = fpu.replacement_of_id)
356                    AND x.seen_block_id IS NOT NULL
357              )
358            ",
359        );
360
361        let results: Vec<(i32, i32, TxidDB, i32, i64, Option<i32>)> =
362            execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
363
364        results
365            .iter()
366            .map(
367                |(id, bumped_id, fee_payer_txid, vout, amount, replacement_of_id)| {
368                    Ok((
369                        u32::try_from(*id).wrap_err("Failed to convert id to u32")?,
370                        u32::try_from(*bumped_id).wrap_err("Failed to convert bumped id to u32")?,
371                        fee_payer_txid.0,
372                        u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
373                        Amount::from_sat(
374                            u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
375                        ),
376                        replacement_of_id
377                            .map(u32::try_from)
378                            .transpose()
379                            .wrap_err("Failed to convert replacement of id to u32")?,
380                    ))
381                },
382            )
383            .collect::<Result<Vec<_>, BridgeError>>()
384    }
385
386    /// Returns all unconfirmed fee payer transactions for a try-to-send tx.
387    /// Transactions whose replacements are confirmed are not included. But if none of the replacements are confirmed, all replacements are returned.
388    ///
389    /// # Parameters
390    ///
391    /// - `bumped_id`: The id of the bumped transaction
392    ///
393    /// # Returns
394    ///
395    /// A vector of unconfirmed fee payer transaction details, including:
396    ///
397    /// - [`u32`]: Id of the fee payer transaction.
398    /// - [`Txid`]: Txid of the fee payer transaction.
399    /// - [`u32`]: Output index of the UTXO.
400    /// - [`Amount`]: Amount in satoshis.
401    pub async fn get_unconfirmed_fee_payer_txs(
402        &self,
403        tx: Option<DatabaseTransaction<'_>>,
404        bumped_id: u32,
405    ) -> Result<Vec<(u32, Txid, u32, Amount)>, BridgeError> {
406        let query = sqlx::query_as::<_, (i32, TxidDB, i32, i64)>(
407            "
408            SELECT fpu.id, fpu.fee_payer_txid, fpu.vout, fpu.amount
409            FROM tx_sender_fee_payer_utxos fpu
410            WHERE fpu.bumped_id = $1
411              AND fpu.seen_block_id IS NULL
412              AND fpu.is_evicted = false
413              AND NOT EXISTS (
414                  SELECT 1
415                  FROM tx_sender_fee_payer_utxos x
416                  WHERE (x.replacement_of_id = fpu.replacement_of_id OR x.id = fpu.replacement_of_id)
417                    AND x.seen_block_id IS NOT NULL
418              )
419            ",
420        )
421        .bind(i32::try_from(bumped_id).wrap_err("Failed to convert bumped id to i32")?);
422        let results: Vec<(i32, TxidDB, i32, i64)> =
423            execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
424
425        results
426            .iter()
427            .map(|(id, fee_payer_txid, vout, amount)| {
428                Ok((
429                    u32::try_from(*id).wrap_err("Failed to convert id to u32")?,
430                    fee_payer_txid.0,
431                    u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
432                    Amount::from_sat(
433                        u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
434                    ),
435                ))
436            })
437            .collect::<Result<Vec<_>, BridgeError>>()
438    }
439
440    /// Marks a fee payer utxo and all its replacements as evicted.
441    /// If it is marked as evicted, it will not be tried to be bumped again. (Because wallet can use same utxos for other txs)
442    pub async fn mark_fee_payer_utxo_as_evicted(
443        &self,
444        tx: Option<DatabaseTransaction<'_>>,
445        id: u32,
446    ) -> Result<(), BridgeError> {
447        let query = sqlx::query(
448            "UPDATE tx_sender_fee_payer_utxos 
449                SET is_evicted = true 
450                WHERE id = $1 
451                OR replacement_of_id = $1",
452        )
453        .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
454
455        execute_query_with_tx!(self.connection, tx, query, execute)?;
456        Ok(())
457    }
458
459    pub async fn get_confirmed_fee_payer_utxos(
460        &self,
461        tx: Option<DatabaseTransaction<'_>>,
462        id: u32,
463    ) -> Result<Vec<(Txid, u32, Amount)>, BridgeError> {
464        let query = sqlx::query_as::<_, (TxidDB, i32, i64)>(
465            "SELECT fee_payer_txid, vout, amount
466             FROM tx_sender_fee_payer_utxos fpu
467             WHERE fpu.bumped_id = $1 AND fpu.seen_block_id IS NOT NULL",
468        )
469        .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
470
471        let results: Vec<(TxidDB, i32, i64)> =
472            execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
473
474        results
475            .iter()
476            .map(|(fee_payer_txid, vout, amount)| {
477                Ok((
478                    fee_payer_txid.0,
479                    u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
480                    Amount::from_sat(
481                        u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
482                    ),
483                ))
484            })
485            .collect::<Result<Vec<_>, BridgeError>>()
486    }
487
488    /// Returns the id of the tx in `tx_sender_try_to_send_txs` if it exists.
489    /// Used to avoid adding duplicate transactions to the txsender.
490    pub async fn check_if_tx_exists_on_txsender(
491        &self,
492        tx: Option<DatabaseTransaction<'_>>,
493        txid: Txid,
494    ) -> Result<Option<u32>, BridgeError> {
495        let query = sqlx::query_as::<_, (i32,)>(
496            "SELECT id FROM tx_sender_try_to_send_txs WHERE txid = $1 LIMIT 1",
497        )
498        .bind(TxidDB(txid));
499
500        let result: Option<(i32,)> =
501            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
502        Ok(match result {
503            Some((id,)) => Some(u32::try_from(id).wrap_err("Failed to convert id to u32")?),
504            None => None,
505        })
506    }
507
508    pub async fn save_tx(
509        &self,
510        tx: Option<DatabaseTransaction<'_>>,
511        tx_metadata: Option<TxMetadata>,
512        raw_tx: &Transaction,
513        fee_paying_type: FeePayingType,
514        txid: Txid,
515        rbf_signing_info: Option<RbfSigningInfo>,
516    ) -> Result<u32, BridgeError> {
517        let query = sqlx::query_scalar(
518            "INSERT INTO tx_sender_try_to_send_txs (raw_tx, fee_paying_type, tx_metadata, txid, rbf_signing_info) VALUES ($1, $2::fee_paying_type, $3, $4, $5) RETURNING id"
519        )
520        .bind(serialize(raw_tx))
521        .bind(fee_paying_type)
522        .bind(serde_json::to_string(&tx_metadata).wrap_err("Failed to encode tx_metadata to JSON")?)
523        .bind(TxidDB(txid))
524        .bind(serde_json::to_string(&rbf_signing_info).wrap_err("Failed to encode tx_metadata to JSON")?);
525
526        let id: i32 = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
527        u32::try_from(id)
528            .wrap_err("Failed to convert id to u32")
529            .map_err(Into::into)
530    }
531
532    pub async fn save_rbf_txid(
533        &self,
534        tx: Option<DatabaseTransaction<'_>>,
535        id: u32,
536        txid: Txid,
537    ) -> Result<(), BridgeError> {
538        let query = sqlx::query("INSERT INTO tx_sender_rbf_txids (id, txid) VALUES ($1, $2)")
539            .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?)
540            .bind(TxidDB(txid));
541
542        execute_query_with_tx!(self.connection, tx, query, execute)?;
543        Ok(())
544    }
545
546    pub async fn get_last_rbf_txid(
547        &self,
548        tx: Option<DatabaseTransaction<'_>>,
549        id: u32,
550    ) -> Result<Option<Txid>, BridgeError> {
551        let query = sqlx::query_as::<_, (TxidDB,)>("SELECT txid FROM tx_sender_rbf_txids WHERE id = $1 ORDER BY insertion_order DESC LIMIT 1")
552            .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
553
554        let result: Option<(TxidDB,)> =
555            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
556        Ok(result.map(|(txid,)| txid.0))
557    }
558
559    pub async fn save_cancelled_outpoint(
560        &self,
561        tx: Option<DatabaseTransaction<'_>>,
562        cancelled_id: u32,
563        outpoint: bitcoin::OutPoint,
564    ) -> Result<(), BridgeError> {
565        let query = sqlx::query(
566            "INSERT INTO tx_sender_cancel_try_to_send_outpoints (cancelled_id, txid, vout) VALUES ($1, $2, $3)"
567        )
568        .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled id to i32")?)
569        .bind(TxidDB(outpoint.txid))
570        .bind(i32::try_from(outpoint.vout).wrap_err("Failed to convert vout to i32")?);
571
572        execute_query_with_tx!(self.connection, tx, query, execute)?;
573        Ok(())
574    }
575
576    pub async fn save_cancelled_txid(
577        &self,
578        tx: Option<DatabaseTransaction<'_>>,
579        cancelled_id: u32,
580        txid: bitcoin::Txid,
581    ) -> Result<(), BridgeError> {
582        let query = sqlx::query(
583            "INSERT INTO tx_sender_cancel_try_to_send_txids (cancelled_id, txid) VALUES ($1, $2)",
584        )
585        .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled id to i32")?)
586        .bind(TxidDB(txid));
587
588        execute_query_with_tx!(self.connection, tx, query, execute)?;
589        Ok(())
590    }
591
592    pub async fn save_activated_txid(
593        &self,
594        tx: Option<DatabaseTransaction<'_>>,
595        activated_id: u32,
596        prerequisite_tx: &ActivatedWithTxid,
597    ) -> Result<(), BridgeError> {
598        let query = sqlx::query(
599            "INSERT INTO tx_sender_activate_try_to_send_txids (activated_id, txid, timelock) VALUES ($1, $2, $3)"
600        )
601        .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated id to i32")?)
602        .bind(TxidDB(prerequisite_tx.txid))
603        .bind(i32::try_from(prerequisite_tx.relative_block_height).wrap_err("Failed to convert relative block height to i32")?);
604
605        execute_query_with_tx!(self.connection, tx, query, execute)?;
606        Ok(())
607    }
608
609    pub async fn save_activated_outpoint(
610        &self,
611        tx: Option<DatabaseTransaction<'_>>,
612        activated_id: u32,
613        activated_outpoint: &ActivatedWithOutpoint,
614    ) -> Result<(), BridgeError> {
615        let query = sqlx::query(
616            "INSERT INTO tx_sender_activate_try_to_send_outpoints (activated_id, txid, vout, timelock) VALUES ($1, $2, $3, $4)"
617        )
618        .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated id to i32")?)
619        .bind(TxidDB(activated_outpoint.outpoint.txid))
620        .bind(i32::try_from(activated_outpoint.outpoint.vout).wrap_err("Failed to convert vout to i32")?)
621        .bind(i32::try_from(activated_outpoint.relative_block_height).wrap_err("Failed to convert relative block height to i32")?);
622
623        execute_query_with_tx!(self.connection, tx, query, execute)?;
624        Ok(())
625    }
626
627    /// Returns unconfirmed try-to-send transactions that satisfy all activation
628    /// conditions for sending:
629    ///
630    /// - Not in the non-active list
631    /// - Not in the cancelled list
632    /// - Transaction itself is not already confirmed
633    /// - Transaction and UTXO timelocks must be passed
634    /// - Fee rate is lower than the provided maximum fee rate (previous sends had a lower fee rate) or null (transaction wasn't sent before) OR the transaction was sent before, but the chain height increased since then, and the transaction is still not confirmed (accomplished by calling this fn with u32::MAX fee rate)
635    ///
636    /// # Parameters
637    ///
638    /// - `tx`: Optional database transaction
639    /// - `fee_rate`: Current fee rate of bitcoin or u32::MAX to retrieve all active txs
640    /// - `current_tip_height`: The current tip height of the Bitcoin blockchain
641    ///   for checking timelocks
642    ///
643    /// # Returns
644    ///
645    /// - [`Vec<u32>`]: A vector of transaction ids (db id) that are sendable.
646    pub async fn get_sendable_txs(
647        &self,
648        tx: Option<DatabaseTransaction<'_>>,
649        fee_rate: FeeRate,
650        current_tip_height: u32,
651    ) -> Result<Vec<u32>, BridgeError> {
652        let select_query = sqlx::query_as::<_, (i32,)>(
653            "WITH
654                -- Find non-active transactions (not seen or timelock not passed)
655                non_active_txs AS (
656                    -- Transactions with txid activations that aren't active yet
657                    SELECT DISTINCT
658                        activate_txid.activated_id AS tx_id
659                    FROM
660                        tx_sender_activate_try_to_send_txids AS activate_txid
661                    LEFT JOIN
662                        bitcoin_syncer AS syncer ON activate_txid.seen_block_id = syncer.id
663                    WHERE
664                        activate_txid.seen_block_id IS NULL
665                        OR (syncer.height + activate_txid.timelock > $2)
666
667                    UNION
668
669                    -- Transactions with outpoint activations that aren't active yet (not seen or timelock not passed)
670                    SELECT DISTINCT
671                        activate_outpoint.activated_id AS tx_id
672                    FROM
673                        tx_sender_activate_try_to_send_outpoints AS activate_outpoint
674                    LEFT JOIN
675                        bitcoin_syncer AS syncer ON activate_outpoint.seen_block_id = syncer.id
676                    WHERE
677                        activate_outpoint.seen_block_id IS NULL
678                        OR (syncer.height + activate_outpoint.timelock > $2)
679                ),
680
681                -- Transactions with cancelled conditions
682                cancelled_txs AS (
683                    -- Transactions with cancelled outpoints (not seen)
684                    SELECT DISTINCT
685                        cancelled_id AS tx_id
686                    FROM
687                        tx_sender_cancel_try_to_send_outpoints
688                    WHERE
689                        seen_block_id IS NOT NULL
690
691                    UNION
692
693                    -- Transactions with cancelled txids (not seen)
694                    SELECT DISTINCT
695                        cancelled_id AS tx_id
696                    FROM
697                        tx_sender_cancel_try_to_send_txids
698                    WHERE
699                        seen_block_id IS NOT NULL
700                )
701
702                -- Final query to get sendable transactions
703                SELECT
704                    txs.id
705                FROM
706                    tx_sender_try_to_send_txs AS txs
707                WHERE
708                    -- Transaction must not be in the non-active list
709                    txs.id NOT IN (SELECT tx_id FROM non_active_txs)
710                    -- Transaction must not be in the cancelled list
711                    AND txs.id NOT IN (SELECT tx_id FROM cancelled_txs)
712                    -- Transaction must not be already confirmed
713                    AND txs.seen_block_id IS NULL
714                    -- Check if fee_rate is lower than the provided fee rate or null
715                    AND (txs.effective_fee_rate IS NULL OR txs.effective_fee_rate < $1);",
716        )
717        .bind(
718            i64::try_from(fee_rate.to_sat_per_vb_ceil())
719                .wrap_err("Failed to convert fee rate to i64")?,
720        )
721        .bind(
722            i32::try_from(current_tip_height)
723                .wrap_err("Failed to convert current tip height to i32")?,
724        );
725
726        let results = execute_query_with_tx!(self.connection, tx, select_query, fetch_all)?;
727
728        let txs = results
729            .into_iter()
730            .map(|(id,)| u32::try_from(id))
731            .collect::<Result<Vec<_>, _>>()
732            .wrap_err("Failed to convert id to u32")?;
733
734        Ok(txs)
735    }
736
737    pub async fn update_effective_fee_rate(
738        &self,
739        tx: Option<DatabaseTransaction<'_>>,
740        id: u32,
741        effective_fee_rate: FeeRate,
742    ) -> Result<(), BridgeError> {
743        let query = sqlx::query(
744            "UPDATE tx_sender_try_to_send_txs SET effective_fee_rate = $1 WHERE id = $2",
745        )
746        .bind(
747            i64::try_from(effective_fee_rate.to_sat_per_vb_ceil())
748                .wrap_err("Failed to convert effective fee rate to i64")?,
749        )
750        .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
751
752        execute_query_with_tx!(self.connection, tx, query, execute)?;
753
754        Ok(())
755    }
756
757    pub async fn get_try_to_send_tx(
758        &self,
759        tx: Option<DatabaseTransaction<'_>>,
760        id: u32,
761    ) -> Result<
762        (
763            Option<TxMetadata>,
764            Transaction,
765            FeePayingType,
766            Option<u32>,
767            Option<RbfSigningInfo>,
768        ),
769        BridgeError,
770    > {
771        let query = sqlx::query_as::<
772            _,
773            (
774                Option<String>,
775                Option<Vec<u8>>,
776                FeePayingType,
777                Option<i32>,
778                Option<String>,
779            ),
780        >(
781            "SELECT tx_metadata, raw_tx, fee_paying_type, seen_block_id, rbf_signing_info
782             FROM tx_sender_try_to_send_txs
783             WHERE id = $1 LIMIT 1",
784        )
785        .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
786
787        let result = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
788        Ok((
789            serde_json::from_str(result.0.as_deref().unwrap_or("null"))
790                .wrap_err_with(|| format!("Failed to decode tx_metadata from {:?}", result.0))?,
791            result
792                .1
793                .as_deref()
794                .map(deserialize)
795                .ok_or_eyre("Expected raw_tx to be present")?
796                .wrap_err("Failed to deserialize raw_tx")?,
797            result.2,
798            result
799                .3
800                .map(u32::try_from)
801                .transpose()
802                .wrap_err("Failed to convert seen_block_id to u32")?,
803            serde_json::from_str(result.4.as_deref().unwrap_or("null")).wrap_err_with(|| {
804                format!("Failed to decode rbf_signing_info from {:?}", result.4)
805            })?,
806        ))
807    }
808
809    // Debug Functions
810
811    /// Saves a TX submission error to the debug table
812    pub async fn save_tx_debug_submission_error(
813        &self,
814        tx: Option<DatabaseTransaction<'_>>,
815        tx_id: u32,
816        error_message: &str,
817    ) -> Result<(), BridgeError> {
818        let query = sqlx::query(
819            "INSERT INTO tx_sender_debug_submission_errors (tx_id, error_message) VALUES ($1, $2)",
820        )
821        .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?)
822        .bind(error_message);
823
824        execute_query_with_tx!(self.connection, tx, query, execute)?;
825        Ok(())
826    }
827
828    /// Updates or inserts the TX's sending state in the debug table
829    ///
830    /// Does not support a Transaction because it's for debugging purposes. Make
831    /// sure that tx_id exists (i.e. creation is committed) before use
832    pub async fn update_tx_debug_sending_state(
833        &self,
834        tx_id: u32,
835        state: &str,
836        activated: bool,
837    ) -> Result<(), BridgeError> {
838        let query = sqlx::query(
839            r#"
840            INSERT INTO tx_sender_debug_sending_state
841            (tx_id, state, last_update, activated_timestamp)
842            VALUES ($1, $2, NOW(),
843                CASE
844                    WHEN $3 = TRUE THEN NOW()
845                    ELSE NULL
846                END
847            )
848            ON CONFLICT (tx_id) DO UPDATE SET
849            state = $2,
850            last_update = NOW(),
851            activated_timestamp = COALESCE(tx_sender_debug_sending_state.activated_timestamp,
852                CASE
853                    WHEN $3 = TRUE THEN NOW()
854                    ELSE NULL
855                END
856            )
857            "#,
858        )
859        .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?)
860        .bind(state)
861        .bind(activated);
862
863        self.connection.execute(query).await?;
864        Ok(())
865    }
866
867    /// Gets the current debug state of a TX
868    pub async fn get_tx_debug_info(
869        &self,
870        tx: Option<DatabaseTransaction<'_>>,
871        tx_id: u32,
872    ) -> Result<Option<String>, BridgeError> {
873        let query = sqlx::query_as::<_, (Option<String>,)>(
874            r#"
875            SELECT state
876            FROM tx_sender_debug_sending_state
877            WHERE tx_id = $1
878            "#,
879        )
880        .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
881
882        let result = execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
883        match result {
884            Some((state,)) => Ok(state),
885            None => Ok(None),
886        }
887    }
888
889    /// Gets all TX submission errors
890    pub async fn get_tx_debug_submission_errors(
891        &self,
892        tx: Option<DatabaseTransaction<'_>>,
893        tx_id: u32,
894    ) -> Result<Vec<(String, String)>, BridgeError> {
895        let query = sqlx::query_as::<_, (String, String)>(
896            r#"
897            SELECT error_message, timestamp::TEXT
898            FROM tx_sender_debug_submission_errors
899            WHERE tx_id = $1
900            ORDER BY timestamp ASC
901            "#,
902        )
903        .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
904
905        execute_query_with_tx!(self.connection, tx, query, fetch_all).map_err(Into::into)
906    }
907
908    /// Gets all fee payer UTXOs for a TX with their confirmation status
909    pub async fn get_tx_debug_fee_payer_utxos(
910        &self,
911        tx: Option<DatabaseTransaction<'_>>,
912        tx_id: u32,
913    ) -> Result<Vec<(Txid, u32, Amount, bool)>, BridgeError> {
914        let query = sqlx::query_as::<_, (TxidDB, i32, i64, bool)>(
915            r#"
916            SELECT fee_payer_txid, vout, amount, seen_block_id IS NOT NULL as confirmed
917            FROM tx_sender_fee_payer_utxos
918            WHERE bumped_id = $1
919            "#,
920        )
921        .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
922
923        let results: Vec<(TxidDB, i32, i64, bool)> =
924            execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
925
926        results
927            .iter()
928            .map(|(fee_payer_txid, vout, amount, confirmed)| {
929                Ok((
930                    fee_payer_txid.0,
931                    u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
932                    Amount::from_sat(
933                        u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
934                    ),
935                    *confirmed,
936                ))
937            })
938            .collect::<Result<Vec<_>, BridgeError>>()
939    }
940}
941
942#[async_trait]
943impl clementine_tx_sender::TxSenderDatabase for Database {
944    type Transaction = sqlx::Transaction<'static, sqlx::Postgres>;
945    async fn begin_transaction(
946        &self,
947    ) -> Result<sqlx::Transaction<'static, sqlx::Postgres>, BridgeError> {
948        self.begin_transaction().await
949    }
950
951    async fn commit_transaction(
952        &self,
953        tx: sqlx::Transaction<'static, sqlx::Postgres>,
954    ) -> Result<(), BridgeError> {
955        tx.commit().await.map_err(Into::into)
956    }
957
958    async fn save_tx_debug_submission_error(
959        &self,
960        dbtx: Option<&mut Self::Transaction>,
961        tx_id: u32,
962        error_message: &str,
963    ) -> Result<(), BridgeError> {
964        self.save_tx_debug_submission_error(dbtx, tx_id, error_message)
965            .await
966    }
967
968    async fn get_sendable_txs(
969        &self,
970        fee_rate: FeeRate,
971        current_tip_height: u32,
972    ) -> Result<Vec<u32>, BridgeError> {
973        self.get_sendable_txs(None, fee_rate, current_tip_height)
974            .await
975    }
976
977    async fn get_try_to_send_tx(
978        &self,
979        tx: Option<&mut Self::Transaction>,
980        id: u32,
981    ) -> Result<
982        (
983            Option<TxMetadata>,
984            Transaction,
985            FeePayingType,
986            Option<u32>,
987            Option<RbfSigningInfo>,
988        ),
989        BridgeError,
990    > {
991        self.get_try_to_send_tx(tx, id).await
992    }
993
994    async fn update_tx_debug_sending_state(
995        &self,
996        tx_id: u32,
997        state: &str,
998        activated: bool,
999    ) -> Result<(), BridgeError> {
1000        self.update_tx_debug_sending_state(tx_id, state, activated)
1001            .await
1002    }
1003
1004    async fn get_all_unconfirmed_fee_payer_txs(
1005        &self,
1006        tx: Option<&mut Self::Transaction>,
1007    ) -> Result<Vec<(u32, u32, Txid, u32, Amount, Option<u32>)>, BridgeError> {
1008        self.get_all_unconfirmed_fee_payer_txs(tx).await
1009    }
1010
1011    async fn get_unconfirmed_fee_payer_txs(
1012        &self,
1013        tx: Option<&mut Self::Transaction>,
1014        bumped_id: u32,
1015    ) -> Result<Vec<(u32, Txid, u32, Amount)>, BridgeError> {
1016        self.get_unconfirmed_fee_payer_txs(tx, bumped_id).await
1017    }
1018
1019    async fn mark_fee_payer_utxo_as_evicted(
1020        &self,
1021        tx: Option<&mut Self::Transaction>,
1022        id: u32,
1023    ) -> Result<(), BridgeError> {
1024        self.mark_fee_payer_utxo_as_evicted(tx, id).await
1025    }
1026
1027    async fn get_confirmed_fee_payer_utxos(
1028        &self,
1029        tx: Option<&mut Self::Transaction>,
1030        id: u32,
1031    ) -> Result<Vec<(Txid, u32, Amount)>, BridgeError> {
1032        self.get_confirmed_fee_payer_utxos(tx, id).await
1033    }
1034
1035    async fn save_fee_payer_tx(
1036        &self,
1037        tx: Option<&mut Self::Transaction>,
1038        _try_to_send_id: Option<u32>,
1039        bumped_id: u32,
1040        fee_payer_txid: Txid,
1041        vout: u32,
1042        amount: Amount,
1043        replacement_of_id: Option<u32>,
1044    ) -> Result<(), BridgeError> {
1045        self.save_fee_payer_tx(
1046            tx,
1047            bumped_id,
1048            fee_payer_txid,
1049            vout,
1050            amount,
1051            replacement_of_id,
1052        )
1053        .await
1054    }
1055
1056    async fn get_last_rbf_txid(
1057        &self,
1058        tx: Option<&mut Self::Transaction>,
1059        id: u32,
1060    ) -> Result<Option<Txid>, BridgeError> {
1061        self.get_last_rbf_txid(tx, id).await
1062    }
1063
1064    async fn save_rbf_txid(
1065        &self,
1066        tx: Option<&mut Self::Transaction>,
1067        id: u32,
1068        txid: Txid,
1069    ) -> Result<(), BridgeError> {
1070        self.save_rbf_txid(tx, id, txid).await
1071    }
1072
1073    async fn save_cancelled_outpoint(
1074        &self,
1075        tx: Option<&mut Self::Transaction>,
1076        cancelled_id: u32,
1077        outpoint: bitcoin::OutPoint,
1078    ) -> Result<(), BridgeError> {
1079        self.save_cancelled_outpoint(tx, cancelled_id, outpoint)
1080            .await
1081    }
1082
1083    async fn save_cancelled_txid(
1084        &self,
1085        tx: Option<&mut Self::Transaction>,
1086        cancelled_id: u32,
1087        txid: bitcoin::Txid,
1088    ) -> Result<(), BridgeError> {
1089        self.save_cancelled_txid(tx, cancelled_id, txid).await
1090    }
1091
1092    async fn save_activated_txid(
1093        &self,
1094        tx: Option<&mut Self::Transaction>,
1095        activated_id: u32,
1096        prerequisite_tx: &ActivatedWithTxid,
1097    ) -> Result<(), BridgeError> {
1098        self.save_activated_txid(tx, activated_id, prerequisite_tx)
1099            .await
1100    }
1101
1102    async fn save_activated_outpoint(
1103        &self,
1104        tx: Option<&mut Self::Transaction>,
1105        activated_id: u32,
1106        activated_outpoint: &ActivatedWithOutpoint,
1107    ) -> Result<(), BridgeError> {
1108        self.save_activated_outpoint(tx, activated_id, activated_outpoint)
1109            .await
1110    }
1111
1112    async fn update_effective_fee_rate(
1113        &self,
1114        tx: Option<&mut Self::Transaction>,
1115        id: u32,
1116        effective_fee_rate: FeeRate,
1117    ) -> Result<(), BridgeError> {
1118        self.update_effective_fee_rate(tx, id, effective_fee_rate)
1119            .await
1120    }
1121
1122    async fn check_if_tx_exists_on_txsender(
1123        &self,
1124        tx: Option<&mut Self::Transaction>,
1125        txid: Txid,
1126    ) -> Result<Option<u32>, BridgeError> {
1127        self.check_if_tx_exists_on_txsender(tx, txid).await
1128    }
1129
1130    async fn save_tx(
1131        &self,
1132        tx: Option<&mut Self::Transaction>,
1133        tx_metadata: Option<TxMetadata>,
1134        raw_tx: &Transaction,
1135        fee_paying_type: FeePayingType,
1136        txid: Txid,
1137        rbf_signing_info: Option<RbfSigningInfo>,
1138    ) -> Result<u32, BridgeError> {
1139        self.save_tx(
1140            tx,
1141            tx_metadata,
1142            raw_tx,
1143            fee_paying_type,
1144            txid,
1145            rbf_signing_info,
1146        )
1147        .await
1148    }
1149
1150    async fn get_tx_debug_info(
1151        &self,
1152        tx: Option<&mut Self::Transaction>,
1153        tx_id: u32,
1154    ) -> Result<Option<String>, BridgeError> {
1155        self.get_tx_debug_info(tx, tx_id).await
1156    }
1157
1158    async fn get_tx_debug_submission_errors(
1159        &self,
1160        tx: Option<&mut Self::Transaction>,
1161        tx_id: u32,
1162    ) -> Result<Vec<(String, String)>, BridgeError> {
1163        self.get_tx_debug_submission_errors(tx, tx_id).await
1164    }
1165
1166    async fn get_tx_debug_fee_payer_utxos(
1167        &self,
1168        tx: Option<&mut Self::Transaction>,
1169        tx_id: u32,
1170    ) -> Result<Vec<(Txid, u32, Amount, bool)>, BridgeError> {
1171        self.get_tx_debug_fee_payer_utxos(tx, tx_id).await
1172    }
1173
1174    async fn fetch_next_bitcoin_syncer_evt(
1175        &self,
1176        tx: &mut Self::Transaction,
1177        consumer_handle: &str,
1178    ) -> Result<Option<clementine_primitives::BitcoinSyncerEvent>, BridgeError> {
1179        self.fetch_next_bitcoin_syncer_evt(tx, consumer_handle)
1180            .await
1181    }
1182
1183    async fn get_block_info_from_id(
1184        &self,
1185        tx: Option<&mut Self::Transaction>,
1186        block_id: u32,
1187    ) -> Result<Option<(bitcoin::BlockHash, u32)>, BridgeError> {
1188        self.get_block_info_from_id(tx, block_id).await
1189    }
1190
1191    async fn confirm_transactions(
1192        &self,
1193        tx: &mut Self::Transaction,
1194        block_id: u32,
1195    ) -> Result<(), BridgeError> {
1196        self.confirm_transactions(tx, block_id).await
1197    }
1198
1199    async fn unconfirm_transactions(
1200        &self,
1201        tx: &mut Self::Transaction,
1202        block_id: u32,
1203    ) -> Result<(), BridgeError> {
1204        self.unconfirm_transactions(tx, block_id).await
1205    }
1206}
1207
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::Database;
    use crate::test::common::*;
    use bitcoin::absolute::Height;
    use bitcoin::hashes::Hash;
    use bitcoin::transaction::Version;
    use bitcoin::{Block, OutPoint, TapNodeHash, Txid};

    /// Opens a [`Database`] using the config from `create_test_config_with_thread_name`.
    async fn setup_test_db() -> Database {
        let test_config = create_test_config_with_thread_name().await;
        Database::new(&test_config).await.unwrap()
    }

    /// Builds the version-2 transaction without inputs or outputs that every test stores.
    fn empty_tx() -> Transaction {
        Transaction {
            version: Version::TWO,
            lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
            input: vec![],
            output: vec![],
        }
    }

    /// A saved TX can be read back with the same fee paying type and RBF signing info.
    #[tokio::test]
    async fn test_save_and_get_tx() {
        let db = setup_test_db().await;
        let tx = empty_tx();

        // Store the TX together with RBF signing info.
        let txid = tx.compute_txid();
        let rbfinfo = Some(RbfSigningInfo {
            vout: 123,
            tweak_merkle_root: Some(TapNodeHash::all_zeros()),
            annex: None,
            additional_taproot_output_count: None,
        });
        let id = db
            .save_tx(None, None, &tx, FeePayingType::CPFP, txid, rbfinfo.clone())
            .await
            .unwrap();

        // Read it back and compare the stored fields.
        let (_, retrieved_tx, fee_paying_type, seen_block_id, rbf_signing_info) =
            db.get_try_to_send_tx(None, id).await.unwrap();
        assert_eq!(tx.version, retrieved_tx.version);
        assert_eq!(fee_paying_type, FeePayingType::CPFP);
        assert_eq!(seen_block_id, None);
        assert_eq!(rbf_signing_info, rbfinfo);
    }

    /// A fee payer UTXO can be stored for a previously saved TX.
    #[tokio::test]
    async fn test_fee_payer_utxo_operations() {
        let db = setup_test_db().await;
        let mut dbtx = db.begin_transaction().await.unwrap();

        // The TX to be bumped has to exist before a fee payer can reference it.
        let tx = empty_tx();
        let tx_id = db
            .save_tx(
                Some(&mut dbtx),
                None,
                &tx,
                FeePayingType::CPFP,
                Txid::all_zeros(),
                None,
            )
            .await
            .unwrap();

        // Store a fee payer UTXO that uses `tx_id` as its bumped id.
        let fee_payer_txid = Txid::hash(&[1u8; 32]);
        db.save_fee_payer_tx(
            Some(&mut dbtx),
            tx_id,
            fee_payer_txid,
            0,
            Amount::from_sat(50000),
            None,
        )
        .await
        .unwrap();

        dbtx.commit().await.unwrap();
    }

    /// Confirming a block that contains a fee payer TX succeeds.
    #[tokio::test]
    async fn test_confirm_and_unconfirm_transactions() {
        const BLOCK_HEX: &str = "0200000035ab154183570282ce9afc0b494c9fc6a3cfea05aa8c1add2ecc56490000000038ba3d78e4500a5a7570dbe61960398add4410d278b21cd9708e6d9743f374d544fc055227f1001c29c1ea3b0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff3703a08601000427f1001c046a510100522cfabe6d6d0000000000000000000068692066726f6d20706f6f6c7365727665726aac1eeeed88ffffffff0100f2052a010000001976a914912e2b234f941f30b18afbb4fa46171214bf66c888ac00000000";
        let block: Block = deserialize(&hex::decode(BLOCK_HEX).unwrap()).unwrap();

        let db = setup_test_db().await;
        let mut dbtx = db.begin_transaction().await.unwrap();

        // Store the block that will confirm the fee payer TX.
        let block_id = crate::bitcoin_syncer::save_block(&db, &mut dbtx, &block, 100)
            .await
            .unwrap();

        // Store a TX to be bumped.
        let tx = empty_tx();
        let tx_id = db
            .save_tx(
                Some(&mut dbtx),
                None,
                &tx,
                FeePayingType::CPFP,
                Txid::all_zeros(),
                None,
            )
            .await
            .unwrap();

        // Attach a fee payer UTXO to it.
        let fee_payer_txid = Txid::hash(&[1u8; 32]);
        db.save_fee_payer_tx(
            Some(&mut dbtx),
            tx_id,
            fee_payer_txid,
            0,
            Amount::from_sat(50000),
            None,
        )
        .await
        .unwrap();

        // Register the fee payer TX as part of the block.
        db.insert_txid_to_block(&mut dbtx, block_id, &fee_payer_txid)
            .await
            .unwrap();

        // Confirm everything contained in the block.
        db.confirm_transactions(&mut dbtx, block_id).await.unwrap();

        dbtx.commit().await.unwrap();
    }

    /// Cancellation conditions can be stored both by outpoint and by txid.
    #[tokio::test]
    async fn test_cancelled_outpoints_and_txids() {
        let db = setup_test_db().await;
        let mut dbtx = db.begin_transaction().await.unwrap();

        // The TX to be cancelled has to exist first.
        let tx = empty_tx();
        let tx_id = db
            .save_tx(
                Some(&mut dbtx),
                None,
                &tx,
                FeePayingType::CPFP,
                Txid::all_zeros(),
                None,
            )
            .await
            .unwrap();

        // `tx_id` now serves as the cancelled id.
        let txid = Txid::hash(&[0u8; 32]);
        let vout = 0;

        // Cancel by outpoint.
        db.save_cancelled_outpoint(Some(&mut dbtx), tx_id, OutPoint { txid, vout })
            .await
            .unwrap();

        // Cancel by txid.
        db.save_cancelled_txid(Some(&mut dbtx), tx_id, txid)
            .await
            .unwrap();

        dbtx.commit().await.unwrap();
    }

    /// Sendable TXs are filtered by comparing their effective fee rate with the queried one.
    #[tokio::test]
    async fn test_get_sendable_txs() {
        let db = setup_test_db().await;
        let mut dbtx = db.begin_transaction().await.unwrap();

        // Store two TXs, one per fee paying type.
        let tx1 = empty_tx();
        let tx2 = empty_tx();

        let id1 = db
            .save_tx(
                Some(&mut dbtx),
                None,
                &tx1,
                FeePayingType::CPFP,
                Txid::all_zeros(),
                None,
            )
            .await
            .unwrap();
        let id2 = db
            .save_tx(
                Some(&mut dbtx),
                None,
                &tx2,
                FeePayingType::RBF,
                Txid::all_zeros(),
                None,
            )
            .await
            .unwrap();

        // Query the sendable TXs.
        let fee_rate = FeeRate::from_sat_per_vb(3).unwrap();
        let current_tip_height = 100;

        let sendable_txs = db
            .get_sendable_txs(Some(&mut dbtx), fee_rate, current_tip_height)
            .await
            .unwrap();

        // Neither TX has prerequisites or cancellations, so both are sendable.
        assert_eq!(sendable_txs.len(), 2);
        assert!(sendable_txs.contains(&id1));
        assert!(sendable_txs.contains(&id2));

        // Give tx1 an effective fee rate equal to the queried fee rate. It must drop out of the
        // result because the condition is "effective_fee_rate < fee_rate".
        db.update_effective_fee_rate(Some(&mut dbtx), id1, fee_rate)
            .await
            .unwrap();

        let sendable_txs = db
            .get_sendable_txs(Some(&mut dbtx), fee_rate, current_tip_height)
            .await
            .unwrap();
        assert_eq!(sendable_txs.len(), 1);
        assert!(sendable_txs.contains(&id2));

        // With a higher queried fee rate both TXs are sendable again.
        let sendable_txs = db
            .get_sendable_txs(
                Some(&mut dbtx),
                FeeRate::from_sat_per_vb(4).unwrap(),
                current_tip_height + 1,
            )
            .await
            .unwrap();
        assert_eq!(sendable_txs.len(), 2);
        assert!(sendable_txs.contains(&id1));
        assert!(sendable_txs.contains(&id2));

        dbtx.commit().await.unwrap();
    }

    /// Debug sending state and submission errors can be written and read back.
    #[tokio::test]
    async fn test_debug_sending_state() {
        let db = setup_test_db().await;
        let mut dbtx = db.begin_transaction().await.unwrap();

        let tx = empty_tx();

        // Save outside of `dbtx` so that `tx_id` is available to the state updates below,
        // which do not run inside a database transaction.
        let tx_id = db
            .save_tx(
                None,
                None,
                &tx,
                FeePayingType::RBF,
                tx.compute_txid(),
                None,
            )
            .await
            .unwrap();

        // Write the first state without activation.
        let initial_state = "waiting_for_fee_payer_utxos";
        db.update_tx_debug_sending_state(tx_id, initial_state, false)
            .await
            .unwrap();

        // The stored state must match.
        let state = db.get_tx_debug_info(Some(&mut dbtx), tx_id).await.unwrap();
        assert_eq!(state, Some(initial_state.to_string()));

        // Overwrite the state, this time with activation.
        let active_state = "ready_to_send";
        db.update_tx_debug_sending_state(tx_id, active_state, true)
            .await
            .unwrap();

        // The new state must be visible.
        let state = db.get_tx_debug_info(Some(&mut dbtx), tx_id).await.unwrap();
        assert_eq!(state, Some(active_state.to_string()));

        // Record a first submission error.
        let error_message = "Failed to send transaction: insufficient fee";
        db.save_tx_debug_submission_error(Some(&mut dbtx), tx_id, error_message)
            .await
            .unwrap();

        // Exactly that error must be returned.
        let errors = db
            .get_tx_debug_submission_errors(Some(&mut dbtx), tx_id)
            .await
            .unwrap();
        assert_eq!(errors.len(), 1);
        assert_eq!(errors[0].0, error_message);

        // Record a second submission error.
        let second_error = "Network connection timeout";
        db.save_tx_debug_submission_error(Some(&mut dbtx), tx_id, second_error)
            .await
            .unwrap();

        // Both errors must be returned in insertion order.
        let errors = db
            .get_tx_debug_submission_errors(Some(&mut dbtx), tx_id)
            .await
            .unwrap();
        assert_eq!(errors.len(), 2);
        assert_eq!(errors[0].0, error_message);
        assert_eq!(errors[1].0, second_error);

        // Write the last state.
        let final_state = "sent";
        db.update_tx_debug_sending_state(tx_id, final_state, true)
            .await
            .unwrap();

        // The last state must be visible.
        let state = db.get_tx_debug_info(Some(&mut dbtx), tx_id).await.unwrap();
        assert_eq!(state, Some(final_state.to_string()));

        dbtx.commit().await.unwrap();
    }
}