1use super::wrapper::TxidDB;
4use super::{TxSenderDb, TxSenderDbTx};
5use crate::txsender_execute_query_with_tx;
6use bitcoin::consensus::{deserialize, serialize};
7use bitcoin::{Amount, OutPoint, Transaction, Txid};
8use clementine_errors::BridgeError;
9use clementine_primitives::FeeRateKvb;
10use clementine_utils::{FeePayingType, RbfSigningInfo, TxMetadata};
11use eyre::{Context, OptionExt};
12use sqlx::Executor;
13use std::collections::HashMap;
14
15use crate::{ActivatedWithOutpoint, ActivatedWithTxid};
16
17impl TxSenderDb {
18 #[allow(clippy::too_many_arguments)]
27 pub async fn save_fee_payer_tx(
28 &self,
29 tx: Option<TxSenderDbTx<'_>>,
30 bumped_id: u32,
31 fee_payer_txid: Txid,
32 vout: u32,
33 amount: Amount,
34 replacement_of_id: Option<u32>,
35 ) -> Result<(), BridgeError> {
36 let query = sqlx::query(
37 "INSERT INTO tx_sender_fee_payer_utxos (bumped_id, fee_payer_txid, vout, amount, replacement_of_id)
38 VALUES ($1, $2, $3, $4, $5)",
39 )
40 .bind(i32::try_from(bumped_id).wrap_err("Failed to convert bumped id to i32")?)
41 .bind(TxidDB(fee_payer_txid))
42 .bind(i32::try_from(vout).wrap_err("Failed to convert vout to i32")?)
43 .bind(i64::try_from(amount.to_sat()).wrap_err("Failed to convert amount to i64")?)
44 .bind(
45 replacement_of_id
46 .map(i32::try_from)
47 .transpose()
48 .wrap_err("Failed to convert replacement of id to i32")?,
49 );
50
51 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
52 Ok(())
53 }
54
55 pub async fn get_all_unconfirmed_fee_payer_txs(
70 &self,
71 tx: Option<TxSenderDbTx<'_>>,
72 ) -> Result<Vec<(u32, u32, Txid, u32, Amount, Option<u32>)>, BridgeError> {
73 let query = sqlx::query_as::<_, (i32, i32, TxidDB, i32, i64, Option<i32>)>(
74 "
75 SELECT fpu.id, fpu.bumped_id, fpu.fee_payer_txid, fpu.vout, fpu.amount, fpu.replacement_of_id
76 FROM tx_sender_fee_payer_utxos fpu
77 WHERE fpu.seen_at_height IS NULL
78 AND fpu.is_evicted = false
79 AND NOT EXISTS (
80 SELECT 1
81 FROM tx_sender_fee_payer_utxos x
82 WHERE COALESCE(x.replacement_of_id, x.id)
83 = COALESCE(fpu.replacement_of_id, fpu.id)
84 AND x.seen_at_height IS NOT NULL
85 )
86 ",
87 );
88
89 let results: Vec<(i32, i32, TxidDB, i32, i64, Option<i32>)> =
90 txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
91
92 results
93 .iter()
94 .map(
95 |(id, bumped_id, fee_payer_txid, vout, amount, replacement_of_id)| {
96 Ok((
97 u32::try_from(*id).wrap_err("Failed to convert id to u32")?,
98 u32::try_from(*bumped_id).wrap_err("Failed to convert bumped id to u32")?,
99 fee_payer_txid.0,
100 u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
101 Amount::from_sat(
102 u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
103 ),
104 replacement_of_id
105 .map(u32::try_from)
106 .transpose()
107 .wrap_err("Failed to convert replacement of id to u32")?,
108 ))
109 },
110 )
111 .collect::<Result<Vec<_>, BridgeError>>()
112 }
113
114 pub async fn get_unconfirmed_fee_payer_txs(
130 &self,
131 tx: Option<TxSenderDbTx<'_>>,
132 bumped_id: u32,
133 ) -> Result<Vec<(u32, Txid, u32, Amount)>, BridgeError> {
134 let query = sqlx::query_as::<_, (i32, TxidDB, i32, i64)>(
135 "
136 SELECT fpu.id, fpu.fee_payer_txid, fpu.vout, fpu.amount
137 FROM tx_sender_fee_payer_utxos fpu
138 WHERE fpu.bumped_id = $1
139 AND fpu.seen_at_height IS NULL
140 AND fpu.is_evicted = false
141 AND NOT EXISTS (
142 SELECT 1
143 FROM tx_sender_fee_payer_utxos x
144 WHERE COALESCE(x.replacement_of_id, x.id)
145 = COALESCE(fpu.replacement_of_id, fpu.id)
146 AND x.seen_at_height IS NOT NULL
147 )
148 ",
149 )
150 .bind(i32::try_from(bumped_id).wrap_err("Failed to convert bumped id to i32")?);
151
152 let results: Vec<(i32, TxidDB, i32, i64)> =
153 txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
154
155 results
156 .iter()
157 .map(|(id, fee_payer_txid, vout, amount)| {
158 Ok((
159 u32::try_from(*id).wrap_err("Failed to convert id to u32")?,
160 fee_payer_txid.0,
161 u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
162 Amount::from_sat(
163 u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
164 ),
165 ))
166 })
167 .collect::<Result<Vec<_>, BridgeError>>()
168 }
169
170 pub async fn mark_fee_payer_utxo_as_evicted(
175 &self,
176 tx: Option<TxSenderDbTx<'_>>,
177 id: u32,
178 ) -> Result<(), BridgeError> {
179 let query = sqlx::query(
180 "UPDATE tx_sender_fee_payer_utxos
181 SET is_evicted = true
182 WHERE id = $1
183 OR replacement_of_id = $1",
184 )
185 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
186
187 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
188 Ok(())
189 }
190
191 pub async fn get_confirmed_fee_payer_utxos(
192 &self,
193 tx: Option<TxSenderDbTx<'_>>,
194 id: u32,
195 ) -> Result<Vec<(Txid, u32, Amount)>, BridgeError> {
196 let query = sqlx::query_as::<_, (TxidDB, i32, i64)>(
197 "SELECT fee_payer_txid, vout, amount
198 FROM tx_sender_fee_payer_utxos fpu
199 WHERE fpu.bumped_id = $1 AND fpu.seen_at_height IS NOT NULL",
200 )
201 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
202
203 let results: Vec<(TxidDB, i32, i64)> =
204 txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
205
206 results
207 .iter()
208 .map(|(fee_payer_txid, vout, amount)| {
209 Ok((
210 fee_payer_txid.0,
211 u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
212 Amount::from_sat(
213 u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
214 ),
215 ))
216 })
217 .collect::<Result<Vec<_>, BridgeError>>()
218 }
219
220 pub async fn check_if_tx_exists_on_txsender(
224 &self,
225 tx: Option<TxSenderDbTx<'_>>,
226 txid: Txid,
227 ) -> Result<Option<u32>, BridgeError> {
228 let query = sqlx::query_as::<_, (i32,)>(
229 "SELECT id FROM tx_sender_try_to_send_txs WHERE txid = $1 LIMIT 1",
230 )
231 .bind(TxidDB(txid));
232
233 let result: Option<(i32,)> =
234 txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_optional)?;
235
236 Ok(match result {
237 Some((id,)) => Some(u32::try_from(id).wrap_err("Failed to convert id to u32")?),
238 None => None,
239 })
240 }
241
242 pub async fn save_tx(
243 &self,
244 tx: TxSenderDbTx<'_>,
245 tx_metadata: Option<TxMetadata>,
246 raw_tx: &Transaction,
247 fee_paying_type: FeePayingType,
248 txid: Txid,
249 rbf_signing_info: Option<RbfSigningInfo>,
250 ) -> Result<u32, BridgeError> {
251 let query = sqlx::query_scalar(
252 r#"
253 INSERT INTO tx_sender_try_to_send_txs
254 (raw_tx, fee_paying_type, tx_metadata, txid, rbf_signing_info)
255 VALUES ($1, $2::fee_paying_type, $3, $4, $5)
256 ON CONFLICT (txid)
257 DO UPDATE SET txid = EXCLUDED.txid
258 RETURNING id
259 "#,
260 )
261 .bind(serialize(raw_tx))
262 .bind(fee_paying_type)
263 .bind(serde_json::to_string(&tx_metadata).wrap_err("Failed to encode tx_metadata to JSON")?)
264 .bind(TxidDB(txid))
265 .bind(
266 serde_json::to_string(&rbf_signing_info)
267 .wrap_err("Failed to encode rbf_signing_info to JSON")?,
268 );
269
270 let id: i32 = query.fetch_one(&mut **tx).await?;
271 u32::try_from(id)
272 .wrap_err("Failed to convert id to u32")
273 .map_err(Into::into)
274 }
275
276 pub async fn save_rbf_txid(
277 &self,
278 tx: Option<TxSenderDbTx<'_>>,
279 id: u32,
280 txid: Txid,
281 ) -> Result<(), BridgeError> {
282 let query = sqlx::query("INSERT INTO tx_sender_rbf_txids (id, txid) VALUES ($1, $2)")
283 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?)
284 .bind(TxidDB(txid));
285
286 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
287 Ok(())
288 }
289
290 pub async fn get_last_rbf_txid(
291 &self,
292 tx: Option<TxSenderDbTx<'_>>,
293 id: u32,
294 ) -> Result<Option<Txid>, BridgeError> {
295 let query = sqlx::query_as::<_, (TxidDB,)>(
296 "SELECT txid FROM tx_sender_rbf_txids WHERE id = $1 ORDER BY insertion_order DESC LIMIT 1",
297 )
298 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
299
300 let result: Option<(TxidDB,)> =
301 txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_optional)?;
302 Ok(result.map(|(txid,)| txid.0))
303 }
304
305 pub async fn list_rbf_txids_for_id(
306 &self,
307 tx: Option<TxSenderDbTx<'_>>,
308 id: u32,
309 ) -> Result<Vec<Txid>, BridgeError> {
310 let query = sqlx::query_as::<_, (TxidDB,)>(
311 "SELECT txid FROM tx_sender_rbf_txids WHERE id = $1 ORDER BY insertion_order DESC",
312 )
313 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
314
315 let results: Vec<(TxidDB,)> =
316 txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
317 Ok(results.into_iter().map(|(txid,)| txid.0).collect())
318 }
319
320 pub async fn save_cancelled_outpoint(
321 &self,
322 tx: TxSenderDbTx<'_>,
323 cancelled_id: u32,
324 outpoint: OutPoint,
325 ) -> Result<(), BridgeError> {
326 let query = sqlx::query(
327 "INSERT INTO tx_sender_cancel_try_to_send_outpoints (cancelled_id, txid, vout) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
328 )
329 .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled id to i32")?)
330 .bind(TxidDB(outpoint.txid))
331 .bind(i32::try_from(outpoint.vout).wrap_err("Failed to convert vout to i32")?);
332
333 query.execute(&mut **tx).await?;
334 Ok(())
335 }
336
337 pub async fn save_cancelled_txid(
338 &self,
339 tx: TxSenderDbTx<'_>,
340 cancelled_id: u32,
341 txid: Txid,
342 ) -> Result<(), BridgeError> {
343 let query = sqlx::query(
344 "INSERT INTO tx_sender_cancel_try_to_send_txids (cancelled_id, txid) VALUES ($1, $2) ON CONFLICT DO NOTHING",
345 )
346 .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled id to i32")?)
347 .bind(TxidDB(txid));
348
349 query.execute(&mut **tx).await?;
350 Ok(())
351 }
352
353 pub async fn save_activated_txid(
354 &self,
355 tx: TxSenderDbTx<'_>,
356 activated_id: u32,
357 prerequisite_tx: &ActivatedWithTxid,
358 ) -> Result<(), BridgeError> {
359 let query = sqlx::query(
360 "INSERT INTO tx_sender_activate_try_to_send_txids (activated_id, txid, timelock) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
361 )
362 .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated id to i32")?)
363 .bind(TxidDB(prerequisite_tx.txid))
364 .bind(i32::try_from(prerequisite_tx.relative_block_height).wrap_err("Failed to convert relative block height to i32")?);
365
366 query.execute(&mut **tx).await?;
367 Ok(())
368 }
369
370 pub async fn save_activated_outpoint(
371 &self,
372 tx: TxSenderDbTx<'_>,
373 activated_id: u32,
374 activated_outpoint: &ActivatedWithOutpoint,
375 ) -> Result<(), BridgeError> {
376 let query = sqlx::query(
377 "INSERT INTO tx_sender_activate_try_to_send_outpoints (activated_id, txid, vout, timelock) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING",
378 )
379 .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated id to i32")?)
380 .bind(TxidDB(activated_outpoint.outpoint.txid))
381 .bind(i32::try_from(activated_outpoint.outpoint.vout).wrap_err("Failed to convert vout to i32")?)
382 .bind(i32::try_from(activated_outpoint.relative_block_height).wrap_err("Failed to convert relative block height to i32")?);
383
384 query.execute(&mut **tx).await?;
385 Ok(())
386 }
387
388 pub async fn get_sendable_txs(
404 &self,
405 tx: Option<TxSenderDbTx<'_>>,
406 fee_rate: FeeRateKvb,
407 current_tip_height: u32,
408 ) -> Result<Vec<u32>, BridgeError> {
409 let select_query = sqlx::query_as::<_, (i32,)>(
410 "WITH
411 non_active_txs AS (
412 SELECT DISTINCT
413 activate_txid.activated_id AS tx_id
414 FROM
415 tx_sender_activate_try_to_send_txids AS activate_txid
416 WHERE
417 (
418 activate_txid.timelock > 0
419 AND (
420 activate_txid.seen_at_height IS NULL
421 OR (activate_txid.seen_at_height::bigint + activate_txid.timelock > $2::bigint)
422 )
423 )
424 OR (
425 activate_txid.timelock = 0
426 AND activate_txid.seen_at_height IS NULL
427 AND activate_txid.in_mempool IS NOT TRUE
428 )
429
430 UNION
431
432 SELECT DISTINCT
433 activate_outpoint.activated_id AS tx_id
434 FROM
435 tx_sender_activate_try_to_send_outpoints AS activate_outpoint
436 WHERE
437 activate_outpoint.seen_at_height IS NULL
438 OR (activate_outpoint.seen_at_height::bigint + activate_outpoint.timelock > $2::bigint)
439 ),
440
441 cancelled_txs AS (
442 SELECT DISTINCT
443 cancelled_id AS tx_id
444 FROM
445 tx_sender_cancel_try_to_send_outpoints
446 WHERE
447 seen_at_height IS NOT NULL
448
449 UNION
450
451 SELECT DISTINCT
452 cancelled_id AS tx_id
453 FROM
454 tx_sender_cancel_try_to_send_txids
455 WHERE
456 seen_at_height IS NOT NULL
457 )
458
459 SELECT
460 txs.id
461 FROM
462 tx_sender_try_to_send_txs AS txs
463 WHERE
464 txs.id NOT IN (SELECT tx_id FROM non_active_txs)
465 AND txs.id NOT IN (SELECT tx_id FROM cancelled_txs)
466 AND txs.seen_at_height IS NULL
467 AND (txs.effective_fee_rate IS NULL OR txs.effective_fee_rate < $1);",
468 )
469 .bind(
470 i64::try_from(fee_rate.to_sat_per_kvb()).wrap_err("Failed to convert fee rate to i64")?,
471 )
472 .bind(i32::try_from(current_tip_height).wrap_err("Failed to convert current tip height to i32")?);
473
474 let results = txsender_execute_query_with_tx!(&self.pool, tx, select_query, fetch_all)?;
475
476 let txs = results
477 .into_iter()
478 .map(|(id,)| u32::try_from(id))
479 .collect::<Result<Vec<_>, _>>()
480 .wrap_err("Failed to convert id to u32")?;
481
482 Ok(txs)
483 }
484
485 pub async fn get_effective_fee_rate(
489 &self,
490 tx: Option<TxSenderDbTx<'_>>,
491 id: u32,
492 ) -> Result<(Option<FeeRateKvb>, Option<u32>), BridgeError> {
493 let query = sqlx::query_as::<_, (Option<i64>, Option<i32>)>(
494 "SELECT effective_fee_rate, last_bump_block_height FROM tx_sender_try_to_send_txs WHERE id = $1",
495 )
496 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
497
498 let result = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_optional)?;
499
500 match result {
501 Some((Some(rate), block_height)) => Ok((
502 Some(FeeRateKvb::from_sat_per_kvb(
503 u64::try_from(rate).wrap_err("Failed to convert effective fee rate to u64")?,
504 )),
505 block_height.map(|h| h as u32),
506 )),
507 Some((None, _)) | None => Ok((None, None)),
508 }
509 }
510
511 pub async fn update_effective_fee_rate(
519 &self,
520 tx: Option<TxSenderDbTx<'_>>,
521 id: u32,
522 effective_fee_rate: FeeRateKvb,
523 block_height: u32,
524 ) -> Result<(), BridgeError> {
525 let query = sqlx::query(
526 "UPDATE tx_sender_try_to_send_txs
527 SET effective_fee_rate = $1, last_bump_block_height = $2
528 WHERE id = $3 AND (effective_fee_rate IS NULL OR effective_fee_rate != $1)",
529 )
530 .bind(
531 i64::try_from(effective_fee_rate.to_sat_per_kvb())
532 .wrap_err("Failed to convert effective fee rate to i64")?,
533 )
534 .bind(i32::try_from(block_height).wrap_err("Failed to convert block_height to i32")?)
535 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
536
537 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
538 Ok(())
539 }
540
541 pub async fn get_try_to_send_tx(
542 &self,
543 tx: Option<TxSenderDbTx<'_>>,
544 id: u32,
545 ) -> Result<
546 (
547 Option<TxMetadata>,
548 Transaction,
549 FeePayingType,
550 Option<u32>,
551 Option<RbfSigningInfo>,
552 ),
553 BridgeError,
554 > {
555 let query = sqlx::query_as::<
556 _,
557 (
558 Option<String>,
559 Option<Vec<u8>>,
560 FeePayingType,
561 Option<i32>,
562 Option<String>,
563 ),
564 >(
565 "SELECT tx_metadata, raw_tx, fee_paying_type, seen_at_height, rbf_signing_info
566 FROM tx_sender_try_to_send_txs
567 WHERE id = $1 LIMIT 1",
568 )
569 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?);
570
571 let result = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_one)?;
572 Ok((
573 serde_json::from_str(result.0.as_deref().unwrap_or("null"))
574 .wrap_err_with(|| format!("Failed to decode tx_metadata from {:?}", result.0))?,
575 result
576 .1
577 .as_deref()
578 .map(deserialize)
579 .ok_or_eyre("Expected raw_tx to be present")?
580 .wrap_err("Failed to deserialize raw_tx")?,
581 result.2,
582 result
583 .3
584 .map(u32::try_from)
585 .transpose()
586 .wrap_err("Failed to convert seen_at_height to u32")?,
587 serde_json::from_str(result.4.as_deref().unwrap_or("null")).wrap_err_with(|| {
588 format!("Failed to decode rbf_signing_info from {:?}", result.4)
589 })?,
590 ))
591 }
592
593 pub async fn save_tx_debug_submission_error(
595 &self,
596 tx: Option<TxSenderDbTx<'_>>,
597 tx_id: u32,
598 error_message: &str,
599 ) -> Result<(), BridgeError> {
600 let query = sqlx::query(
601 "INSERT INTO tx_sender_debug_submission_errors (tx_id, error_message) VALUES ($1, $2)",
602 )
603 .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?)
604 .bind(error_message);
605
606 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
607 Ok(())
608 }
609
610 pub async fn update_tx_debug_sending_state(
615 &self,
616 tx_id: u32,
617 state: &str,
618 activated: bool,
619 ) -> Result<(), BridgeError> {
620 let query = sqlx::query(
621 r#"
622 INSERT INTO tx_sender_debug_sending_state
623 (tx_id, state, last_update, activated_timestamp)
624 VALUES ($1, $2, NOW(),
625 CASE
626 WHEN $3 = TRUE THEN NOW()
627 ELSE NULL
628 END
629 )
630 ON CONFLICT (tx_id) DO UPDATE SET
631 state = $2,
632 last_update = NOW(),
633 activated_timestamp = COALESCE(tx_sender_debug_sending_state.activated_timestamp,
634 CASE
635 WHEN $3 = TRUE THEN NOW()
636 ELSE NULL
637 END
638 )
639 "#,
640 )
641 .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?)
642 .bind(state)
643 .bind(activated);
644
645 self.pool.execute(query).await?;
646 Ok(())
647 }
648
649 pub async fn get_tx_debug_info(
651 &self,
652 tx: Option<TxSenderDbTx<'_>>,
653 tx_id: u32,
654 ) -> Result<Option<String>, BridgeError> {
655 let query = sqlx::query_as::<_, (Option<String>,)>(
656 r#"
657 SELECT state
658 FROM tx_sender_debug_sending_state
659 WHERE tx_id = $1
660 "#,
661 )
662 .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
663
664 let result = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_optional)?;
665 match result {
666 Some((state,)) => Ok(state),
667 None => Ok(None),
668 }
669 }
670
671 pub async fn get_tx_debug_submission_errors(
673 &self,
674 tx: Option<TxSenderDbTx<'_>>,
675 tx_id: u32,
676 ) -> Result<Vec<(String, String)>, BridgeError> {
677 let query = sqlx::query_as::<_, (String, String)>(
678 r#"
679 SELECT error_message, timestamp::TEXT
680 FROM tx_sender_debug_submission_errors
681 WHERE tx_id = $1
682 ORDER BY timestamp ASC
683 "#,
684 )
685 .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
686
687 txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all).map_err(Into::into)
688 }
689
690 pub async fn get_tx_debug_fee_payer_utxos(
692 &self,
693 tx: Option<TxSenderDbTx<'_>>,
694 tx_id: u32,
695 ) -> Result<Vec<(Txid, u32, Amount, bool)>, BridgeError> {
696 let query = sqlx::query_as::<_, (TxidDB, i32, i64, bool)>(
697 r#"
698 SELECT fee_payer_txid, vout, amount, seen_at_height IS NOT NULL as confirmed
699 FROM tx_sender_fee_payer_utxos
700 WHERE bumped_id = $1
701 "#,
702 )
703 .bind(i32::try_from(tx_id).wrap_err("Failed to convert tx_id to i32")?);
704
705 let results: Vec<(TxidDB, i32, i64, bool)> =
706 txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
707
708 results
709 .iter()
710 .map(|(fee_payer_txid, vout, amount, confirmed)| {
711 Ok((
712 fee_payer_txid.0,
713 u32::try_from(*vout).wrap_err("Failed to convert vout to u32")?,
714 Amount::from_sat(
715 u64::try_from(*amount).wrap_err("Failed to convert amount to u64")?,
716 ),
717 *confirmed,
718 ))
719 })
720 .collect::<Result<Vec<_>, BridgeError>>()
721 }
722
723 pub async fn debug_inactive_txs(&self, fee_rate: FeeRateKvb, current_tip_height: u32) {
725 tracing::info!("TXSENDER_DBG_INACTIVE_TXS: Checking inactive transactions");
726
727 let unconfirmed_txs = match sqlx::query_as::<_, (i32, TxidDB, Option<String>)>(
728 "SELECT id, txid, tx_metadata FROM tx_sender_try_to_send_txs WHERE seen_at_height IS NULL",
729 )
730 .fetch_all(&self.pool)
731 .await
732 {
733 Ok(txs) => txs,
734 Err(e) => {
735 tracing::error!(
736 "TXSENDER_DBG_INACTIVE_TXS: Failed to query unconfirmed txs: {}",
737 e
738 );
739 return;
740 }
741 };
742
743 let sendable_txs = match self
744 .get_sendable_txs(None, fee_rate, current_tip_height)
745 .await
746 {
747 Ok(txs) => txs,
748 Err(e) => {
749 tracing::error!(
750 "TXSENDER_DBG_INACTIVE_TXS: Failed to get sendable txs: {}",
751 e
752 );
753 return;
754 }
755 };
756
757 for (tx_id, txid, tx_metadata) in unconfirmed_txs {
758 let tx_metadata: Option<TxMetadata> =
759 serde_json::from_str(tx_metadata.as_deref().unwrap_or("null")).ok();
760
761 let id = match u32::try_from(tx_id) {
762 Ok(id) => id,
763 Err(e) => {
764 tracing::error!("TXSENDER_DBG_INACTIVE_TXS: Failed to convert id: {}", e);
765 continue;
766 }
767 };
768
769 if sendable_txs.contains(&id) {
770 tracing::info!(
771 "TXSENDER_DBG_INACTIVE_TXS: TX {} (txid: {}) is ACTIVE",
772 id,
773 txid.0
774 );
775 continue;
776 }
777
778 tracing::info!(
779 "TXSENDER_DBG_INACTIVE_TXS: TX {} (txid: {}, type: {:?}) is inactive, reasons:",
780 id,
781 txid.0,
782 tx_metadata.as_ref().map(|metadata| metadata.tx_type)
783 );
784
785 let txid_activations = match sqlx::query_as::<_, (Option<i32>, i64, TxidDB)>(
787 "SELECT seen_at_height, timelock, txid
788 FROM tx_sender_activate_try_to_send_txids
789 WHERE activated_id = $1",
790 )
791 .bind(tx_id)
792 .fetch_all(&self.pool)
793 .await
794 {
795 Ok(activations) => activations,
796 Err(e) => {
797 tracing::error!(
798 "TXSENDER_DBG_INACTIVE_TXS: Failed to query txid activations: {}",
799 e
800 );
801 continue;
802 }
803 };
804
805 for (seen_at_height, timelock, txid) in txid_activations {
806 if seen_at_height.is_none() {
807 tracing::info!("TXSENDER_DBG_INACTIVE_TXS: TX {} is inactive because its txid activation {} has not been seen", id, txid.0);
808 continue;
809 }
810
811 let seen_at_height = seen_at_height.expect("checked above");
812 if (seen_at_height as i64) + timelock > current_tip_height as i64 {
813 tracing::info!(
814 "TXSENDER_DBG_INACTIVE_TXS: TX {} is inactive because its txid activation timelock hasn't expired (seen_at_height: {}, timelock: {}, current_tip_height: {})",
815 id, seen_at_height, timelock, current_tip_height
816 );
817 }
818 }
819
820 let outpoint_activations = match sqlx::query_as::<_, (Option<i32>, i64, TxidDB, i32)>(
822 "SELECT seen_at_height, timelock, txid, vout
823 FROM tx_sender_activate_try_to_send_outpoints
824 WHERE activated_id = $1",
825 )
826 .bind(tx_id)
827 .fetch_all(&self.pool)
828 .await
829 {
830 Ok(activations) => activations,
831 Err(e) => {
832 tracing::error!(
833 "TXSENDER_DBG_INACTIVE_TXS: Failed to query outpoint activations: {}",
834 e
835 );
836 continue;
837 }
838 };
839
840 for (seen_at_height, timelock, txid, vout) in outpoint_activations {
841 if seen_at_height.is_none() {
842 tracing::info!("TXSENDER_DBG_INACTIVE_TXS: TX {} is inactive because its outpoint activation {}:{} has not been seen spent", id, txid.0, vout);
843 continue;
844 }
845
846 let seen_at_height = seen_at_height.expect("checked above");
847 if (seen_at_height as i64) + timelock > current_tip_height as i64 {
848 tracing::info!(
849 "TXSENDER_DBG_INACTIVE_TXS: TX {} is inactive because its outpoint activation timelock hasn't expired (seen_at_height: {}, timelock: {}, current_tip_height: {})",
850 id, seen_at_height, timelock, current_tip_height
851 );
852 }
853 }
854
855 let cancelled_txids = match sqlx::query_as::<_, (TxidDB, Option<i32>)>(
857 "SELECT txid, seen_at_height
858 FROM tx_sender_cancel_try_to_send_txids
859 WHERE cancelled_id = $1",
860 )
861 .bind(tx_id)
862 .fetch_all(&self.pool)
863 .await
864 {
865 Ok(x) => x,
866 Err(e) => {
867 tracing::error!(
868 "TXSENDER_DBG_INACTIVE_TXS: Failed to query cancelled txids: {}",
869 e
870 );
871 continue;
872 }
873 };
874
875 for (txid, seen_at_height) in cancelled_txids {
876 if seen_at_height.is_some() {
877 tracing::info!(
878 "TXSENDER_DBG_INACTIVE_TXS: TX {} is inactive because it was cancelled by txid {}",
879 id,
880 txid.0
881 );
882 }
883 }
884
885 let cancelled_outpoints = match sqlx::query_as::<_, (TxidDB, i32, Option<i32>)>(
886 "SELECT txid, vout, seen_at_height
887 FROM tx_sender_cancel_try_to_send_outpoints
888 WHERE cancelled_id = $1",
889 )
890 .bind(tx_id)
891 .fetch_all(&self.pool)
892 .await
893 {
894 Ok(x) => x,
895 Err(e) => {
896 tracing::error!(
897 "TXSENDER_DBG_INACTIVE_TXS: Failed to query cancelled outpoints: {}",
898 e
899 );
900 continue;
901 }
902 };
903
904 for (txid, vout, seen_at_height) in cancelled_outpoints {
905 if seen_at_height.is_some() {
906 tracing::info!(
907 "TXSENDER_DBG_INACTIVE_TXS: TX {} is inactive because it was cancelled by outpoint {}:{}",
908 id,
909 txid.0,
910 vout
911 );
912 }
913 }
914
915 let effective_fee_rate = match sqlx::query_scalar::<_, Option<i64>>(
916 "SELECT effective_fee_rate FROM tx_sender_try_to_send_txs WHERE id = $1",
917 )
918 .bind(tx_id)
919 .fetch_one(&self.pool)
920 .await
921 {
922 Ok(rate) => rate,
923 Err(e) => {
924 tracing::error!(
925 "TXSENDER_DBG_INACTIVE_TXS: Failed to query effective fee rate: {}",
926 e
927 );
928 continue;
929 }
930 };
931
932 if let Some(rate) = effective_fee_rate {
933 if rate >= fee_rate.to_sat_per_kvb() as i64 {
934 tracing::info!(
935 "TXSENDER_DBG_INACTIVE_TXS: TX {} is inactive because its effective fee rate ({} sat/kvB) is >= the current fee rate ({} sat/kvB)",
936 id,
937 rate,
938 fee_rate.to_sat_per_kvb()
939 );
940 }
941 }
942 }
943 }
944
945 pub async fn list_unfinalized_try_to_send_txs(
953 &self,
954 tx: Option<TxSenderDbTx<'_>>,
955 ) -> Result<Vec<(u32, FeePayingType, Txid, Option<u32>)>, BridgeError> {
956 let query = sqlx::query_as::<_, (i32, FeePayingType, TxidDB, Option<i32>)>(
957 r#"
958 SELECT id, fee_paying_type, txid, seen_at_height
959 FROM tx_sender_try_to_send_txs
960 WHERE is_finalized = FALSE
961 AND NOT EXISTS (
962 SELECT 1
963 FROM tx_sender_cancel_try_to_send_txids
964 WHERE cancelled_id = tx_sender_try_to_send_txs.id
965 AND is_finalized = TRUE
966 )
967 AND NOT EXISTS (
968 SELECT 1
969 FROM tx_sender_cancel_try_to_send_outpoints
970 WHERE cancelled_id = tx_sender_try_to_send_txs.id
971 AND is_finalized = TRUE
972 )
973 "#,
974 );
975
976 let results = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
977 results
978 .into_iter()
979 .map(|(id, fee_paying_type, txid, seen_at_height)| {
980 Ok((
981 u32::try_from(id).wrap_err("Failed to convert id to u32")?,
982 fee_paying_type,
983 txid.0,
984 seen_at_height
985 .map(u32::try_from)
986 .transpose()
987 .wrap_err("Failed to convert seen_at_height to u32")?,
988 ))
989 })
990 .collect::<Result<Vec<_>, BridgeError>>()
991 }
992
993 pub async fn list_rbf_txids_for_ids(
994 &self,
995 tx: Option<TxSenderDbTx<'_>>,
996 ids: &[u32],
997 ) -> Result<Vec<(u32, Txid)>, BridgeError> {
998 if ids.is_empty() {
999 return Ok(vec![]);
1000 }
1001
1002 let ids_i32: Vec<i32> = ids
1003 .iter()
1004 .copied()
1005 .map(i32::try_from)
1006 .collect::<Result<Vec<_>, _>>()
1007 .wrap_err("Failed to convert ids to i32")?;
1008
1009 let query = sqlx::query_as::<_, (i32, TxidDB)>(
1010 "SELECT id, txid FROM tx_sender_rbf_txids WHERE id = ANY($1) ORDER BY insertion_order DESC",
1011 )
1012 .bind(ids_i32);
1013
1014 let results = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
1015 results
1016 .into_iter()
1017 .map(|(id, txid)| {
1018 Ok((
1019 u32::try_from(id).wrap_err("Failed to convert id to u32")?,
1020 txid.0,
1021 ))
1022 })
1023 .collect::<Result<Vec<_>, BridgeError>>()
1024 }
1025
1026 pub async fn set_try_to_send_seen_at_height(
1027 &self,
1028 tx: Option<TxSenderDbTx<'_>>,
1029 id: u32,
1030 seen_at_height: Option<u32>,
1031 ) -> Result<(), BridgeError> {
1032 let query =
1033 sqlx::query("UPDATE tx_sender_try_to_send_txs SET seen_at_height = $2 WHERE id = $1")
1034 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?)
1035 .bind(
1036 seen_at_height
1037 .map(i32::try_from)
1038 .transpose()
1039 .wrap_err("Failed to convert seen_at_height to i32")?,
1040 );
1041
1042 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1043 Ok(())
1044 }
1045
1046 pub async fn list_try_to_send_statuses_by_ids(
1048 &self,
1049 tx: Option<TxSenderDbTx<'_>>,
1050 ids: &[u32],
1051 ) -> Result<HashMap<u32, (Option<u32>, bool)>, BridgeError> {
1052 if ids.is_empty() {
1053 return Ok(HashMap::new());
1054 }
1055
1056 let ids_i32: Vec<i32> = ids
1057 .iter()
1058 .copied()
1059 .map(i32::try_from)
1060 .collect::<Result<Vec<_>, _>>()
1061 .wrap_err("Failed to convert ids to i32")?;
1062
1063 let query = sqlx::query_as::<_, (i32, Option<i32>, bool)>(
1064 "SELECT id, seen_at_height, is_finalized
1065 FROM tx_sender_try_to_send_txs
1066 WHERE id = ANY($1)",
1067 )
1068 .bind(ids_i32);
1069
1070 let results = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
1071 let mut map = HashMap::with_capacity(results.len());
1072 for (id, seen_at_height, is_finalized) in results {
1073 let id = u32::try_from(id).wrap_err("Failed to convert id to u32")?;
1074 let seen_at_height = seen_at_height
1075 .map(u32::try_from)
1076 .transpose()
1077 .wrap_err("Failed to convert seen_at_height to u32")?;
1078 map.insert(id, (seen_at_height, is_finalized));
1079 }
1080
1081 Ok(map)
1082 }
1083
1084 pub async fn set_try_to_send_finalized(
1085 &self,
1086 tx: Option<TxSenderDbTx<'_>>,
1087 id: u32,
1088 is_finalized: bool,
1089 ) -> Result<(), BridgeError> {
1090 let query =
1091 sqlx::query("UPDATE tx_sender_try_to_send_txs SET is_finalized = $2 WHERE id = $1")
1092 .bind(i32::try_from(id).wrap_err("Failed to convert id to i32")?)
1093 .bind(is_finalized);
1094
1095 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1096 Ok(())
1097 }
1098
1099 pub async fn list_unfinalized_fee_payer_utxos(
1111 &self,
1112 tx: Option<TxSenderDbTx<'_>>,
1113 ) -> Result<Vec<(u32, Txid, Option<u32>)>, BridgeError> {
1114 let query = sqlx::query_as::<_, (i32, TxidDB, Option<i32>)>(
1115 r#"
1116 SELECT id, fee_payer_txid, seen_at_height
1117 FROM tx_sender_fee_payer_utxos
1118 WHERE is_finalized = FALSE
1119 AND NOT EXISTS (
1120 SELECT 1
1121 FROM tx_sender_fee_payer_utxos other
1122 WHERE COALESCE(other.replacement_of_id, other.id)
1123 = COALESCE(tx_sender_fee_payer_utxos.replacement_of_id, tx_sender_fee_payer_utxos.id)
1124 AND other.is_finalized = TRUE
1125 )
1126 "#,
1127 );
1128
1129 let results = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
1130 results
1131 .into_iter()
1132 .map(|(id, txid, seen_at_height)| {
1133 Ok((
1134 u32::try_from(id).wrap_err("Failed to convert id to u32")?,
1135 txid.0,
1136 seen_at_height
1137 .map(u32::try_from)
1138 .transpose()
1139 .wrap_err("Failed to convert seen_at_height to u32")?,
1140 ))
1141 })
1142 .collect::<Result<Vec<_>, BridgeError>>()
1143 }
1144
1145 pub async fn set_fee_payer_seen_at_height(
1146 &self,
1147 tx: Option<TxSenderDbTx<'_>>,
1148 fee_payer_utxo_id: u32,
1149 seen_at_height: Option<u32>,
1150 ) -> Result<(), BridgeError> {
1151 let query =
1152 sqlx::query("UPDATE tx_sender_fee_payer_utxos SET seen_at_height = $2 WHERE id = $1")
1153 .bind(i32::try_from(fee_payer_utxo_id).wrap_err("Failed to convert id to i32")?)
1154 .bind(
1155 seen_at_height
1156 .map(i32::try_from)
1157 .transpose()
1158 .wrap_err("Failed to convert seen_at_height to i32")?,
1159 );
1160
1161 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1162 Ok(())
1163 }
1164
1165 pub async fn set_fee_payer_finalized(
1166 &self,
1167 tx: Option<TxSenderDbTx<'_>>,
1168 fee_payer_utxo_id: u32,
1169 is_finalized: bool,
1170 ) -> Result<(), BridgeError> {
1171 let query =
1172 sqlx::query("UPDATE tx_sender_fee_payer_utxos SET is_finalized = $2 WHERE id = $1")
1173 .bind(i32::try_from(fee_payer_utxo_id).wrap_err("Failed to convert id to i32")?)
1174 .bind(is_finalized);
1175
1176 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1177 Ok(())
1178 }
1179
1180 pub async fn list_unfinalized_cancel_txids(
1181 &self,
1182 tx: Option<TxSenderDbTx<'_>>,
1183 ) -> Result<Vec<(u32, Txid, Option<u32>)>, BridgeError> {
1184 let query = sqlx::query_as::<_, (i32, TxidDB, Option<i32>)>(
1185 r#"
1186 SELECT cancelled_id, txid, seen_at_height
1187 FROM tx_sender_cancel_try_to_send_txids
1188 WHERE is_finalized = FALSE
1189 "#,
1190 );
1191
1192 let results = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
1193 results
1194 .into_iter()
1195 .map(|(cancelled_id, txid, seen_at_height)| {
1196 Ok((
1197 u32::try_from(cancelled_id)
1198 .wrap_err("Failed to convert cancelled_id to u32")?,
1199 txid.0,
1200 seen_at_height
1201 .map(u32::try_from)
1202 .transpose()
1203 .wrap_err("Failed to convert seen_at_height to u32")?,
1204 ))
1205 })
1206 .collect::<Result<Vec<_>, BridgeError>>()
1207 }
1208
1209 pub async fn set_cancel_txid_seen_at_height(
1210 &self,
1211 tx: Option<TxSenderDbTx<'_>>,
1212 cancelled_id: u32,
1213 txid: Txid,
1214 seen_at_height: Option<u32>,
1215 ) -> Result<(), BridgeError> {
1216 let query = sqlx::query(
1217 "UPDATE tx_sender_cancel_try_to_send_txids SET seen_at_height = $3 WHERE cancelled_id = $1 AND txid = $2",
1218 )
1219 .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled_id to i32")?)
1220 .bind(TxidDB(txid))
1221 .bind(
1222 seen_at_height
1223 .map(i32::try_from)
1224 .transpose()
1225 .wrap_err("Failed to convert seen_at_height to i32")?,
1226 );
1227
1228 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1229 Ok(())
1230 }
1231
1232 pub async fn set_cancel_txid_finalized(
1233 &self,
1234 tx: Option<TxSenderDbTx<'_>>,
1235 cancelled_id: u32,
1236 txid: Txid,
1237 is_finalized: bool,
1238 ) -> Result<(), BridgeError> {
1239 let query = sqlx::query(
1240 "UPDATE tx_sender_cancel_try_to_send_txids SET is_finalized = $3 WHERE cancelled_id = $1 AND txid = $2",
1241 )
1242 .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled_id to i32")?)
1243 .bind(TxidDB(txid))
1244 .bind(is_finalized);
1245
1246 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1247 Ok(())
1248 }
1249
1250 pub async fn list_unfinalized_activate_txids(
1251 &self,
1252 tx: Option<TxSenderDbTx<'_>>,
1253 ) -> Result<Vec<(u32, Txid, Option<u32>, bool)>, BridgeError> {
1254 let query = sqlx::query_as::<_, (i32, TxidDB, Option<i32>, bool)>(
1255 r#"
1256 SELECT activated_id, txid, seen_at_height, in_mempool
1257 FROM tx_sender_activate_try_to_send_txids
1258 WHERE is_finalized = FALSE
1259 "#,
1260 );
1261
1262 let results = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
1263 results
1264 .into_iter()
1265 .map(|(activated_id, txid, seen_at_height, in_mempool)| {
1266 Ok((
1267 u32::try_from(activated_id)
1268 .wrap_err("Failed to convert activated_id to u32")?,
1269 txid.0,
1270 seen_at_height
1271 .map(u32::try_from)
1272 .transpose()
1273 .wrap_err("Failed to convert seen_at_height to u32")?,
1274 in_mempool,
1275 ))
1276 })
1277 .collect::<Result<Vec<_>, BridgeError>>()
1278 }
1279
1280 pub async fn set_activate_txid_seen_at_height(
1281 &self,
1282 tx: Option<TxSenderDbTx<'_>>,
1283 activated_id: u32,
1284 txid: Txid,
1285 seen_at_height: Option<u32>,
1286 ) -> Result<(), BridgeError> {
1287 let query = sqlx::query(
1288 "UPDATE tx_sender_activate_try_to_send_txids SET seen_at_height = $3 WHERE activated_id = $1 AND txid = $2",
1289 )
1290 .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated_id to i32")?)
1291 .bind(TxidDB(txid))
1292 .bind(
1293 seen_at_height
1294 .map(i32::try_from)
1295 .transpose()
1296 .wrap_err("Failed to convert seen_at_height to i32")?,
1297 );
1298
1299 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1300 Ok(())
1301 }
1302
1303 pub async fn set_activate_txid_finalized(
1304 &self,
1305 tx: Option<TxSenderDbTx<'_>>,
1306 activated_id: u32,
1307 txid: Txid,
1308 is_finalized: bool,
1309 ) -> Result<(), BridgeError> {
1310 let query = sqlx::query(
1311 "UPDATE tx_sender_activate_try_to_send_txids SET is_finalized = $3 WHERE activated_id = $1 AND txid = $2",
1312 )
1313 .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated_id to i32")?)
1314 .bind(TxidDB(txid))
1315 .bind(is_finalized);
1316
1317 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1318 Ok(())
1319 }
1320
1321 pub async fn set_activate_txid_mempool_status(
1322 &self,
1323 tx: Option<TxSenderDbTx<'_>>,
1324 activated_id: u32,
1325 txid: Txid,
1326 in_mempool: bool,
1327 ) -> Result<(), BridgeError> {
1328 let query = sqlx::query(
1329 "UPDATE tx_sender_activate_try_to_send_txids SET in_mempool = $3 WHERE activated_id = $1 AND txid = $2",
1330 )
1331 .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated_id to i32")?)
1332 .bind(TxidDB(txid))
1333 .bind(in_mempool);
1334
1335 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1336 Ok(())
1337 }
1338
1339 pub async fn get_activate_txid_status(
1340 &self,
1341 tx: Option<TxSenderDbTx<'_>>,
1342 txid: Txid,
1343 ) -> Result<Option<(bool, Option<u32>)>, BridgeError> {
1344 let query = sqlx::query_as::<_, (Option<bool>, Option<i32>)>(
1345 "SELECT bool_or(in_mempool), max(seen_at_height)
1346 FROM tx_sender_activate_try_to_send_txids
1347 WHERE txid = $1",
1348 )
1349 .bind(TxidDB(txid));
1350
1351 let (any_in_mempool, seen_at_height): (Option<bool>, Option<i32>) =
1352 txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_one)?;
1353
1354 if any_in_mempool.is_none() && seen_at_height.is_none() {
1355 return Ok(None);
1356 }
1357
1358 let any_in_mempool = any_in_mempool.unwrap_or(false);
1359 let seen_at_height = seen_at_height
1360 .map(u32::try_from)
1361 .transpose()
1362 .wrap_err("Failed to convert seen_at_height to u32")?;
1363
1364 Ok(Some((any_in_mempool, seen_at_height)))
1365 }
1366
1367 pub async fn delete_try_to_send_tx(
1368 &self,
1369 mut tx: Option<TxSenderDbTx<'_>>,
1370 id: u32,
1371 ) -> Result<(), BridgeError> {
1372 let id_i32 = i32::try_from(id).wrap_err("Failed to convert id to i32")?;
1373
1374 let queries = [
1375 "DELETE FROM tx_sender_debug_sending_state WHERE tx_id = $1",
1376 "DELETE FROM tx_sender_debug_submission_errors WHERE tx_id = $1",
1377 "DELETE FROM tx_sender_rbf_txids WHERE id = $1",
1378 "DELETE FROM tx_sender_fee_payer_utxos WHERE bumped_id = $1",
1379 "DELETE FROM tx_sender_cancel_try_to_send_outpoints WHERE cancelled_id = $1",
1380 "DELETE FROM tx_sender_cancel_try_to_send_txids WHERE cancelled_id = $1",
1381 "DELETE FROM tx_sender_activate_try_to_send_outpoints WHERE activated_id = $1",
1382 "DELETE FROM tx_sender_activate_try_to_send_txids WHERE activated_id = $1",
1383 "DELETE FROM tx_sender_try_to_send_txs WHERE id = $1",
1384 ];
1385
1386 for sql in queries {
1387 let query = sqlx::query(sql).bind(id_i32);
1388 txsender_execute_query_with_tx!(&self.pool, tx.as_deref_mut(), query, execute)?;
1389 }
1390
1391 Ok(())
1392 }
1393
1394 pub async fn list_unfinalized_cancel_outpoints(
1395 &self,
1396 tx: Option<TxSenderDbTx<'_>>,
1397 ) -> Result<Vec<(u32, OutPoint, Option<u32>)>, BridgeError> {
1398 let query = sqlx::query_as::<_, (i32, TxidDB, i32, Option<i32>)>(
1399 r#"
1400 SELECT cancelled_id, txid, vout, seen_at_height
1401 FROM tx_sender_cancel_try_to_send_outpoints
1402 WHERE is_finalized = FALSE
1403 "#,
1404 );
1405
1406 let results = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
1407 results
1408 .into_iter()
1409 .map(|(cancelled_id, txid, vout, seen_at_height)| {
1410 Ok((
1411 u32::try_from(cancelled_id)
1412 .wrap_err("Failed to convert cancelled_id to u32")?,
1413 OutPoint {
1414 txid: txid.0,
1415 vout: u32::try_from(vout).wrap_err("Failed to convert vout to u32")?,
1416 },
1417 seen_at_height
1418 .map(u32::try_from)
1419 .transpose()
1420 .wrap_err("Failed to convert seen_at_height to u32")?,
1421 ))
1422 })
1423 .collect::<Result<Vec<_>, BridgeError>>()
1424 }
1425
1426 pub async fn set_cancel_outpoint_seen_at_height(
1427 &self,
1428 tx: Option<TxSenderDbTx<'_>>,
1429 cancelled_id: u32,
1430 outpoint: OutPoint,
1431 seen_at_height: Option<u32>,
1432 ) -> Result<(), BridgeError> {
1433 let query = sqlx::query(
1434 "UPDATE tx_sender_cancel_try_to_send_outpoints SET seen_at_height = $4 WHERE cancelled_id = $1 AND txid = $2 AND vout = $3",
1435 )
1436 .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled_id to i32")?)
1437 .bind(TxidDB(outpoint.txid))
1438 .bind(i32::try_from(outpoint.vout).wrap_err("Failed to convert vout to i32")?)
1439 .bind(
1440 seen_at_height
1441 .map(i32::try_from)
1442 .transpose()
1443 .wrap_err("Failed to convert seen_at_height to i32")?,
1444 );
1445
1446 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1447 Ok(())
1448 }
1449
1450 pub async fn set_cancel_outpoint_finalized(
1451 &self,
1452 tx: Option<TxSenderDbTx<'_>>,
1453 cancelled_id: u32,
1454 outpoint: OutPoint,
1455 is_finalized: bool,
1456 ) -> Result<(), BridgeError> {
1457 let query = sqlx::query(
1458 "UPDATE tx_sender_cancel_try_to_send_outpoints SET is_finalized = $4 WHERE cancelled_id = $1 AND txid = $2 AND vout = $3",
1459 )
1460 .bind(i32::try_from(cancelled_id).wrap_err("Failed to convert cancelled_id to i32")?)
1461 .bind(TxidDB(outpoint.txid))
1462 .bind(i32::try_from(outpoint.vout).wrap_err("Failed to convert vout to i32")?)
1463 .bind(is_finalized);
1464
1465 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1466 Ok(())
1467 }
1468
1469 pub async fn list_unfinalized_activate_outpoints(
1470 &self,
1471 tx: Option<TxSenderDbTx<'_>>,
1472 ) -> Result<Vec<(u32, OutPoint, Option<u32>)>, BridgeError> {
1473 let query = sqlx::query_as::<_, (i32, TxidDB, i32, Option<i32>)>(
1474 r#"
1475 SELECT activated_id, txid, vout, seen_at_height
1476 FROM tx_sender_activate_try_to_send_outpoints
1477 WHERE is_finalized = FALSE
1478 "#,
1479 );
1480
1481 let results = txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
1482 results
1483 .into_iter()
1484 .map(|(activated_id, txid, vout, seen_at_height)| {
1485 Ok((
1486 u32::try_from(activated_id)
1487 .wrap_err("Failed to convert activated_id to u32")?,
1488 OutPoint {
1489 txid: txid.0,
1490 vout: u32::try_from(vout).wrap_err("Failed to convert vout to u32")?,
1491 },
1492 seen_at_height
1493 .map(u32::try_from)
1494 .transpose()
1495 .wrap_err("Failed to convert seen_at_height to u32")?,
1496 ))
1497 })
1498 .collect::<Result<Vec<_>, BridgeError>>()
1499 }
1500
1501 pub async fn set_activate_outpoint_seen_at_height(
1502 &self,
1503 tx: Option<TxSenderDbTx<'_>>,
1504 activated_id: u32,
1505 outpoint: OutPoint,
1506 seen_at_height: Option<u32>,
1507 ) -> Result<(), BridgeError> {
1508 let query = sqlx::query(
1509 "UPDATE tx_sender_activate_try_to_send_outpoints SET seen_at_height = $4 WHERE activated_id = $1 AND txid = $2 AND vout = $3",
1510 )
1511 .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated_id to i32")?)
1512 .bind(TxidDB(outpoint.txid))
1513 .bind(i32::try_from(outpoint.vout).wrap_err("Failed to convert vout to i32")?)
1514 .bind(
1515 seen_at_height
1516 .map(i32::try_from)
1517 .transpose()
1518 .wrap_err("Failed to convert seen_at_height to i32")?,
1519 );
1520
1521 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1522 Ok(())
1523 }
1524
1525 pub async fn set_activate_outpoint_finalized(
1526 &self,
1527 tx: Option<TxSenderDbTx<'_>>,
1528 activated_id: u32,
1529 outpoint: OutPoint,
1530 is_finalized: bool,
1531 ) -> Result<(), BridgeError> {
1532 let query = sqlx::query(
1533 "UPDATE tx_sender_activate_try_to_send_outpoints SET is_finalized = $4 WHERE activated_id = $1 AND txid = $2 AND vout = $3",
1534 )
1535 .bind(i32::try_from(activated_id).wrap_err("Failed to convert activated_id to i32")?)
1536 .bind(TxidDB(outpoint.txid))
1537 .bind(i32::try_from(outpoint.vout).wrap_err("Failed to convert vout to i32")?)
1538 .bind(is_finalized);
1539
1540 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
1541 Ok(())
1542 }
1543
1544 pub async fn update_synced_height(&self, height: u32) -> Result<(), BridgeError> {
1545 sqlx::query(
1546 "INSERT INTO tx_sender_sync_state (id, synced_height, updated_at)
1547 VALUES (1, $1, NOW())
1548 ON CONFLICT (id) DO UPDATE SET synced_height = EXCLUDED.synced_height, updated_at = NOW()",
1549 )
1550 .bind(i32::try_from(height).wrap_err("Failed to convert height to i32")?)
1551 .execute(&self.pool)
1552 .await
1553 .map_err(BridgeError::DatabaseError)?;
1554 Ok(())
1555 }
1556
1557 pub async fn get_synced_height(&self) -> Result<Option<u32>, BridgeError> {
1558 let result: Option<i32> =
1559 sqlx::query_scalar("SELECT synced_height FROM tx_sender_sync_state WHERE id = 1")
1560 .fetch_optional(&self.pool)
1561 .await
1562 .map_err(BridgeError::DatabaseError)?;
1563
1564 Ok(result
1565 .map(|h| u32::try_from(h).wrap_err("Failed to convert height from DB"))
1566 .transpose()?)
1567 }
1568}
1569
1570#[cfg(all(test, feature = "testing"))]
1571mod tests {
1572 use super::*;
1573 use crate::test_utils::create_test_environment;
1574 use bitcoin::hashes::Hash as _;
1575 use bitcoin::transaction::Version;
1576 use bitcoin::{absolute, Transaction, Txid};
1577
1578 fn txid(byte: u8) -> Txid {
1579 Txid::from_byte_array([byte; 32])
1580 }
1581
1582 fn empty_tx() -> Transaction {
1583 Transaction {
1584 version: Version::TWO,
1585 lock_time: absolute::LockTime::ZERO,
1586 input: vec![],
1587 output: vec![],
1588 }
1589 }
1590
1591 async fn save_fee_payer_chain(db: &TxSenderDb, txid_prefix: u8) -> (u32, u32, u32) {
1592 let mut dbtx = db.begin_transaction().await.unwrap();
1593 let bumped_id = db
1594 .save_tx(
1595 &mut dbtx,
1596 None,
1597 &empty_tx(),
1598 FeePayingType::CPFP,
1599 txid(txid_prefix),
1600 None,
1601 )
1602 .await
1603 .unwrap();
1604 db.commit_transaction(dbtx).await.unwrap();
1605
1606 let root_txid = txid(txid_prefix + 1);
1607 db.save_fee_payer_tx(
1608 None,
1609 bumped_id,
1610 root_txid,
1611 0,
1612 Amount::from_sat(10_000),
1613 None,
1614 )
1615 .await
1616 .unwrap();
1617
1618 let initial: Vec<(u32, u32, Txid, u32, Amount, Option<u32>)> =
1619 db.get_all_unconfirmed_fee_payer_txs(None).await.unwrap();
1620 let root_id = initial
1621 .iter()
1622 .find_map(|(id, chain_bumped_id, txid, _, _, _)| {
1623 (*chain_bumped_id == bumped_id && *txid == root_txid).then_some(*id)
1624 })
1625 .unwrap();
1626
1627 let replacement_txid = txid(txid_prefix + 2);
1628 db.save_fee_payer_tx(
1629 None,
1630 bumped_id,
1631 replacement_txid,
1632 0,
1633 Amount::from_sat(10_000),
1634 Some(root_id),
1635 )
1636 .await
1637 .unwrap();
1638
1639 let replacement_id: i32 = sqlx::query_scalar(
1640 "SELECT id FROM tx_sender_fee_payer_utxos WHERE fee_payer_txid = $1",
1641 )
1642 .bind(TxidDB(replacement_txid))
1643 .fetch_one(db.pool())
1644 .await
1645 .unwrap();
1646
1647 let unconfirmed = db
1648 .get_unconfirmed_fee_payer_txs(None, bumped_id)
1649 .await
1650 .unwrap();
1651 assert_eq!(unconfirmed.len(), 2);
1652
1653 (bumped_id, root_id, replacement_id as u32)
1654 }
1655
1656 async fn assert_no_unconfirmed_fee_payers(db: &TxSenderDb, bumped_id: u32) {
1657 assert!(db
1658 .get_unconfirmed_fee_payer_txs(None, bumped_id)
1659 .await
1660 .unwrap()
1661 .is_empty());
1662 }
1663
1664 #[tokio::test]
1665 async fn confirmed_fee_payer_chain_has_no_unconfirmed_txs() {
1666 let db = create_test_environment(true, false).await.1.unwrap();
1667
1668 let (root_confirmed_bumped_id, root_id, _) = save_fee_payer_chain(&db, 10).await;
1669 db.set_fee_payer_seen_at_height(None, root_id, Some(100))
1670 .await
1671 .unwrap();
1672 assert_no_unconfirmed_fee_payers(&db, root_confirmed_bumped_id).await;
1673
1674 let (replacement_confirmed_bumped_id, _, replacement_id) =
1675 save_fee_payer_chain(&db, 20).await;
1676 db.set_fee_payer_seen_at_height(None, replacement_id, Some(101))
1677 .await
1678 .unwrap();
1679 assert_no_unconfirmed_fee_payers(&db, replacement_confirmed_bumped_id).await;
1680
1681 assert!(db
1682 .get_all_unconfirmed_fee_payer_txs(None)
1683 .await
1684 .unwrap()
1685 .is_empty());
1686 }
1687}