1use async_trait::async_trait;
4
5use super::{wrapper::TxidDB, Database, DatabaseTransaction};
6use crate::execute_query_with_tx;
7use bitcoin::{
8 consensus::{deserialize, serialize},
9 Amount, FeeRate, Transaction, Txid,
10};
11use clementine_errors::BridgeError;
12use clementine_tx_sender::{ActivatedWithOutpoint, ActivatedWithTxid};
13use clementine_utils::{FeePayingType, RbfSigningInfo, TxMetadata};
14use eyre::{Context, OptionExt};
15use sqlx::Executor;
16use std::ops::DerefMut;
17
18impl Database {
19 pub async fn confirm_transactions(
22 &self,
23 tx: DatabaseTransaction<'_>,
24 block_id: u32,
25 ) -> Result<(), BridgeError> {
26 let block_id = i32::try_from(block_id).wrap_err("Failed to convert block id to i32")?;
27
28 let common_ctes = r#"
31 WITH relevant_txs AS (
32 SELECT txid
33 FROM bitcoin_syncer_txs
34 WHERE block_id = $1
35 ),
36 relevant_spent_utxos AS (
37 SELECT txid, vout
38 FROM bitcoin_syncer_spent_utxos
39 WHERE block_id = $1
40 ),
41 confirmed_rbf_ids AS (
42 SELECT rbf.id
43 FROM tx_sender_rbf_txids AS rbf
44 JOIN bitcoin_syncer_txs AS syncer ON rbf.txid = syncer.txid
45 WHERE syncer.block_id = $1
46 )
47 "#;
48
49 sqlx::query(&format!(
51 "{common_ctes}
52 UPDATE tx_sender_activate_try_to_send_txids AS tap
53 SET seen_block_id = $1
54 WHERE tap.txid IN (SELECT txid FROM relevant_txs)
55 AND tap.seen_block_id IS NULL"
56 ))
57 .bind(block_id)
58 .execute(tx.deref_mut())
59 .await?;
60
61 sqlx::query(&format!(
63 "{common_ctes}
64 UPDATE tx_sender_activate_try_to_send_outpoints AS tap
65 SET seen_block_id = $1
66 WHERE (tap.txid, tap.vout) IN (SELECT txid, vout FROM relevant_spent_utxos)
67 AND tap.seen_block_id IS NULL"
68 ))
69 .bind(block_id)
70 .execute(tx.deref_mut())
71 .await?;
72
73 sqlx::query(&format!(
75 "{common_ctes}
76 UPDATE tx_sender_cancel_try_to_send_txids AS ctt
77 SET seen_block_id = $1
78 WHERE ctt.txid IN (SELECT txid FROM relevant_txs)
79 AND ctt.seen_block_id IS NULL"
80 ))
81 .bind(block_id)
82 .execute(tx.deref_mut())
83 .await?;
84
85 sqlx::query(&format!(
87 "{common_ctes}
88 UPDATE tx_sender_cancel_try_to_send_outpoints AS cto
89 SET seen_block_id = $1
90 WHERE (cto.txid, cto.vout) IN (SELECT txid, vout FROM relevant_spent_utxos)
91 AND cto.seen_block_id IS NULL"
92 ))
93 .bind(block_id)
94 .execute(tx.deref_mut())
95 .await?;
96
97 sqlx::query(&format!(
99 "{common_ctes}
100 UPDATE tx_sender_fee_payer_utxos AS fpu
101 SET seen_block_id = $1
102 WHERE fpu.fee_payer_txid IN (SELECT txid FROM relevant_txs)
103 AND fpu.seen_block_id IS NULL"
104 ))
105 .bind(block_id)
106 .execute(tx.deref_mut())
107 .await?;
108
109 sqlx::query(&format!(
111 "{common_ctes}
112 UPDATE tx_sender_try_to_send_txs AS txs
113 SET seen_block_id = $1
114 WHERE txs.txid IN (SELECT txid FROM relevant_txs)
115 AND txs.seen_block_id IS NULL"
116 ))
117 .bind(block_id)
118 .execute(tx.deref_mut())
119 .await?;
120
121 sqlx::query(&format!(
123 "{common_ctes}
124 UPDATE tx_sender_try_to_send_txs AS txs
125 SET seen_block_id = $1
126 WHERE txs.id IN (SELECT id FROM confirmed_rbf_ids)
127 AND txs.seen_block_id IS NULL"
128 ))
129 .bind(block_id)
130 .execute(tx.deref_mut())
131 .await?;
132
133 let bg_db = self.clone();
134 tokio::spawn(async move {
136 let Ok(confirmed_direct_txs): Result<Vec<(i32, TxidDB)>, _> = sqlx::query_as(&format!(
138 "{common_ctes}
139 SELECT txs.id, txs.txid
140 FROM tx_sender_try_to_send_txs AS txs
141 WHERE txs.txid IN (SELECT txid FROM relevant_txs)",
142 ))
143 .bind(block_id)
144 .fetch_all(&bg_db.connection)
145 .await
146 else {
147 tracing::error!("Failed to update debug info for confirmed txs");
148 return;
149 };
150
151 let Ok(confirmed_rbf_txs): Result<Vec<(i32,)>, _> = sqlx::query_as(&format!(
153 "{common_ctes}
154 SELECT txs.id
155 FROM tx_sender_try_to_send_txs AS txs
156 WHERE txs.id IN (SELECT id FROM confirmed_rbf_ids)",
157 ))
158 .bind(block_id)
159 .fetch_all(&bg_db.connection)
160 .await
161 else {
162 tracing::error!("Failed to update debug info for confirmed txs");
163 return;
164 };
165
166 for (tx_id, txid) in confirmed_direct_txs {
168 tracing::debug!(try_to_send_id=?tx_id, "Transaction confirmed in block {}: direct confirmation of txid {}",
170 block_id, txid.0);
171
172 let _ = bg_db
174 .update_tx_debug_sending_state(tx_id as u32, "confirmed", true)
175 .await;
176 }
177
178 for (tx_id,) in confirmed_rbf_txs {
180 tracing::debug!(try_to_send_id=?tx_id, "Transaction confirmed in block {}: RBF confirmation",
182 block_id);
183
184 let _ = bg_db
186 .update_tx_debug_sending_state(tx_id as u32, "confirmed", true)
187 .await;
188 }
189 });
190
191 Ok(())
192 }
193
194 pub async fn unconfirm_transactions(
199 &self,
200 tx: DatabaseTransaction<'_>,
201 block_id: u32,
202 ) -> Result<(), BridgeError> {
203 let block_id = i32::try_from(block_id).wrap_err("Failed to convert block id to i32")?;
204
205 let previously_confirmed_txs = sqlx::query_as::<_, (i32,)>(
208 "SELECT id FROM tx_sender_try_to_send_txs WHERE seen_block_id = $1",
209 )
210 .bind(block_id)
211 .fetch_all(tx.deref_mut())
212 .await;
213
214 let bg_db = self.clone();
215 tokio::spawn(async move {
216 let previously_confirmed_txs = match previously_confirmed_txs {
217 Ok(txs) => txs,
218 Err(e) => {
219 tracing::error!(error=?e, "Failed to get previously confirmed txs from database");
220 return;
221 }
222 };
223
224 for (tx_id,) in previously_confirmed_txs {
225 tracing::debug!(try_to_send_id=?tx_id, "Transaction unconfirmed in block {}: unconfirming", block_id);
226 let _ = bg_db
227 .update_tx_debug_sending_state(tx_id as u32, "unconfirmed", false)
228 .await;
229 }
230 });
231
232 sqlx::query(
235 "UPDATE tx_sender_activate_try_to_send_txids AS tap
236 SET seen_block_id = NULL
237 WHERE tap.seen_block_id = $1",
238 )
239 .bind(block_id)
240 .execute(tx.deref_mut())
241 .await?;
242
243 sqlx::query(
245 "UPDATE tx_sender_activate_try_to_send_outpoints AS tap
246 SET seen_block_id = NULL
247 WHERE tap.seen_block_id = $1",
248 )
249 .bind(block_id)
250 .execute(tx.deref_mut())
251 .await?;
252
253 sqlx::query(
255 "UPDATE tx_sender_cancel_try_to_send_txids AS ctt
256 SET seen_block_id = NULL
257 WHERE ctt.seen_block_id = $1",
258 )
259 .bind(block_id)
260 .execute(tx.deref_mut())
261 .await?;
262
263 sqlx::query(
265 "UPDATE tx_sender_cancel_try_to_send_outpoints AS cto
266 SET seen_block_id = NULL
267 WHERE cto.seen_block_id = $1",
268 )
269 .bind(block_id)
270 .execute(tx.deref_mut())
271 .await?;
272
273 sqlx::query(
275 "UPDATE tx_sender_fee_payer_utxos AS fpu
276 SET seen_block_id = NULL
277 WHERE fpu.seen_block_id = $1",
278 )
279 .bind(block_id)
280 .execute(tx.deref_mut())
281 .await?;
282
283 sqlx::query(
285 "UPDATE tx_sender_try_to_send_txs AS txs
286 SET seen_block_id = NULL
287 WHERE txs.seen_block_id = $1",
288 )
289 .bind(block_id)
290 .execute(tx.deref_mut())
291 .await?;
292
293 Ok(())
294 }
295
296 pub async fn save_fee_payer_tx(
305 &self,
306 tx: Option<DatabaseTransaction<'_>>,
307 bumped_id: u32,
308 fee_payer_txid: Txid,
309 vout: u32,
310 amount: Amount,
311 replacement_of_id: Option<u32>,
312 ) -> Result<(), BridgeError> {
313 let query = sqlx::query(
314 "INSERT INTO tx_sender_fee_payer_utxos (bumped_id, fee_payer_txid, vout, amount, replacement_of_id)
315 VALUES ($1, $2, $3, $4, $5)",
316 )
317 .bind(i32::try_from(bumped_id).wrap_err("Failed to convert bumped id to i32")?)
318 .bind(TxidDB(fee_payer_txid))
319 .bind(i32::try_from(vout).wrap_err("Failed to convert vout to i32")?)
320 .bind(i64::try_from(amount.to_sat()).wrap_err("Failed to convert amount to i64")?)
321 .bind(replacement_of_id.map( i32::try_from).transpose().wrap_err("Failed to convert replacement of id to i32")?);
322
323 execute_query_with_tx!(self.connection, tx, query, execute)?;
324
325 Ok(())
326 }
327
328 pub async fn get_all_unconfirmed_fee_payer_txs(
343 &self,
344 tx: Option<DatabaseTransaction<'_>>,
345 ) -> Result<Vec<(u32, u32, Txid, u32, Amount, Option<u32>)>, BridgeError> {
346 let query = sqlx::query_as::<_, (i32, i32, TxidDB, i32, i64, Option<i32>)>(
347 "
348 SELECT fpu.id, fpu.bumped_id, fpu.fee_payer_txid, fpu.vout, fpu.amount, fpu.replacement_of_id
349 FROM tx_sender_fee_payer_utxos fpu
350 WHERE fpu.seen_block_id IS NULL
351 AND fpu.is_evicted = false
352 AND NOT EXISTS (
353 SELECT 1
354 FROM tx_sender_fee_payer_utxos x
355 WHERE (x.replacement_of_id = fpu.replacement_of_id OR x.id = fpu.replacement_of_id)
356 AND x.seen_block_id IS NOT NULL
357 )
358 ",
359 );
360
361 let results: Vec<(i32, i32, TxidDB, i32, i64, Option<i32>)> =
362 execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
363
364 results
365 .iter()
366 .map(
367 |(id, bumped_id, fee_payer_txid, vout, amount, replacement_of_id)| {
368 Ok((
369 u32::try_from(*id).wrap_err("Failed to convert id to u32")?,
370 u32::try_from(*bumped_id).wrap_err("Failed to convert bumped id to u32")?,
371 fee_payer_txid.0,
372 u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
373 Amount::from_sat(
374 u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
375 ),
376 replacement_of_id
377 .map(u32::try_from)
378 .transpose()
379 .wrap_err("Failed to convert replacement of id to u32")?,
380 ))
381 },
382 )
383 .collect::<Result<Vec<_>, BridgeError>>()
384 }
385
386 pub async fn get_unconfirmed_fee_payer_txs(
402 &self,
403 tx: Option<DatabaseTransaction<'_>>,
404 bumped_id: u32,
405 ) -> Result<Vec<(u32, Txid, u32, Amount)>, BridgeError> {
406 let query = sqlx::query_as::<_, (i32, TxidDB, i32, i64)>(
407 "
408 SELECT fpu.id, fpu.fee_payer_txid, fpu.vout, fpu.amount
409 FROM tx_sender_fee_payer_utxos fpu
410 WHERE fpu.bumped_id = $1
411 AND fpu.seen_block_id IS NULL
412 AND fpu.is_evicted = false
413 AND NOT EXISTS (
414 SELECT 1
415 FROM tx_sender_fee_payer_utxos x
416 WHERE (x.replacement_of_id = fpu.replacement_of_id OR x.id = fpu.replacement_of_id)
417 AND x.seen_block_id IS NOT NULL
418 )
419 ",
420 )
421 .bind(i32::try_from(bumped_id).wrap_err("Failed to convert bumped id to i32")?);
422 let results: Vec<(i32, TxidDB, i32, i64)> =
423 execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
424
425 results
426 .iter()
427 .map(|(id, fee_payer_txid, vout, amount)| {
428 Ok((
429 u32::try_from(*id).wrap_err("Failed to convert id to u32")?,
430 fee_payer_txid.0,
431 u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
432 Amount::from_sat(
433 u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
434 ),
435 ))
436 })
437 .collect::<Result<Vec<_>, BridgeError>>()
438 }
439
440 pub async fn mark_fee_payer_utxo_as_evicted(
443 &self,
444 tx: Option<DatabaseTransaction<'_>>,
445 id: u32,
446 ) -> Result<(), BridgeError> {
447 let query = sqlx::query(
448 "UPDATE tx_sender_fee_payer_utxos
449 SET is_evicted = true
450 WHERE id = $1
451 OR replacement_of_id = $1",
452 )
453 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
454
455 execute_query_with_tx!(self.connection, tx, query, execute)?;
456 Ok(())
457 }
458
459 pub async fn get_confirmed_fee_payer_utxos(
460 &self,
461 tx: Option<DatabaseTransaction<'_>>,
462 id: u32,
463 ) -> Result<Vec<(Txid, u32, Amount)>, BridgeError> {
464 let query = sqlx::query_as::<_, (TxidDB, i32, i64)>(
465 "SELECT fee_payer_txid, vout, amount
466 FROM tx_sender_fee_payer_utxos fpu
467 WHERE fpu.bumped_id = $1 AND fpu.seen_block_id IS NOT NULL",
468 )
469 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
470
471 let results: Vec<(TxidDB, i32, i64)> =
472 execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
473
474 results
475 .iter()
476 .map(|(fee_payer_txid, vout, amount)| {
477 Ok((
478 fee_payer_txid.0,
479 u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
480 Amount::from_sat(
481 u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
482 ),
483 ))
484 })
485 .collect::<Result<Vec<_>, BridgeError>>()
486 }
487
488 pub async fn check_if_tx_exists_on_txsender(
491 &self,
492 tx: Option<DatabaseTransaction<'_>>,
493 txid: Txid,
494 ) -> Result<Option<u32>, BridgeError> {
495 let query = sqlx::query_as::<_, (i32,)>(
496 "SELECT id FROM tx_sender_try_to_send_txs WHERE txid = $1 LIMIT 1",
497 )
498 .bind(TxidDB(txid));
499
500 let result: Option<(i32,)> =
501 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
502 Ok(match result {
503 Some((id,)) => Some(u32::try_from(id).wrap_err("Failed to convert id to u32")?),
504 None => None,
505 })
506 }
507
508 pub async fn save_tx(
509 &self,
510 tx: Option<DatabaseTransaction<'_>>,
511 tx_metadata: Option<TxMetadata>,
512 raw_tx: &Transaction,
513 fee_paying_type: FeePayingType,
514 txid: Txid,
515 rbf_signing_info: Option<RbfSigningInfo>,
516 ) -> Result<u32, BridgeError> {
517 let query = sqlx::query_scalar(
518 "INSERT INTO tx_sender_try_to_send_txs (raw_tx, fee_paying_type, tx_metadata, txid, rbf_signing_info) VALUES ($1, $2::fee_paying_type, $3, $4, $5) RETURNING id"
519 )
520 .bind(serialize(raw_tx))
521 .bind(fee_paying_type)
522 .bind(serde_json::to_string(&tx_metadata).wrap_err("Failed to encode tx_metadata to JSON")?)
523 .bind(TxidDB(txid))
524 .bind(serde_json::to_string(&rbf_signing_info).wrap_err("Failed to encode tx_metadata to JSON")?);
525
526 let id: i32 = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
527 u32::try_from(id)
528 .wrap_err("Failed to convert id to u32")
529 .map_err(Into::into)
530 }
531
532 pub async fn save_rbf_txid(
533 &self,
534 tx: Option<DatabaseTransaction<'_>>,
535 id: u32,
536 txid: Txid,
537 ) -> Result<(), BridgeError> {
538 let query = sqlx::query("INSERT INTO tx_sender_rbf_txids (id, txid) VALUES ($1, $2)")
539 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?)
540 .bind(TxidDB(txid));
541
542 execute_query_with_tx!(self.connection, tx, query, execute)?;
543 Ok(())
544 }
545
546 pub async fn get_last_rbf_txid(
547 &self,
548 tx: Option<DatabaseTransaction<'_>>,
549 id: u32,
550 ) -> Result<Option<Txid>, BridgeError> {
551 let query = sqlx::query_as::<_, (TxidDB,)>("SELECT txid FROM tx_sender_rbf_txids WHERE id = $1 ORDER BY insertion_order DESC LIMIT 1")
552 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
553
554 let result: Option<(TxidDB,)> =
555 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
556 Ok(result.map(|(txid,)| txid.0))
557 }
558
559 pub async fn save_cancelled_outpoint(
560 &self,
561 tx: Option<DatabaseTransaction<'_>>,
562 cancelled_id: u32,
563 outpoint: bitcoin::OutPoint,
564 ) -> Result<(), BridgeError> {
565 let query = sqlx::query(
566 "INSERT INTO tx_sender_cancel_try_to_send_outpoints (cancelled_id, txid, vout) VALUES ($1, $2, $3)"
567 )
568 .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled id to i32")?)
569 .bind(TxidDB(outpoint.txid))
570 .bind(i32::try_from(outpoint.vout).wrap_err("Failed to convert vout to i32")?);
571
572 execute_query_with_tx!(self.connection, tx, query, execute)?;
573 Ok(())
574 }
575
576 pub async fn save_cancelled_txid(
577 &self,
578 tx: Option<DatabaseTransaction<'_>>,
579 cancelled_id: u32,
580 txid: bitcoin::Txid,
581 ) -> Result<(), BridgeError> {
582 let query = sqlx::query(
583 "INSERT INTO tx_sender_cancel_try_to_send_txids (cancelled_id, txid) VALUES ($1, $2)",
584 )
585 .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled id to i32")?)
586 .bind(TxidDB(txid));
587
588 execute_query_with_tx!(self.connection, tx, query, execute)?;
589 Ok(())
590 }
591
592 pub async fn save_activated_txid(
593 &self,
594 tx: Option<DatabaseTransaction<'_>>,
595 activated_id: u32,
596 prerequisite_tx: &ActivatedWithTxid,
597 ) -> Result<(), BridgeError> {
598 let query = sqlx::query(
599 "INSERT INTO tx_sender_activate_try_to_send_txids (activated_id, txid, timelock) VALUES ($1, $2, $3)"
600 )
601 .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated id to i32")?)
602 .bind(TxidDB(prerequisite_tx.txid))
603 .bind(i32::try_from(prerequisite_tx.relative_block_height).wrap_err("Failed to convert relative block height to i32")?);
604
605 execute_query_with_tx!(self.connection, tx, query, execute)?;
606 Ok(())
607 }
608
609 pub async fn save_activated_outpoint(
610 &self,
611 tx: Option<DatabaseTransaction<'_>>,
612 activated_id: u32,
613 activated_outpoint: &ActivatedWithOutpoint,
614 ) -> Result<(), BridgeError> {
615 let query = sqlx::query(
616 "INSERT INTO tx_sender_activate_try_to_send_outpoints (activated_id, txid, vout, timelock) VALUES ($1, $2, $3, $4)"
617 )
618 .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated id to i32")?)
619 .bind(TxidDB(activated_outpoint.outpoint.txid))
620 .bind(i32::try_from(activated_outpoint.outpoint.vout).wrap_err("Failed to convert vout to i32")?)
621 .bind(i32::try_from(activated_outpoint.relative_block_height).wrap_err("Failed to convert relative block height to i32")?);
622
623 execute_query_with_tx!(self.connection, tx, query, execute)?;
624 Ok(())
625 }
626
627 pub async fn get_sendable_txs(
647 &self,
648 tx: Option<DatabaseTransaction<'_>>,
649 fee_rate: FeeRate,
650 current_tip_height: u32,
651 ) -> Result<Vec<u32>, BridgeError> {
652 let select_query = sqlx::query_as::<_, (i32,)>(
653 "WITH
654 -- Find non-active transactions (not seen or timelock not passed)
655 non_active_txs AS (
656 -- Transactions with txid activations that aren't active yet
657 SELECT DISTINCT
658 activate_txid.activated_id AS tx_id
659 FROM
660 tx_sender_activate_try_to_send_txids AS activate_txid
661 LEFT JOIN
662 bitcoin_syncer AS syncer ON activate_txid.seen_block_id = syncer.id
663 WHERE
664 activate_txid.seen_block_id IS NULL
665 OR (syncer.height + activate_txid.timelock > $2)
666
667 UNION
668
669 -- Transactions with outpoint activations that aren't active yet (not seen or timelock not passed)
670 SELECT DISTINCT
671 activate_outpoint.activated_id AS tx_id
672 FROM
673 tx_sender_activate_try_to_send_outpoints AS activate_outpoint
674 LEFT JOIN
675 bitcoin_syncer AS syncer ON activate_outpoint.seen_block_id = syncer.id
676 WHERE
677 activate_outpoint.seen_block_id IS NULL
678 OR (syncer.height + activate_outpoint.timelock > $2)
679 ),
680
681 -- Transactions with cancelled conditions
682 cancelled_txs AS (
683 -- Transactions with cancelled outpoints (not seen)
684 SELECT DISTINCT
685 cancelled_id AS tx_id
686 FROM
687 tx_sender_cancel_try_to_send_outpoints
688 WHERE
689 seen_block_id IS NOT NULL
690
691 UNION
692
693 -- Transactions with cancelled txids (not seen)
694 SELECT DISTINCT
695 cancelled_id AS tx_id
696 FROM
697 tx_sender_cancel_try_to_send_txids
698 WHERE
699 seen_block_id IS NOT NULL
700 )
701
702 -- Final query to get sendable transactions
703 SELECT
704 txs.id
705 FROM
706 tx_sender_try_to_send_txs AS txs
707 WHERE
708 -- Transaction must not be in the non-active list
709 txs.id NOT IN (SELECT tx_id FROM non_active_txs)
710 -- Transaction must not be in the cancelled list
711 AND txs.id NOT IN (SELECT tx_id FROM cancelled_txs)
712 -- Transaction must not be already confirmed
713 AND txs.seen_block_id IS NULL
714 -- Check if fee_rate is lower than the provided fee rate or null
715 AND (txs.effective_fee_rate IS NULL OR txs.effective_fee_rate < $1);",
716 )
717 .bind(
718 i64::try_from(fee_rate.to_sat_per_vb_ceil())
719 .wrap_err("Failed to convert fee rate to i64")?,
720 )
721 .bind(
722 i32::try_from(current_tip_height)
723 .wrap_err("Failed to convert current tip height to i32")?,
724 );
725
726 let results = execute_query_with_tx!(self.connection, tx, select_query, fetch_all)?;
727
728 let txs = results
729 .into_iter()
730 .map(|(id,)| u32::try_from(id))
731 .collect::<Result<Vec<_>, _>>()
732 .wrap_err("Failed to convert id to u32")?;
733
734 Ok(txs)
735 }
736
737 pub async fn update_effective_fee_rate(
738 &self,
739 tx: Option<DatabaseTransaction<'_>>,
740 id: u32,
741 effective_fee_rate: FeeRate,
742 ) -> Result<(), BridgeError> {
743 let query = sqlx::query(
744 "UPDATE tx_sender_try_to_send_txs SET effective_fee_rate = $1 WHERE id = $2",
745 )
746 .bind(
747 i64::try_from(effective_fee_rate.to_sat_per_vb_ceil())
748 .wrap_err("Failed to convert effective fee rate to i64")?,
749 )
750 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
751
752 execute_query_with_tx!(self.connection, tx, query, execute)?;
753
754 Ok(())
755 }
756
757 pub async fn get_try_to_send_tx(
758 &self,
759 tx: Option<DatabaseTransaction<'_>>,
760 id: u32,
761 ) -> Result<
762 (
763 Option<TxMetadata>,
764 Transaction,
765 FeePayingType,
766 Option<u32>,
767 Option<RbfSigningInfo>,
768 ),
769 BridgeError,
770 > {
771 let query = sqlx::query_as::<
772 _,
773 (
774 Option<String>,
775 Option<Vec<u8>>,
776 FeePayingType,
777 Option<i32>,
778 Option<String>,
779 ),
780 >(
781 "SELECT tx_metadata, raw_tx, fee_paying_type, seen_block_id, rbf_signing_info
782 FROM tx_sender_try_to_send_txs
783 WHERE id = $1 LIMIT 1",
784 )
785 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
786
787 let result = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
788 Ok((
789 serde_json::from_str(result.0.as_deref().unwrap_or("null"))
790 .wrap_err_with(|| format!("Failed to decode tx_metadata from {:?}", result.0))?,
791 result
792 .1
793 .as_deref()
794 .map(deserialize)
795 .ok_or_eyre("Expected raw_tx to be present")?
796 .wrap_err("Failed to deserialize raw_tx")?,
797 result.2,
798 result
799 .3
800 .map(u32::try_from)
801 .transpose()
802 .wrap_err("Failed to convert seen_block_id to u32")?,
803 serde_json::from_str(result.4.as_deref().unwrap_or("null")).wrap_err_with(|| {
804 format!("Failed to decode rbf_signing_info from {:?}", result.4)
805 })?,
806 ))
807 }
808
809 pub async fn save_tx_debug_submission_error(
813 &self,
814 tx: Option<DatabaseTransaction<'_>>,
815 tx_id: u32,
816 error_message: &str,
817 ) -> Result<(), BridgeError> {
818 let query = sqlx::query(
819 "INSERT INTO tx_sender_debug_submission_errors (tx_id, error_message) VALUES ($1, $2)",
820 )
821 .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?)
822 .bind(error_message);
823
824 execute_query_with_tx!(self.connection, tx, query, execute)?;
825 Ok(())
826 }
827
828 pub async fn update_tx_debug_sending_state(
833 &self,
834 tx_id: u32,
835 state: &str,
836 activated: bool,
837 ) -> Result<(), BridgeError> {
838 let query = sqlx::query(
839 r#"
840 INSERT INTO tx_sender_debug_sending_state
841 (tx_id, state, last_update, activated_timestamp)
842 VALUES ($1, $2, NOW(),
843 CASE
844 WHEN $3 = TRUE THEN NOW()
845 ELSE NULL
846 END
847 )
848 ON CONFLICT (tx_id) DO UPDATE SET
849 state = $2,
850 last_update = NOW(),
851 activated_timestamp = COALESCE(tx_sender_debug_sending_state.activated_timestamp,
852 CASE
853 WHEN $3 = TRUE THEN NOW()
854 ELSE NULL
855 END
856 )
857 "#,
858 )
859 .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?)
860 .bind(state)
861 .bind(activated);
862
863 self.connection.execute(query).await?;
864 Ok(())
865 }
866
867 pub async fn get_tx_debug_info(
869 &self,
870 tx: Option<DatabaseTransaction<'_>>,
871 tx_id: u32,
872 ) -> Result<Option<String>, BridgeError> {
873 let query = sqlx::query_as::<_, (Option<String>,)>(
874 r#"
875 SELECT state
876 FROM tx_sender_debug_sending_state
877 WHERE tx_id = $1
878 "#,
879 )
880 .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
881
882 let result = execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
883 match result {
884 Some((state,)) => Ok(state),
885 None => Ok(None),
886 }
887 }
888
889 pub async fn get_tx_debug_submission_errors(
891 &self,
892 tx: Option<DatabaseTransaction<'_>>,
893 tx_id: u32,
894 ) -> Result<Vec<(String, String)>, BridgeError> {
895 let query = sqlx::query_as::<_, (String, String)>(
896 r#"
897 SELECT error_message, timestamp::TEXT
898 FROM tx_sender_debug_submission_errors
899 WHERE tx_id = $1
900 ORDER BY timestamp ASC
901 "#,
902 )
903 .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
904
905 execute_query_with_tx!(self.connection, tx, query, fetch_all).map_err(Into::into)
906 }
907
908 pub async fn get_tx_debug_fee_payer_utxos(
910 &self,
911 tx: Option<DatabaseTransaction<'_>>,
912 tx_id: u32,
913 ) -> Result<Vec<(Txid, u32, Amount, bool)>, BridgeError> {
914 let query = sqlx::query_as::<_, (TxidDB, i32, i64, bool)>(
915 r#"
916 SELECT fee_payer_txid, vout, amount, seen_block_id IS NOT NULL as confirmed
917 FROM tx_sender_fee_payer_utxos
918 WHERE bumped_id = $1
919 "#,
920 )
921 .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
922
923 let results: Vec<(TxidDB, i32, i64, bool)> =
924 execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
925
926 results
927 .iter()
928 .map(|(fee_payer_txid, vout, amount, confirmed)| {
929 Ok((
930 fee_payer_txid.0,
931 u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
932 Amount::from_sat(
933 u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
934 ),
935 *confirmed,
936 ))
937 })
938 .collect::<Result<Vec<_>, BridgeError>>()
939 }
940}
941
942#[async_trait]
943impl clementine_tx_sender::TxSenderDatabase for Database {
944 type Transaction = sqlx::Transaction<'static, sqlx::Postgres>;
945 async fn begin_transaction(
946 &self,
947 ) -> Result<sqlx::Transaction<'static, sqlx::Postgres>, BridgeError> {
948 self.begin_transaction().await
949 }
950
951 async fn commit_transaction(
952 &self,
953 tx: sqlx::Transaction<'static, sqlx::Postgres>,
954 ) -> Result<(), BridgeError> {
955 tx.commit().await.map_err(Into::into)
956 }
957
958 async fn save_tx_debug_submission_error(
959 &self,
960 dbtx: Option<&mut Self::Transaction>,
961 tx_id: u32,
962 error_message: &str,
963 ) -> Result<(), BridgeError> {
964 self.save_tx_debug_submission_error(dbtx, tx_id, error_message)
965 .await
966 }
967
968 async fn get_sendable_txs(
969 &self,
970 fee_rate: FeeRate,
971 current_tip_height: u32,
972 ) -> Result<Vec<u32>, BridgeError> {
973 self.get_sendable_txs(None, fee_rate, current_tip_height)
974 .await
975 }
976
977 async fn get_try_to_send_tx(
978 &self,
979 tx: Option<&mut Self::Transaction>,
980 id: u32,
981 ) -> Result<
982 (
983 Option<TxMetadata>,
984 Transaction,
985 FeePayingType,
986 Option<u32>,
987 Option<RbfSigningInfo>,
988 ),
989 BridgeError,
990 > {
991 self.get_try_to_send_tx(tx, id).await
992 }
993
994 async fn update_tx_debug_sending_state(
995 &self,
996 tx_id: u32,
997 state: &str,
998 activated: bool,
999 ) -> Result<(), BridgeError> {
1000 self.update_tx_debug_sending_state(tx_id, state, activated)
1001 .await
1002 }
1003
1004 async fn get_all_unconfirmed_fee_payer_txs(
1005 &self,
1006 tx: Option<&mut Self::Transaction>,
1007 ) -> Result<Vec<(u32, u32, Txid, u32, Amount, Option<u32>)>, BridgeError> {
1008 self.get_all_unconfirmed_fee_payer_txs(tx).await
1009 }
1010
1011 async fn get_unconfirmed_fee_payer_txs(
1012 &self,
1013 tx: Option<&mut Self::Transaction>,
1014 bumped_id: u32,
1015 ) -> Result<Vec<(u32, Txid, u32, Amount)>, BridgeError> {
1016 self.get_unconfirmed_fee_payer_txs(tx, bumped_id).await
1017 }
1018
1019 async fn mark_fee_payer_utxo_as_evicted(
1020 &self,
1021 tx: Option<&mut Self::Transaction>,
1022 id: u32,
1023 ) -> Result<(), BridgeError> {
1024 self.mark_fee_payer_utxo_as_evicted(tx, id).await
1025 }
1026
1027 async fn get_confirmed_fee_payer_utxos(
1028 &self,
1029 tx: Option<&mut Self::Transaction>,
1030 id: u32,
1031 ) -> Result<Vec<(Txid, u32, Amount)>, BridgeError> {
1032 self.get_confirmed_fee_payer_utxos(tx, id).await
1033 }
1034
1035 async fn save_fee_payer_tx(
1036 &self,
1037 tx: Option<&mut Self::Transaction>,
1038 _try_to_send_id: Option<u32>,
1039 bumped_id: u32,
1040 fee_payer_txid: Txid,
1041 vout: u32,
1042 amount: Amount,
1043 replacement_of_id: Option<u32>,
1044 ) -> Result<(), BridgeError> {
1045 self.save_fee_payer_tx(
1046 tx,
1047 bumped_id,
1048 fee_payer_txid,
1049 vout,
1050 amount,
1051 replacement_of_id,
1052 )
1053 .await
1054 }
1055
1056 async fn get_last_rbf_txid(
1057 &self,
1058 tx: Option<&mut Self::Transaction>,
1059 id: u32,
1060 ) -> Result<Option<Txid>, BridgeError> {
1061 self.get_last_rbf_txid(tx, id).await
1062 }
1063
1064 async fn save_rbf_txid(
1065 &self,
1066 tx: Option<&mut Self::Transaction>,
1067 id: u32,
1068 txid: Txid,
1069 ) -> Result<(), BridgeError> {
1070 self.save_rbf_txid(tx, id, txid).await
1071 }
1072
1073 async fn save_cancelled_outpoint(
1074 &self,
1075 tx: Option<&mut Self::Transaction>,
1076 cancelled_id: u32,
1077 outpoint: bitcoin::OutPoint,
1078 ) -> Result<(), BridgeError> {
1079 self.save_cancelled_outpoint(tx, cancelled_id, outpoint)
1080 .await
1081 }
1082
1083 async fn save_cancelled_txid(
1084 &self,
1085 tx: Option<&mut Self::Transaction>,
1086 cancelled_id: u32,
1087 txid: bitcoin::Txid,
1088 ) -> Result<(), BridgeError> {
1089 self.save_cancelled_txid(tx, cancelled_id, txid).await
1090 }
1091
1092 async fn save_activated_txid(
1093 &self,
1094 tx: Option<&mut Self::Transaction>,
1095 activated_id: u32,
1096 prerequisite_tx: &ActivatedWithTxid,
1097 ) -> Result<(), BridgeError> {
1098 self.save_activated_txid(tx, activated_id, prerequisite_tx)
1099 .await
1100 }
1101
1102 async fn save_activated_outpoint(
1103 &self,
1104 tx: Option<&mut Self::Transaction>,
1105 activated_id: u32,
1106 activated_outpoint: &ActivatedWithOutpoint,
1107 ) -> Result<(), BridgeError> {
1108 self.save_activated_outpoint(tx, activated_id, activated_outpoint)
1109 .await
1110 }
1111
1112 async fn update_effective_fee_rate(
1113 &self,
1114 tx: Option<&mut Self::Transaction>,
1115 id: u32,
1116 effective_fee_rate: FeeRate,
1117 ) -> Result<(), BridgeError> {
1118 self.update_effective_fee_rate(tx, id, effective_fee_rate)
1119 .await
1120 }
1121
1122 async fn check_if_tx_exists_on_txsender(
1123 &self,
1124 tx: Option<&mut Self::Transaction>,
1125 txid: Txid,
1126 ) -> Result<Option<u32>, BridgeError> {
1127 self.check_if_tx_exists_on_txsender(tx, txid).await
1128 }
1129
1130 async fn save_tx(
1131 &self,
1132 tx: Option<&mut Self::Transaction>,
1133 tx_metadata: Option<TxMetadata>,
1134 raw_tx: &Transaction,
1135 fee_paying_type: FeePayingType,
1136 txid: Txid,
1137 rbf_signing_info: Option<RbfSigningInfo>,
1138 ) -> Result<u32, BridgeError> {
1139 self.save_tx(
1140 tx,
1141 tx_metadata,
1142 raw_tx,
1143 fee_paying_type,
1144 txid,
1145 rbf_signing_info,
1146 )
1147 .await
1148 }
1149
1150 async fn get_tx_debug_info(
1151 &self,
1152 tx: Option<&mut Self::Transaction>,
1153 tx_id: u32,
1154 ) -> Result<Option<String>, BridgeError> {
1155 self.get_tx_debug_info(tx, tx_id).await
1156 }
1157
1158 async fn get_tx_debug_submission_errors(
1159 &self,
1160 tx: Option<&mut Self::Transaction>,
1161 tx_id: u32,
1162 ) -> Result<Vec<(String, String)>, BridgeError> {
1163 self.get_tx_debug_submission_errors(tx, tx_id).await
1164 }
1165
1166 async fn get_tx_debug_fee_payer_utxos(
1167 &self,
1168 tx: Option<&mut Self::Transaction>,
1169 tx_id: u32,
1170 ) -> Result<Vec<(Txid, u32, Amount, bool)>, BridgeError> {
1171 self.get_tx_debug_fee_payer_utxos(tx, tx_id).await
1172 }
1173
1174 async fn fetch_next_bitcoin_syncer_evt(
1175 &self,
1176 tx: &mut Self::Transaction,
1177 consumer_handle: &str,
1178 ) -> Result<Option<clementine_primitives::BitcoinSyncerEvent>, BridgeError> {
1179 self.fetch_next_bitcoin_syncer_evt(tx, consumer_handle)
1180 .await
1181 }
1182
1183 async fn get_block_info_from_id(
1184 &self,
1185 tx: Option<&mut Self::Transaction>,
1186 block_id: u32,
1187 ) -> Result<Option<(bitcoin::BlockHash, u32)>, BridgeError> {
1188 self.get_block_info_from_id(tx, block_id).await
1189 }
1190
1191 async fn confirm_transactions(
1192 &self,
1193 tx: &mut Self::Transaction,
1194 block_id: u32,
1195 ) -> Result<(), BridgeError> {
1196 self.confirm_transactions(tx, block_id).await
1197 }
1198
1199 async fn unconfirm_transactions(
1200 &self,
1201 tx: &mut Self::Transaction,
1202 block_id: u32,
1203 ) -> Result<(), BridgeError> {
1204 self.unconfirm_transactions(tx, block_id).await
1205 }
1206}
1207
1208#[cfg(test)]
1209mod tests {
1210 use super::*;
1211 use crate::database::Database;
1212 use crate::test::common::*;
1213 use bitcoin::absolute::Height;
1214 use bitcoin::hashes::Hash;
1215 use bitcoin::transaction::Version;
1216 use bitcoin::{Block, OutPoint, TapNodeHash, Txid};
1217
1218 async fn setup_test_db() -> Database {
1219 let config = create_test_config_with_thread_name().await;
1220 Database::new(&config).await.unwrap()
1221 }
1222
1223 #[tokio::test]
1224 async fn test_save_and_get_tx() {
1225 let db = setup_test_db().await;
1226 let tx = Transaction {
1227 version: Version::TWO,
1228 lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
1229 input: vec![],
1230 output: vec![],
1231 };
1232
1233 let txid = tx.compute_txid();
1235 let rbfinfo = Some(RbfSigningInfo {
1236 vout: 123,
1237 tweak_merkle_root: Some(TapNodeHash::all_zeros()),
1238 annex: None,
1239 additional_taproot_output_count: None,
1240 });
1241 let id = db
1242 .save_tx(None, None, &tx, FeePayingType::CPFP, txid, rbfinfo.clone())
1243 .await
1244 .unwrap();
1245
1246 let (_, retrieved_tx, fee_paying_type, seen_block_id, rbf_signing_info) =
1248 db.get_try_to_send_tx(None, id).await.unwrap();
1249 assert_eq!(tx.version, retrieved_tx.version);
1250 assert_eq!(fee_paying_type, FeePayingType::CPFP);
1251 assert_eq!(seen_block_id, None);
1252 assert_eq!(rbf_signing_info, rbfinfo);
1253 }
1254
1255 #[tokio::test]
1256 async fn test_fee_payer_utxo_operations() {
1257 let db = setup_test_db().await;
1258 let mut dbtx = db.begin_transaction().await.unwrap();
1259
1260 let tx = Transaction {
1262 version: Version::TWO,
1263 lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
1264 input: vec![],
1265 output: vec![],
1266 };
1267
1268 let tx_id = db
1270 .save_tx(
1271 Some(&mut dbtx),
1272 None,
1273 &tx,
1274 FeePayingType::CPFP,
1275 Txid::all_zeros(),
1276 None,
1277 )
1278 .await
1279 .unwrap();
1280
1281 let fee_payer_txid = Txid::hash(&[1u8; 32]);
1283 db.save_fee_payer_tx(
1284 Some(&mut dbtx),
1285 tx_id,
1286 fee_payer_txid,
1287 0,
1288 Amount::from_sat(50000),
1289 None,
1290 )
1291 .await
1292 .unwrap();
1293
1294 dbtx.commit().await.unwrap();
1295 }
1296
1297 #[tokio::test]
1298 async fn test_confirm_and_unconfirm_transactions() {
1299 const BLOCK_HEX: &str = "0200000035ab154183570282ce9afc0b494c9fc6a3cfea05aa8c1add2ecc56490000000038ba3d78e4500a5a7570dbe61960398add4410d278b21cd9708e6d9743f374d544fc055227f1001c29c1ea3b0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff3703a08601000427f1001c046a510100522cfabe6d6d0000000000000000000068692066726f6d20706f6f6c7365727665726aac1eeeed88ffffffff0100f2052a010000001976a914912e2b234f941f30b18afbb4fa46171214bf66c888ac00000000";
1300 let block: Block = deserialize(&hex::decode(BLOCK_HEX).unwrap()).unwrap();
1301
1302 let db = setup_test_db().await;
1303 let mut dbtx = db.begin_transaction().await.unwrap();
1304
1305 let block_id = crate::bitcoin_syncer::save_block(&db, &mut dbtx, &block, 100)
1307 .await
1308 .unwrap();
1309
1310 let tx = Transaction {
1312 version: Version::TWO,
1313 lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
1314 input: vec![],
1315 output: vec![],
1316 };
1317 let tx_id = db
1318 .save_tx(
1319 Some(&mut dbtx),
1320 None,
1321 &tx,
1322 FeePayingType::CPFP,
1323 Txid::all_zeros(),
1324 None,
1325 )
1326 .await
1327 .unwrap();
1328
1329 let fee_payer_txid = Txid::hash(&[1u8; 32]);
1331 db.save_fee_payer_tx(
1332 Some(&mut dbtx),
1333 tx_id,
1334 fee_payer_txid,
1335 0,
1336 Amount::from_sat(50000),
1337 None,
1338 )
1339 .await
1340 .unwrap();
1341
1342 db.insert_txid_to_block(&mut dbtx, block_id, &fee_payer_txid)
1344 .await
1345 .unwrap();
1346
1347 db.confirm_transactions(&mut dbtx, block_id).await.unwrap();
1349
1350 dbtx.commit().await.unwrap();
1351 }
1352
1353 #[tokio::test]
1354 async fn test_cancelled_outpoints_and_txids() {
1355 let db = setup_test_db().await;
1356 let mut dbtx = db.begin_transaction().await.unwrap();
1357
1358 let tx = Transaction {
1360 version: Version::TWO,
1361 lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
1362 input: vec![],
1363 output: vec![],
1364 };
1365
1366 let tx_id = db
1368 .save_tx(
1369 Some(&mut dbtx),
1370 None,
1371 &tx,
1372 FeePayingType::CPFP,
1373 Txid::all_zeros(),
1374 None,
1375 )
1376 .await
1377 .unwrap();
1378
1379 let txid = Txid::hash(&[0u8; 32]);
1381 let vout = 0;
1382
1383 db.save_cancelled_outpoint(Some(&mut dbtx), tx_id, OutPoint { txid, vout })
1385 .await
1386 .unwrap();
1387
1388 db.save_cancelled_txid(Some(&mut dbtx), tx_id, txid)
1390 .await
1391 .unwrap();
1392
1393 dbtx.commit().await.unwrap();
1394 }
1395
1396 #[tokio::test]
1397 async fn test_get_sendable_txs() {
1398 let db = setup_test_db().await;
1399 let mut dbtx = db.begin_transaction().await.unwrap();
1400
1401 let tx1 = Transaction {
1403 version: Version::TWO,
1404 lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
1405 input: vec![],
1406 output: vec![],
1407 };
1408 let tx2 = Transaction {
1409 version: Version::TWO,
1410 lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
1411 input: vec![],
1412 output: vec![],
1413 };
1414
1415 let id1 = db
1416 .save_tx(
1417 Some(&mut dbtx),
1418 None,
1419 &tx1,
1420 FeePayingType::CPFP,
1421 Txid::all_zeros(),
1422 None,
1423 )
1424 .await
1425 .unwrap();
1426 let id2 = db
1427 .save_tx(
1428 Some(&mut dbtx),
1429 None,
1430 &tx2,
1431 FeePayingType::RBF,
1432 Txid::all_zeros(),
1433 None,
1434 )
1435 .await
1436 .unwrap();
1437
1438 let fee_rate = FeeRate::from_sat_per_vb(3).unwrap();
1440 let current_tip_height = 100;
1441
1442 let sendable_txs = db
1443 .get_sendable_txs(Some(&mut dbtx), fee_rate, current_tip_height)
1444 .await
1445 .unwrap();
1446
1447 assert_eq!(sendable_txs.len(), 2);
1449 assert!(sendable_txs.contains(&id1));
1450 assert!(sendable_txs.contains(&id2));
1451
1452 db.update_effective_fee_rate(Some(&mut dbtx), id1, fee_rate)
1455 .await
1456 .unwrap();
1457
1458 let sendable_txs = db
1459 .get_sendable_txs(Some(&mut dbtx), fee_rate, current_tip_height)
1460 .await
1461 .unwrap();
1462 assert_eq!(sendable_txs.len(), 1);
1463 assert!(sendable_txs.contains(&id2));
1464
1465 let sendable_txs = db
1467 .get_sendable_txs(
1468 Some(&mut dbtx),
1469 FeeRate::from_sat_per_vb(4).unwrap(),
1470 current_tip_height + 1,
1471 )
1472 .await
1473 .unwrap();
1474 assert_eq!(sendable_txs.len(), 2);
1475 assert!(sendable_txs.contains(&id1));
1476 assert!(sendable_txs.contains(&id2));
1477
1478 dbtx.commit().await.unwrap();
1479 }
1480
1481 #[tokio::test]
1482 async fn test_debug_sending_state() {
1483 let db = setup_test_db().await;
1484 let mut dbtx = db.begin_transaction().await.unwrap();
1485
1486 let tx = Transaction {
1488 version: Version::TWO,
1489 lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
1490 input: vec![],
1491 output: vec![],
1492 };
1493
1494 let tx_id = db
1496 .save_tx(
1497 None, None,
1499 &tx,
1500 FeePayingType::RBF,
1501 tx.compute_txid(),
1502 None,
1503 )
1504 .await
1505 .unwrap();
1506
1507 let initial_state = "waiting_for_fee_payer_utxos";
1509 db.update_tx_debug_sending_state(tx_id, initial_state, false)
1510 .await
1511 .unwrap();
1512
1513 let state = db.get_tx_debug_info(Some(&mut dbtx), tx_id).await.unwrap();
1515 assert_eq!(state, Some(initial_state.to_string()));
1516
1517 let active_state = "ready_to_send";
1519 db.update_tx_debug_sending_state(tx_id, active_state, true)
1520 .await
1521 .unwrap();
1522
1523 let state = db.get_tx_debug_info(Some(&mut dbtx), tx_id).await.unwrap();
1525 assert_eq!(state, Some(active_state.to_string()));
1526
1527 let error_message = "Failed to send transaction: insufficient fee";
1529 db.save_tx_debug_submission_error(Some(&mut dbtx), tx_id, error_message)
1530 .await
1531 .unwrap();
1532
1533 let errors = db
1535 .get_tx_debug_submission_errors(Some(&mut dbtx), tx_id)
1536 .await
1537 .unwrap();
1538 assert_eq!(errors.len(), 1);
1539 assert_eq!(errors[0].0, error_message);
1540
1541 let second_error = "Network connection timeout";
1543 db.save_tx_debug_submission_error(Some(&mut dbtx), tx_id, second_error)
1544 .await
1545 .unwrap();
1546
1547 let errors = db
1549 .get_tx_debug_submission_errors(Some(&mut dbtx), tx_id)
1550 .await
1551 .unwrap();
1552 assert_eq!(errors.len(), 2);
1553 assert_eq!(errors[0].0, error_message);
1554 assert_eq!(errors[1].0, second_error);
1555
1556 let final_state = "sent";
1558 db.update_tx_debug_sending_state(tx_id, final_state, true)
1559 .await
1560 .unwrap();
1561
1562 let state = db.get_tx_debug_info(Some(&mut dbtx), tx_id).await.unwrap();
1564 assert_eq!(state, Some(final_state.to_string()));
1565
1566 dbtx.commit().await.unwrap();
1567 }
1568}