1use super::{wrapper::TxidDB, Database, DatabaseTransaction};
6use crate::{
7 errors::BridgeError,
8 execute_query_with_tx,
9 tx_sender::{ActivatedWithOutpoint, ActivatedWithTxid},
10 utils::{FeePayingType, RbfSigningInfo, TxMetadata},
11};
12use bitcoin::{
13 consensus::{deserialize, serialize},
14 Amount, FeeRate, Transaction, Txid,
15};
16use eyre::{Context, OptionExt};
17use sqlx::Executor;
18use std::ops::DerefMut;
19
20impl Database {
21 pub async fn confirm_transactions(
24 &self,
25 tx: DatabaseTransaction<'_, '_>,
26 block_id: u32,
27 ) -> Result<(), BridgeError> {
28 let block_id = i32::try_from(block_id).wrap_err("Failed to convert block id to i32")?;
29
30 let common_ctes = r#"
33 WITH relevant_txs AS (
34 SELECT txid
35 FROM bitcoin_syncer_txs
36 WHERE block_id = $1
37 ),
38 relevant_spent_utxos AS (
39 SELECT txid, vout
40 FROM bitcoin_syncer_spent_utxos
41 WHERE block_id = $1
42 ),
43 confirmed_rbf_ids AS (
44 SELECT rbf.id
45 FROM tx_sender_rbf_txids AS rbf
46 JOIN bitcoin_syncer_txs AS syncer ON rbf.txid = syncer.txid
47 WHERE syncer.block_id = $1
48 )
49 "#;
50
51 sqlx::query(&format!(
53 "{common_ctes}
54 UPDATE tx_sender_activate_try_to_send_txids AS tap
55 SET seen_block_id = $1
56 WHERE tap.txid IN (SELECT txid FROM relevant_txs)
57 AND tap.seen_block_id IS NULL"
58 ))
59 .bind(block_id)
60 .execute(tx.deref_mut())
61 .await?;
62
63 sqlx::query(&format!(
65 "{common_ctes}
66 UPDATE tx_sender_activate_try_to_send_outpoints AS tap
67 SET seen_block_id = $1
68 WHERE (tap.txid, tap.vout) IN (SELECT txid, vout FROM relevant_spent_utxos)
69 AND tap.seen_block_id IS NULL"
70 ))
71 .bind(block_id)
72 .execute(tx.deref_mut())
73 .await?;
74
75 sqlx::query(&format!(
77 "{common_ctes}
78 UPDATE tx_sender_cancel_try_to_send_txids AS ctt
79 SET seen_block_id = $1
80 WHERE ctt.txid IN (SELECT txid FROM relevant_txs)
81 AND ctt.seen_block_id IS NULL"
82 ))
83 .bind(block_id)
84 .execute(tx.deref_mut())
85 .await?;
86
87 sqlx::query(&format!(
89 "{common_ctes}
90 UPDATE tx_sender_cancel_try_to_send_outpoints AS cto
91 SET seen_block_id = $1
92 WHERE (cto.txid, cto.vout) IN (SELECT txid, vout FROM relevant_spent_utxos)
93 AND cto.seen_block_id IS NULL"
94 ))
95 .bind(block_id)
96 .execute(tx.deref_mut())
97 .await?;
98
99 sqlx::query(&format!(
101 "{common_ctes}
102 UPDATE tx_sender_fee_payer_utxos AS fpu
103 SET seen_block_id = $1
104 WHERE fpu.fee_payer_txid IN (SELECT txid FROM relevant_txs)
105 AND fpu.seen_block_id IS NULL"
106 ))
107 .bind(block_id)
108 .execute(tx.deref_mut())
109 .await?;
110
111 sqlx::query(&format!(
113 "{common_ctes}
114 UPDATE tx_sender_try_to_send_txs AS txs
115 SET seen_block_id = $1
116 WHERE txs.txid IN (SELECT txid FROM relevant_txs)
117 AND txs.seen_block_id IS NULL"
118 ))
119 .bind(block_id)
120 .execute(tx.deref_mut())
121 .await?;
122
123 sqlx::query(&format!(
125 "{common_ctes}
126 UPDATE tx_sender_try_to_send_txs AS txs
127 SET seen_block_id = $1
128 WHERE txs.id IN (SELECT id FROM confirmed_rbf_ids)
129 AND txs.seen_block_id IS NULL"
130 ))
131 .bind(block_id)
132 .execute(tx.deref_mut())
133 .await?;
134
135 let bg_db = self.clone();
136 tokio::spawn(async move {
138 let Ok(confirmed_direct_txs): Result<Vec<(i32, TxidDB)>, _> = sqlx::query_as(&format!(
140 "{common_ctes}
141 SELECT txs.id, txs.txid
142 FROM tx_sender_try_to_send_txs AS txs
143 WHERE txs.txid IN (SELECT txid FROM relevant_txs)",
144 ))
145 .bind(block_id)
146 .fetch_all(&bg_db.connection)
147 .await
148 else {
149 tracing::error!("Failed to update debug info for confirmed txs");
150 return;
151 };
152
153 let Ok(confirmed_rbf_txs): Result<Vec<(i32,)>, _> = sqlx::query_as(&format!(
155 "{common_ctes}
156 SELECT txs.id
157 FROM tx_sender_try_to_send_txs AS txs
158 WHERE txs.id IN (SELECT id FROM confirmed_rbf_ids)",
159 ))
160 .bind(block_id)
161 .fetch_all(&bg_db.connection)
162 .await
163 else {
164 tracing::error!("Failed to update debug info for confirmed txs");
165 return;
166 };
167
168 for (tx_id, txid) in confirmed_direct_txs {
170 tracing::debug!(try_to_send_id=?tx_id, "Transaction confirmed in block {}: direct confirmation of txid {}",
172 block_id, txid.0);
173
174 let _ = bg_db
176 .update_tx_debug_sending_state(tx_id as u32, "confirmed", true)
177 .await;
178 }
179
180 for (tx_id,) in confirmed_rbf_txs {
182 tracing::debug!(try_to_send_id=?tx_id, "Transaction confirmed in block {}: RBF confirmation",
184 block_id);
185
186 let _ = bg_db
188 .update_tx_debug_sending_state(tx_id as u32, "confirmed", true)
189 .await;
190 }
191 });
192
193 Ok(())
194 }
195
196 pub async fn unconfirm_transactions(
201 &self,
202 tx: DatabaseTransaction<'_, '_>,
203 block_id: u32,
204 ) -> Result<(), BridgeError> {
205 let block_id = i32::try_from(block_id).wrap_err("Failed to convert block id to i32")?;
206
207 let previously_confirmed_txs = sqlx::query_as::<_, (i32,)>(
210 "SELECT id FROM tx_sender_try_to_send_txs WHERE seen_block_id = $1",
211 )
212 .bind(block_id)
213 .fetch_all(tx.deref_mut())
214 .await;
215
216 let bg_db = self.clone();
217 tokio::spawn(async move {
218 let previously_confirmed_txs = match previously_confirmed_txs {
219 Ok(txs) => txs,
220 Err(e) => {
221 tracing::error!(error=?e, "Failed to get previously confirmed txs from database");
222 return;
223 }
224 };
225
226 for (tx_id,) in previously_confirmed_txs {
227 tracing::debug!(try_to_send_id=?tx_id, "Transaction unconfirmed in block {}: unconfirming", block_id);
228 let _ = bg_db
229 .update_tx_debug_sending_state(tx_id as u32, "unconfirmed", false)
230 .await;
231 }
232 });
233
234 sqlx::query(
237 "UPDATE tx_sender_activate_try_to_send_txids AS tap
238 SET seen_block_id = NULL
239 WHERE tap.seen_block_id = $1",
240 )
241 .bind(block_id)
242 .execute(tx.deref_mut())
243 .await?;
244
245 sqlx::query(
247 "UPDATE tx_sender_activate_try_to_send_outpoints AS tap
248 SET seen_block_id = NULL
249 WHERE tap.seen_block_id = $1",
250 )
251 .bind(block_id)
252 .execute(tx.deref_mut())
253 .await?;
254
255 sqlx::query(
257 "UPDATE tx_sender_cancel_try_to_send_txids AS ctt
258 SET seen_block_id = NULL
259 WHERE ctt.seen_block_id = $1",
260 )
261 .bind(block_id)
262 .execute(tx.deref_mut())
263 .await?;
264
265 sqlx::query(
267 "UPDATE tx_sender_cancel_try_to_send_outpoints AS cto
268 SET seen_block_id = NULL
269 WHERE cto.seen_block_id = $1",
270 )
271 .bind(block_id)
272 .execute(tx.deref_mut())
273 .await?;
274
275 sqlx::query(
277 "UPDATE tx_sender_fee_payer_utxos AS fpu
278 SET seen_block_id = NULL
279 WHERE fpu.seen_block_id = $1",
280 )
281 .bind(block_id)
282 .execute(tx.deref_mut())
283 .await?;
284
285 sqlx::query(
287 "UPDATE tx_sender_try_to_send_txs AS txs
288 SET seen_block_id = NULL
289 WHERE txs.seen_block_id = $1",
290 )
291 .bind(block_id)
292 .execute(tx.deref_mut())
293 .await?;
294
295 Ok(())
296 }
297
298 pub async fn save_fee_payer_tx(
307 &self,
308 tx: Option<DatabaseTransaction<'_, '_>>,
309 bumped_id: u32,
310 fee_payer_txid: Txid,
311 vout: u32,
312 amount: Amount,
313 replacement_of_id: Option<u32>,
314 ) -> Result<(), BridgeError> {
315 let query = sqlx::query(
316 "INSERT INTO tx_sender_fee_payer_utxos (bumped_id, fee_payer_txid, vout, amount, replacement_of_id)
317 VALUES ($1, $2, $3, $4, $5)",
318 )
319 .bind(i32::try_from(bumped_id).wrap_err("Failed to convert bumped id to i32")?)
320 .bind(TxidDB(fee_payer_txid))
321 .bind(i32::try_from(vout).wrap_err("Failed to convert vout to i32")?)
322 .bind(i64::try_from(amount.to_sat()).wrap_err("Failed to convert amount to i64")?)
323 .bind(replacement_of_id.map( i32::try_from).transpose().wrap_err("Failed to convert replacement of id to i32")?);
324
325 execute_query_with_tx!(self.connection, tx, query, execute)?;
326
327 Ok(())
328 }
329
330 pub async fn get_all_unconfirmed_fee_payer_txs(
345 &self,
346 tx: Option<DatabaseTransaction<'_, '_>>,
347 ) -> Result<Vec<(u32, u32, Txid, u32, Amount, Option<u32>)>, BridgeError> {
348 let query = sqlx::query_as::<_, (i32, i32, TxidDB, i32, i64, Option<i32>)>(
349 "
350 SELECT fpu.id, fpu.bumped_id, fpu.fee_payer_txid, fpu.vout, fpu.amount, fpu.replacement_of_id
351 FROM tx_sender_fee_payer_utxos fpu
352 WHERE fpu.seen_block_id IS NULL
353 AND fpu.is_evicted = false
354 AND NOT EXISTS (
355 SELECT 1
356 FROM tx_sender_fee_payer_utxos x
357 WHERE (x.replacement_of_id = fpu.replacement_of_id OR x.id = fpu.replacement_of_id)
358 AND x.seen_block_id IS NOT NULL
359 )
360 ",
361 );
362
363 let results: Vec<(i32, i32, TxidDB, i32, i64, Option<i32>)> =
364 execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
365
366 results
367 .iter()
368 .map(
369 |(id, bumped_id, fee_payer_txid, vout, amount, replacement_of_id)| {
370 Ok((
371 u32::try_from(*id).wrap_err("Failed to convert id to u32")?,
372 u32::try_from(*bumped_id).wrap_err("Failed to convert bumped id to u32")?,
373 fee_payer_txid.0,
374 u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
375 Amount::from_sat(
376 u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
377 ),
378 replacement_of_id
379 .map(u32::try_from)
380 .transpose()
381 .wrap_err("Failed to convert replacement of id to u32")?,
382 ))
383 },
384 )
385 .collect::<Result<Vec<_>, BridgeError>>()
386 }
387
388 pub async fn get_unconfirmed_fee_payer_txs(
404 &self,
405 tx: Option<DatabaseTransaction<'_, '_>>,
406 bumped_id: u32,
407 ) -> Result<Vec<(u32, Txid, u32, Amount)>, BridgeError> {
408 let query = sqlx::query_as::<_, (i32, TxidDB, i32, i64)>(
409 "
410 SELECT fpu.id, fpu.fee_payer_txid, fpu.vout, fpu.amount
411 FROM tx_sender_fee_payer_utxos fpu
412 WHERE fpu.bumped_id = $1
413 AND fpu.seen_block_id IS NULL
414 AND fpu.is_evicted = false
415 AND NOT EXISTS (
416 SELECT 1
417 FROM tx_sender_fee_payer_utxos x
418 WHERE (x.replacement_of_id = fpu.replacement_of_id OR x.id = fpu.replacement_of_id)
419 AND x.seen_block_id IS NOT NULL
420 )
421 ",
422 )
423 .bind(i32::try_from(bumped_id).wrap_err("Failed to convert bumped id to i32")?);
424 let results: Vec<(i32, TxidDB, i32, i64)> =
425 execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
426
427 results
428 .iter()
429 .map(|(id, fee_payer_txid, vout, amount)| {
430 Ok((
431 u32::try_from(*id).wrap_err("Failed to convert id to u32")?,
432 fee_payer_txid.0,
433 u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
434 Amount::from_sat(
435 u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
436 ),
437 ))
438 })
439 .collect::<Result<Vec<_>, BridgeError>>()
440 }
441
442 pub async fn mark_fee_payer_utxo_as_evicted(
445 &self,
446 tx: Option<DatabaseTransaction<'_, '_>>,
447 id: u32,
448 ) -> Result<(), BridgeError> {
449 let query = sqlx::query(
450 "UPDATE tx_sender_fee_payer_utxos
451 SET is_evicted = true
452 WHERE id = $1
453 OR replacement_of_id = $1",
454 )
455 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
456
457 execute_query_with_tx!(self.connection, tx, query, execute)?;
458 Ok(())
459 }
460
461 pub async fn get_confirmed_fee_payer_utxos(
462 &self,
463 tx: Option<DatabaseTransaction<'_, '_>>,
464 id: u32,
465 ) -> Result<Vec<(Txid, u32, Amount)>, BridgeError> {
466 let query = sqlx::query_as::<_, (TxidDB, i32, i64)>(
467 "SELECT fee_payer_txid, vout, amount
468 FROM tx_sender_fee_payer_utxos fpu
469 WHERE fpu.bumped_id = $1 AND fpu.seen_block_id IS NOT NULL",
470 )
471 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
472
473 let results: Vec<(TxidDB, i32, i64)> =
474 execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
475
476 results
477 .iter()
478 .map(|(fee_payer_txid, vout, amount)| {
479 Ok((
480 fee_payer_txid.0,
481 u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
482 Amount::from_sat(
483 u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
484 ),
485 ))
486 })
487 .collect::<Result<Vec<_>, BridgeError>>()
488 }
489
490 pub async fn check_if_tx_exists_on_txsender(
493 &self,
494 tx: Option<DatabaseTransaction<'_, '_>>,
495 txid: Txid,
496 ) -> Result<Option<u32>, BridgeError> {
497 let query = sqlx::query_as::<_, (i32,)>(
498 "SELECT id FROM tx_sender_try_to_send_txs WHERE txid = $1 LIMIT 1",
499 )
500 .bind(TxidDB(txid));
501
502 let result: Option<(i32,)> =
503 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
504 Ok(match result {
505 Some((id,)) => Some(u32::try_from(id).wrap_err("Failed to convert id to u32")?),
506 None => None,
507 })
508 }
509
510 pub async fn save_tx(
511 &self,
512 tx: Option<DatabaseTransaction<'_, '_>>,
513 tx_metadata: Option<TxMetadata>,
514 raw_tx: &Transaction,
515 fee_paying_type: FeePayingType,
516 txid: Txid,
517 rbf_signing_info: Option<RbfSigningInfo>,
518 ) -> Result<u32, BridgeError> {
519 let query = sqlx::query_scalar(
520 "INSERT INTO tx_sender_try_to_send_txs (raw_tx, fee_paying_type, tx_metadata, txid, rbf_signing_info) VALUES ($1, $2::fee_paying_type, $3, $4, $5) RETURNING id"
521 )
522 .bind(serialize(raw_tx))
523 .bind(fee_paying_type)
524 .bind(serde_json::to_string(&tx_metadata).wrap_err("Failed to encode tx_metadata to JSON")?)
525 .bind(TxidDB(txid))
526 .bind(serde_json::to_string(&rbf_signing_info).wrap_err("Failed to encode tx_metadata to JSON")?);
527
528 let id: i32 = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
529 u32::try_from(id)
530 .wrap_err("Failed to convert id to u32")
531 .map_err(Into::into)
532 }
533
534 pub async fn save_rbf_txid(
535 &self,
536 tx: Option<DatabaseTransaction<'_, '_>>,
537 id: u32,
538 txid: Txid,
539 ) -> Result<(), BridgeError> {
540 let query = sqlx::query("INSERT INTO tx_sender_rbf_txids (id, txid) VALUES ($1, $2)")
541 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?)
542 .bind(TxidDB(txid));
543
544 execute_query_with_tx!(self.connection, tx, query, execute)?;
545 Ok(())
546 }
547
548 pub async fn get_last_rbf_txid(
549 &self,
550 tx: Option<DatabaseTransaction<'_, '_>>,
551 id: u32,
552 ) -> Result<Option<Txid>, BridgeError> {
553 let query = sqlx::query_as::<_, (TxidDB,)>("SELECT txid FROM tx_sender_rbf_txids WHERE id = $1 ORDER BY insertion_order DESC LIMIT 1")
554 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
555
556 let result: Option<(TxidDB,)> =
557 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
558 Ok(result.map(|(txid,)| txid.0))
559 }
560
561 pub async fn save_cancelled_outpoint(
562 &self,
563 tx: Option<DatabaseTransaction<'_, '_>>,
564 cancelled_id: u32,
565 outpoint: bitcoin::OutPoint,
566 ) -> Result<(), BridgeError> {
567 let query = sqlx::query(
568 "INSERT INTO tx_sender_cancel_try_to_send_outpoints (cancelled_id, txid, vout) VALUES ($1, $2, $3)"
569 )
570 .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled id to i32")?)
571 .bind(TxidDB(outpoint.txid))
572 .bind(i32::try_from(outpoint.vout).wrap_err("Failed to convert vout to i32")?);
573
574 execute_query_with_tx!(self.connection, tx, query, execute)?;
575 Ok(())
576 }
577
578 pub async fn save_cancelled_txid(
579 &self,
580 tx: Option<DatabaseTransaction<'_, '_>>,
581 cancelled_id: u32,
582 txid: bitcoin::Txid,
583 ) -> Result<(), BridgeError> {
584 let query = sqlx::query(
585 "INSERT INTO tx_sender_cancel_try_to_send_txids (cancelled_id, txid) VALUES ($1, $2)",
586 )
587 .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled id to i32")?)
588 .bind(TxidDB(txid));
589
590 execute_query_with_tx!(self.connection, tx, query, execute)?;
591 Ok(())
592 }
593
594 pub async fn save_activated_txid(
595 &self,
596 tx: Option<DatabaseTransaction<'_, '_>>,
597 activated_id: u32,
598 prerequisite_tx: &ActivatedWithTxid,
599 ) -> Result<(), BridgeError> {
600 let query = sqlx::query(
601 "INSERT INTO tx_sender_activate_try_to_send_txids (activated_id, txid, timelock) VALUES ($1, $2, $3)"
602 )
603 .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated id to i32")?)
604 .bind(TxidDB(prerequisite_tx.txid))
605 .bind(i32::try_from(prerequisite_tx.relative_block_height).wrap_err("Failed to convert relative block height to i32")?);
606
607 execute_query_with_tx!(self.connection, tx, query, execute)?;
608 Ok(())
609 }
610
611 pub async fn save_activated_outpoint(
612 &self,
613 tx: Option<DatabaseTransaction<'_, '_>>,
614 activated_id: u32,
615 activated_outpoint: &ActivatedWithOutpoint,
616 ) -> Result<(), BridgeError> {
617 let query = sqlx::query(
618 "INSERT INTO tx_sender_activate_try_to_send_outpoints (activated_id, txid, vout, timelock) VALUES ($1, $2, $3, $4)"
619 )
620 .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated id to i32")?)
621 .bind(TxidDB(activated_outpoint.outpoint.txid))
622 .bind(i32::try_from(activated_outpoint.outpoint.vout).wrap_err("Failed to convert vout to i32")?)
623 .bind(i32::try_from(activated_outpoint.relative_block_height).wrap_err("Failed to convert relative block height to i32")?);
624
625 execute_query_with_tx!(self.connection, tx, query, execute)?;
626 Ok(())
627 }
628
629 pub async fn get_sendable_txs(
649 &self,
650 tx: Option<DatabaseTransaction<'_, '_>>,
651 fee_rate: FeeRate,
652 current_tip_height: u32,
653 ) -> Result<Vec<u32>, BridgeError> {
654 let select_query = sqlx::query_as::<_, (i32,)>(
655 "WITH
656 -- Find non-active transactions (not seen or timelock not passed)
657 non_active_txs AS (
658 -- Transactions with txid activations that aren't active yet
659 SELECT DISTINCT
660 activate_txid.activated_id AS tx_id
661 FROM
662 tx_sender_activate_try_to_send_txids AS activate_txid
663 LEFT JOIN
664 bitcoin_syncer AS syncer ON activate_txid.seen_block_id = syncer.id
665 WHERE
666 activate_txid.seen_block_id IS NULL
667 OR (syncer.height + activate_txid.timelock > $2)
668
669 UNION
670
671 -- Transactions with outpoint activations that aren't active yet (not seen or timelock not passed)
672 SELECT DISTINCT
673 activate_outpoint.activated_id AS tx_id
674 FROM
675 tx_sender_activate_try_to_send_outpoints AS activate_outpoint
676 LEFT JOIN
677 bitcoin_syncer AS syncer ON activate_outpoint.seen_block_id = syncer.id
678 WHERE
679 activate_outpoint.seen_block_id IS NULL
680 OR (syncer.height + activate_outpoint.timelock > $2)
681 ),
682
683 -- Transactions with cancelled conditions
684 cancelled_txs AS (
685 -- Transactions with cancelled outpoints (not seen)
686 SELECT DISTINCT
687 cancelled_id AS tx_id
688 FROM
689 tx_sender_cancel_try_to_send_outpoints
690 WHERE
691 seen_block_id IS NOT NULL
692
693 UNION
694
695 -- Transactions with cancelled txids (not seen)
696 SELECT DISTINCT
697 cancelled_id AS tx_id
698 FROM
699 tx_sender_cancel_try_to_send_txids
700 WHERE
701 seen_block_id IS NOT NULL
702 )
703
704 -- Final query to get sendable transactions
705 SELECT
706 txs.id
707 FROM
708 tx_sender_try_to_send_txs AS txs
709 WHERE
710 -- Transaction must not be in the non-active list
711 txs.id NOT IN (SELECT tx_id FROM non_active_txs)
712 -- Transaction must not be in the cancelled list
713 AND txs.id NOT IN (SELECT tx_id FROM cancelled_txs)
714 -- Transaction must not be already confirmed
715 AND txs.seen_block_id IS NULL
716 -- Check if fee_rate is lower than the provided fee rate or null
717 AND (txs.effective_fee_rate IS NULL OR txs.effective_fee_rate < $1);",
718 )
719 .bind(
720 i64::try_from(fee_rate.to_sat_per_vb_ceil())
721 .wrap_err("Failed to convert fee rate to i64")?,
722 )
723 .bind(
724 i32::try_from(current_tip_height)
725 .wrap_err("Failed to convert current tip height to i32")?,
726 );
727
728 let results = execute_query_with_tx!(self.connection, tx, select_query, fetch_all)?;
729
730 let txs = results
731 .into_iter()
732 .map(|(id,)| u32::try_from(id))
733 .collect::<Result<Vec<_>, _>>()
734 .wrap_err("Failed to convert id to u32")?;
735
736 Ok(txs)
737 }
738
739 pub async fn update_effective_fee_rate(
740 &self,
741 tx: Option<DatabaseTransaction<'_, '_>>,
742 id: u32,
743 effective_fee_rate: FeeRate,
744 ) -> Result<(), BridgeError> {
745 let query = sqlx::query(
746 "UPDATE tx_sender_try_to_send_txs SET effective_fee_rate = $1 WHERE id = $2",
747 )
748 .bind(
749 i64::try_from(effective_fee_rate.to_sat_per_vb_ceil())
750 .wrap_err("Failed to convert effective fee rate to i64")?,
751 )
752 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
753
754 execute_query_with_tx!(self.connection, tx, query, execute)?;
755
756 Ok(())
757 }
758
759 pub async fn get_try_to_send_tx(
760 &self,
761 tx: Option<DatabaseTransaction<'_, '_>>,
762 id: u32,
763 ) -> Result<
764 (
765 Option<TxMetadata>,
766 Transaction,
767 FeePayingType,
768 Option<u32>,
769 Option<RbfSigningInfo>,
770 ),
771 BridgeError,
772 > {
773 let query = sqlx::query_as::<
774 _,
775 (
776 Option<String>,
777 Option<Vec<u8>>,
778 FeePayingType,
779 Option<i32>,
780 Option<String>,
781 ),
782 >(
783 "SELECT tx_metadata, raw_tx, fee_paying_type, seen_block_id, rbf_signing_info
784 FROM tx_sender_try_to_send_txs
785 WHERE id = $1 LIMIT 1",
786 )
787 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
788
789 let result = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
790 Ok((
791 serde_json::from_str(result.0.as_deref().unwrap_or("null"))
792 .wrap_err_with(|| format!("Failed to decode tx_metadata from {:?}", result.0))?,
793 result
794 .1
795 .as_deref()
796 .map(deserialize)
797 .ok_or_eyre("Expected raw_tx to be present")?
798 .wrap_err("Failed to deserialize raw_tx")?,
799 result.2,
800 result
801 .3
802 .map(u32::try_from)
803 .transpose()
804 .wrap_err("Failed to convert seen_block_id to u32")?,
805 serde_json::from_str(result.4.as_deref().unwrap_or("null")).wrap_err_with(|| {
806 format!("Failed to decode rbf_signing_info from {:?}", result.4)
807 })?,
808 ))
809 }
810
811 pub async fn save_tx_debug_submission_error(
815 &self,
816 tx_id: u32,
817 error_message: &str,
818 ) -> Result<(), BridgeError> {
819 let query = sqlx::query(
820 "INSERT INTO tx_sender_debug_submission_errors (tx_id, error_message) VALUES ($1, $2)",
821 )
822 .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?)
823 .bind(error_message);
824
825 self.connection.execute(query).await?;
826 Ok(())
827 }
828
829 pub async fn update_tx_debug_sending_state(
834 &self,
835 tx_id: u32,
836 state: &str,
837 activated: bool,
838 ) -> Result<(), BridgeError> {
839 let query = sqlx::query(
840 r#"
841 INSERT INTO tx_sender_debug_sending_state
842 (tx_id, state, last_update, activated_timestamp)
843 VALUES ($1, $2, NOW(),
844 CASE
845 WHEN $3 = TRUE THEN NOW()
846 ELSE NULL
847 END
848 )
849 ON CONFLICT (tx_id) DO UPDATE SET
850 state = $2,
851 last_update = NOW(),
852 activated_timestamp = COALESCE(tx_sender_debug_sending_state.activated_timestamp,
853 CASE
854 WHEN $3 = TRUE THEN NOW()
855 ELSE NULL
856 END
857 )
858 "#,
859 )
860 .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?)
861 .bind(state)
862 .bind(activated);
863
864 self.connection.execute(query).await?;
865 Ok(())
866 }
867
868 pub async fn get_tx_debug_info(
870 &self,
871 tx: Option<DatabaseTransaction<'_, '_>>,
872 tx_id: u32,
873 ) -> Result<Option<String>, BridgeError> {
874 let query = sqlx::query_as::<_, (Option<String>,)>(
875 r#"
876 SELECT state
877 FROM tx_sender_debug_sending_state
878 WHERE tx_id = $1
879 "#,
880 )
881 .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
882
883 let result = execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
884 match result {
885 Some((state,)) => Ok(state),
886 None => Ok(None),
887 }
888 }
889
890 pub async fn get_tx_debug_submission_errors(
892 &self,
893 tx: Option<DatabaseTransaction<'_, '_>>,
894 tx_id: u32,
895 ) -> Result<Vec<(String, String)>, BridgeError> {
896 let query = sqlx::query_as::<_, (String, String)>(
897 r#"
898 SELECT error_message, timestamp::TEXT
899 FROM tx_sender_debug_submission_errors
900 WHERE tx_id = $1
901 ORDER BY timestamp ASC
902 "#,
903 )
904 .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
905
906 execute_query_with_tx!(self.connection, tx, query, fetch_all).map_err(Into::into)
907 }
908
909 pub async fn get_tx_debug_fee_payer_utxos(
911 &self,
912 tx: Option<DatabaseTransaction<'_, '_>>,
913 tx_id: u32,
914 ) -> Result<Vec<(Txid, u32, Amount, bool)>, BridgeError> {
915 let query = sqlx::query_as::<_, (TxidDB, i32, i64, bool)>(
916 r#"
917 SELECT fee_payer_txid, vout, amount, seen_block_id IS NOT NULL as confirmed
918 FROM tx_sender_fee_payer_utxos
919 WHERE bumped_id = $1
920 "#,
921 )
922 .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
923
924 let results: Vec<(TxidDB, i32, i64, bool)> =
925 execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
926
927 results
928 .iter()
929 .map(|(fee_payer_txid, vout, amount, confirmed)| {
930 Ok((
931 fee_payer_txid.0,
932 u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
933 Amount::from_sat(
934 u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
935 ),
936 *confirmed,
937 ))
938 })
939 .collect::<Result<Vec<_>, BridgeError>>()
940 }
941
942 pub async fn purge_tx_debug_info(
944 &self,
945 mut tx: Option<DatabaseTransaction<'_, '_>>,
946 tx_id: u32,
947 ) -> Result<(), BridgeError> {
948 let queries = [
949 "DELETE FROM tx_sender_debug_state_changes WHERE tx_id = $1",
950 "DELETE FROM tx_sender_debug_submission_errors WHERE tx_id = $1",
951 "DELETE FROM tx_sender_debug_sending_state WHERE tx_id = $1",
952 ];
953
954 for query_str in queries {
955 let query = sqlx::query(query_str)
956 .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
957
958 execute_query_with_tx!(self.connection, tx.as_deref_mut(), query, execute)?;
959 }
960
961 Ok(())
962 }
963}
964
965#[cfg(test)]
966mod tests {
967 use super::*;
968 use crate::database::Database;
969 use crate::test::common::*;
970 use bitcoin::absolute::Height;
971 use bitcoin::hashes::Hash;
972 use bitcoin::transaction::Version;
973 use bitcoin::{Block, OutPoint, TapNodeHash, Txid};
974
975 async fn setup_test_db() -> Database {
976 let config = create_test_config_with_thread_name().await;
977 Database::new(&config).await.unwrap()
978 }
979
980 #[tokio::test]
981 async fn test_save_and_get_tx() {
982 let db = setup_test_db().await;
983 let tx = Transaction {
984 version: Version::TWO,
985 lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
986 input: vec![],
987 output: vec![],
988 };
989
990 let txid = tx.compute_txid();
992 let rbfinfo = Some(RbfSigningInfo {
993 vout: 123,
994 tweak_merkle_root: Some(TapNodeHash::all_zeros()),
995 annex: None,
996 additional_taproot_output_count: None,
997 });
998 let id = db
999 .save_tx(None, None, &tx, FeePayingType::CPFP, txid, rbfinfo.clone())
1000 .await
1001 .unwrap();
1002
1003 let (_, retrieved_tx, fee_paying_type, seen_block_id, rbf_signing_info) =
1005 db.get_try_to_send_tx(None, id).await.unwrap();
1006 assert_eq!(tx.version, retrieved_tx.version);
1007 assert_eq!(fee_paying_type, FeePayingType::CPFP);
1008 assert_eq!(seen_block_id, None);
1009 assert_eq!(rbf_signing_info, rbfinfo);
1010 }
1011
1012 #[tokio::test]
1013 async fn test_fee_payer_utxo_operations() {
1014 let db = setup_test_db().await;
1015 let mut dbtx = db.begin_transaction().await.unwrap();
1016
1017 let tx = Transaction {
1019 version: Version::TWO,
1020 lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
1021 input: vec![],
1022 output: vec![],
1023 };
1024
1025 let tx_id = db
1027 .save_tx(
1028 Some(&mut dbtx),
1029 None,
1030 &tx,
1031 FeePayingType::CPFP,
1032 Txid::all_zeros(),
1033 None,
1034 )
1035 .await
1036 .unwrap();
1037
1038 let fee_payer_txid = Txid::hash(&[1u8; 32]);
1040 db.save_fee_payer_tx(
1041 Some(&mut dbtx),
1042 tx_id,
1043 fee_payer_txid,
1044 0,
1045 Amount::from_sat(50000),
1046 None,
1047 )
1048 .await
1049 .unwrap();
1050
1051 dbtx.commit().await.unwrap();
1052 }
1053
1054 #[tokio::test]
1055 async fn test_confirm_and_unconfirm_transactions() {
1056 const BLOCK_HEX: &str = "0200000035ab154183570282ce9afc0b494c9fc6a3cfea05aa8c1add2ecc56490000000038ba3d78e4500a5a7570dbe61960398add4410d278b21cd9708e6d9743f374d544fc055227f1001c29c1ea3b0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff3703a08601000427f1001c046a510100522cfabe6d6d0000000000000000000068692066726f6d20706f6f6c7365727665726aac1eeeed88ffffffff0100f2052a010000001976a914912e2b234f941f30b18afbb4fa46171214bf66c888ac00000000";
1057 let block: Block = deserialize(&hex::decode(BLOCK_HEX).unwrap()).unwrap();
1058
1059 let db = setup_test_db().await;
1060 let mut dbtx = db.begin_transaction().await.unwrap();
1061
1062 let block_id = crate::bitcoin_syncer::save_block(&db, &mut dbtx, &block, 100)
1064 .await
1065 .unwrap();
1066
1067 let tx = Transaction {
1069 version: Version::TWO,
1070 lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
1071 input: vec![],
1072 output: vec![],
1073 };
1074 let tx_id = db
1075 .save_tx(
1076 Some(&mut dbtx),
1077 None,
1078 &tx,
1079 FeePayingType::CPFP,
1080 Txid::all_zeros(),
1081 None,
1082 )
1083 .await
1084 .unwrap();
1085
1086 let fee_payer_txid = Txid::hash(&[1u8; 32]);
1088 db.save_fee_payer_tx(
1089 Some(&mut dbtx),
1090 tx_id,
1091 fee_payer_txid,
1092 0,
1093 Amount::from_sat(50000),
1094 None,
1095 )
1096 .await
1097 .unwrap();
1098
1099 db.insert_txid_to_block(&mut dbtx, block_id, &fee_payer_txid)
1101 .await
1102 .unwrap();
1103
1104 db.confirm_transactions(&mut dbtx, block_id).await.unwrap();
1106
1107 dbtx.commit().await.unwrap();
1108 }
1109
1110 #[tokio::test]
1111 async fn test_cancelled_outpoints_and_txids() {
1112 let db = setup_test_db().await;
1113 let mut dbtx = db.begin_transaction().await.unwrap();
1114
1115 let tx = Transaction {
1117 version: Version::TWO,
1118 lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
1119 input: vec![],
1120 output: vec![],
1121 };
1122
1123 let tx_id = db
1125 .save_tx(
1126 Some(&mut dbtx),
1127 None,
1128 &tx,
1129 FeePayingType::CPFP,
1130 Txid::all_zeros(),
1131 None,
1132 )
1133 .await
1134 .unwrap();
1135
1136 let txid = Txid::hash(&[0u8; 32]);
1138 let vout = 0;
1139
1140 db.save_cancelled_outpoint(Some(&mut dbtx), tx_id, OutPoint { txid, vout })
1142 .await
1143 .unwrap();
1144
1145 db.save_cancelled_txid(Some(&mut dbtx), tx_id, txid)
1147 .await
1148 .unwrap();
1149
1150 dbtx.commit().await.unwrap();
1151 }
1152
1153 #[tokio::test]
1154 async fn test_get_sendable_txs() {
1155 let db = setup_test_db().await;
1156 let mut dbtx = db.begin_transaction().await.unwrap();
1157
1158 let tx1 = Transaction {
1160 version: Version::TWO,
1161 lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
1162 input: vec![],
1163 output: vec![],
1164 };
1165 let tx2 = Transaction {
1166 version: Version::TWO,
1167 lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
1168 input: vec![],
1169 output: vec![],
1170 };
1171
1172 let id1 = db
1173 .save_tx(
1174 Some(&mut dbtx),
1175 None,
1176 &tx1,
1177 FeePayingType::CPFP,
1178 Txid::all_zeros(),
1179 None,
1180 )
1181 .await
1182 .unwrap();
1183 let id2 = db
1184 .save_tx(
1185 Some(&mut dbtx),
1186 None,
1187 &tx2,
1188 FeePayingType::RBF,
1189 Txid::all_zeros(),
1190 None,
1191 )
1192 .await
1193 .unwrap();
1194
1195 let fee_rate = FeeRate::from_sat_per_vb(3).unwrap();
1197 let current_tip_height = 100;
1198
1199 let sendable_txs = db
1200 .get_sendable_txs(Some(&mut dbtx), fee_rate, current_tip_height)
1201 .await
1202 .unwrap();
1203
1204 assert_eq!(sendable_txs.len(), 2);
1206 assert!(sendable_txs.contains(&id1));
1207 assert!(sendable_txs.contains(&id2));
1208
1209 db.update_effective_fee_rate(Some(&mut dbtx), id1, fee_rate)
1212 .await
1213 .unwrap();
1214
1215 let sendable_txs = db
1216 .get_sendable_txs(Some(&mut dbtx), fee_rate, current_tip_height)
1217 .await
1218 .unwrap();
1219 assert_eq!(sendable_txs.len(), 1);
1220 assert!(sendable_txs.contains(&id2));
1221
1222 let sendable_txs = db
1224 .get_sendable_txs(
1225 Some(&mut dbtx),
1226 FeeRate::from_sat_per_vb(4).unwrap(),
1227 current_tip_height + 1,
1228 )
1229 .await
1230 .unwrap();
1231 assert_eq!(sendable_txs.len(), 2);
1232 assert!(sendable_txs.contains(&id1));
1233 assert!(sendable_txs.contains(&id2));
1234
1235 dbtx.commit().await.unwrap();
1236 }
1237
1238 #[tokio::test]
1239 async fn test_debug_sending_state() {
1240 let db = setup_test_db().await;
1241 let mut dbtx = db.begin_transaction().await.unwrap();
1242
1243 let tx = Transaction {
1245 version: Version::TWO,
1246 lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
1247 input: vec![],
1248 output: vec![],
1249 };
1250
1251 let tx_id = db
1253 .save_tx(
1254 None, None,
1256 &tx,
1257 FeePayingType::RBF,
1258 tx.compute_txid(),
1259 None,
1260 )
1261 .await
1262 .unwrap();
1263
1264 let initial_state = "waiting_for_fee_payer_utxos";
1266 db.update_tx_debug_sending_state(tx_id, initial_state, false)
1267 .await
1268 .unwrap();
1269
1270 let state = db.get_tx_debug_info(Some(&mut dbtx), tx_id).await.unwrap();
1272 assert_eq!(state, Some(initial_state.to_string()));
1273
1274 let active_state = "ready_to_send";
1276 db.update_tx_debug_sending_state(tx_id, active_state, true)
1277 .await
1278 .unwrap();
1279
1280 let state = db.get_tx_debug_info(Some(&mut dbtx), tx_id).await.unwrap();
1282 assert_eq!(state, Some(active_state.to_string()));
1283
1284 let error_message = "Failed to send transaction: insufficient fee";
1286 db.save_tx_debug_submission_error(tx_id, error_message)
1287 .await
1288 .unwrap();
1289
1290 let errors = db
1292 .get_tx_debug_submission_errors(Some(&mut dbtx), tx_id)
1293 .await
1294 .unwrap();
1295 assert_eq!(errors.len(), 1);
1296 assert_eq!(errors[0].0, error_message);
1297
1298 let second_error = "Network connection timeout";
1300 db.save_tx_debug_submission_error(tx_id, second_error)
1301 .await
1302 .unwrap();
1303
1304 let errors = db
1306 .get_tx_debug_submission_errors(Some(&mut dbtx), tx_id)
1307 .await
1308 .unwrap();
1309 assert_eq!(errors.len(), 2);
1310 assert_eq!(errors[0].0, error_message);
1311 assert_eq!(errors[1].0, second_error);
1312
1313 let final_state = "sent";
1315 db.update_tx_debug_sending_state(tx_id, final_state, true)
1316 .await
1317 .unwrap();
1318
1319 let state = db.get_tx_debug_info(Some(&mut dbtx), tx_id).await.unwrap();
1321 assert_eq!(state, Some(final_state.to_string()));
1322
1323 dbtx.commit().await.unwrap();
1324 }
1325}