1use crate::{
2 rpc_errors::is_mempool_not_found_error, rpc_errors::is_not_found_error, FeePayingType,
3 TxSender, TxSenderTransaction,
4};
5use bitcoin::{BlockHash, Network, OutPoint, Txid};
6use bitcoincore_rpc::RpcApi;
7use clementine_errors::BridgeError;
8use clementine_extended_rpc::{BitcoinRPCError, RetryConfig};
9use serde::Deserialize;
10use std::collections::HashMap;
11use tokio::time::{timeout, Duration};
12use tokio_retry::RetryIf;
13
/// Where a transaction was last observed relative to the chain.
#[derive(Debug, Clone, Copy)]
enum TxChainStatus {
    /// The transaction is included in a block.
    Confirmed {
        /// Height of the block containing the transaction.
        block_height: u32,
        /// Confirmation count reported for the containing block.
        confirmations: u32,
    },
    /// The transaction is unconfirmed but has a mempool entry.
    InMempool,
    /// The transaction is neither confirmed nor found in the mempool.
    NotPresent,
}
26
27#[derive(Clone, Debug, Deserialize)]
28struct MempoolOutspendStatus {
29 confirmed: bool,
30 block_height: Option<u32>,
31 block_hash: Option<BlockHash>,
32}
33
34#[derive(Clone, Debug, Deserialize)]
35struct MempoolOutspendResponse {
36 spent: bool,
37 txid: Option<Txid>,
38 vin: Option<u32>,
39 status: Option<MempoolOutspendStatus>,
40}
41
/// A mempool-service outspend report that was cross-checked against the RPC node.
#[derive(Debug, Clone)]
struct ValidatedOutspend {
    /// Whether the RPC node reports the spending transaction as confirmed.
    confirmed: bool,
    /// Confirmation count the RPC node reports for the spending transaction (0 if unconfirmed).
    confirmations: u32,
    /// Height of the confirming block; `None` while the spend is unconfirmed.
    block_height: Option<u32>,
}
48
49impl TxSender {
50 pub async fn sync_transaction_confirmations_via_rpc(
61 &self,
62 mut dbtx: Option<&mut TxSenderTransaction>,
63 tip_height: u32,
64 ) -> Result<(), BridgeError> {
65 let finality = self.finality_depth;
66
67 let mut tx_status_cache: HashMap<Txid, TxChainStatus> = HashMap::new();
70 let mut block_info_cache: HashMap<BlockHash, (u32, u32)> = HashMap::new(); let unfinalized = self
74 .db
75 .list_unfinalized_try_to_send_txs(dbtx.as_deref_mut())
76 .await?;
77
78 let rbf_ids: Vec<u32> = unfinalized
79 .iter()
80 .filter_map(|(id, fee_paying_type, _txid, _seen_at_height)| {
81 matches!(
82 fee_paying_type,
83 FeePayingType::RBF | FeePayingType::RbfWtxidGrind
84 )
85 .then_some(*id)
86 })
87 .collect();
88
89 let mut rbf_txids_by_id: HashMap<u32, Vec<Txid>> = HashMap::new();
90 if !rbf_ids.is_empty() {
91 for (id, txid) in self
92 .db
93 .list_rbf_txids_for_ids(dbtx.as_deref_mut(), &rbf_ids)
94 .await?
95 {
96 rbf_txids_by_id.entry(id).or_default().push(txid);
97 }
98 }
99
100 for (id, fee_paying_type, txid, seen_at_height) in unfinalized {
101 let status = match fee_paying_type {
102 FeePayingType::CPFP | FeePayingType::NoFunding => {
103 get_tx_status_cached(
104 &self.rpc,
105 &mut tx_status_cache,
106 &mut block_info_cache,
107 txid,
108 )
109 .await?
110 }
111 FeePayingType::RBF | FeePayingType::RbfWtxidGrind => {
112 let Some(rbf_txids) = rbf_txids_by_id.get(&id) else {
113 continue;
115 };
116 let mut first_confirmed_rbf: Option<(u32, u32)> = None; for rbf_txid in rbf_txids {
118 if let TxChainStatus::Confirmed {
119 block_height,
120 confirmations,
121 } = get_tx_status_cached(
122 &self.rpc,
123 &mut tx_status_cache,
124 &mut block_info_cache,
125 *rbf_txid,
126 )
127 .await?
128 {
129 first_confirmed_rbf = Some((confirmations, block_height));
130 break;
131 }
132 }
133 match first_confirmed_rbf {
134 Some((confirmations, block_height)) => TxChainStatus::Confirmed {
135 block_height,
136 confirmations,
137 },
138 None => TxChainStatus::NotPresent,
139 }
140 }
141 };
142
143 match (seen_at_height, status) {
144 (Some(_), TxChainStatus::InMempool | TxChainStatus::NotPresent) => {
145 self.db
147 .set_try_to_send_seen_at_height(dbtx.as_deref_mut(), id, None)
148 .await?;
149 }
150 (
151 _,
152 TxChainStatus::Confirmed {
153 block_height,
154 confirmations,
155 },
156 ) => {
157 if seen_at_height != Some(block_height) {
158 self.db
159 .set_try_to_send_seen_at_height(
160 dbtx.as_deref_mut(),
161 id,
162 Some(block_height),
163 )
164 .await?;
165 }
166 if confirmations >= finality {
168 self.db
169 .set_try_to_send_finalized(dbtx.as_deref_mut(), id, true)
170 .await?;
171 }
172 }
173 _ => {}
174 }
175 }
176
177 for (fee_payer_utxo_id, fee_payer_txid, seen_at_height) in self
179 .db
180 .list_unfinalized_fee_payer_utxos(dbtx.as_deref_mut())
181 .await?
182 {
183 let status = get_tx_status_cached(
184 &self.rpc,
185 &mut tx_status_cache,
186 &mut block_info_cache,
187 fee_payer_txid,
188 )
189 .await?;
190
191 match (seen_at_height, status) {
192 (Some(_), TxChainStatus::InMempool | TxChainStatus::NotPresent) => {
193 self.db
194 .set_fee_payer_seen_at_height(dbtx.as_deref_mut(), fee_payer_utxo_id, None)
195 .await?;
196 }
197 (
198 _,
199 TxChainStatus::Confirmed {
200 block_height,
201 confirmations,
202 },
203 ) => {
204 if seen_at_height != Some(block_height) {
205 self.db
206 .set_fee_payer_seen_at_height(
207 dbtx.as_deref_mut(),
208 fee_payer_utxo_id,
209 Some(block_height),
210 )
211 .await?;
212 }
213 if confirmations >= finality {
215 self.db
216 .set_fee_payer_finalized(dbtx.as_deref_mut(), fee_payer_utxo_id, true)
217 .await?;
218 }
219 }
220 _ => {}
221 }
222 }
223
224 for (cancelled_id, txid, seen_at_height) in self
226 .db
227 .list_unfinalized_cancel_txids(dbtx.as_deref_mut())
228 .await?
229 {
230 let status =
231 get_tx_status_cached(&self.rpc, &mut tx_status_cache, &mut block_info_cache, txid)
232 .await?;
233
234 match (seen_at_height, status) {
235 (Some(_), TxChainStatus::InMempool | TxChainStatus::NotPresent) => {
236 self.db
237 .set_cancel_txid_seen_at_height(
238 dbtx.as_deref_mut(),
239 cancelled_id,
240 txid,
241 None,
242 )
243 .await?;
244 }
245 (
246 _,
247 TxChainStatus::Confirmed {
248 block_height,
249 confirmations,
250 },
251 ) => {
252 if seen_at_height != Some(block_height) {
253 self.db
254 .set_cancel_txid_seen_at_height(
255 dbtx.as_deref_mut(),
256 cancelled_id,
257 txid,
258 Some(block_height),
259 )
260 .await?;
261 }
262 if confirmations >= finality {
264 self.db
265 .set_cancel_txid_finalized(
266 dbtx.as_deref_mut(),
267 cancelled_id,
268 txid,
269 true,
270 )
271 .await?;
272 }
273 }
274 _ => {}
275 }
276 }
277
278 for (activated_id, txid, seen_at_height, in_mempool) in self
279 .db
280 .list_unfinalized_activate_txids(dbtx.as_deref_mut())
281 .await?
282 {
283 let status =
284 get_tx_status_cached(&self.rpc, &mut tx_status_cache, &mut block_info_cache, txid)
285 .await?;
286
287 match (seen_at_height, status) {
288 (Some(_), TxChainStatus::InMempool | TxChainStatus::NotPresent) => {
290 self.db
291 .set_activate_txid_seen_at_height(
292 dbtx.as_deref_mut(),
293 activated_id,
294 txid,
295 None,
296 )
297 .await?;
298 }
299 (None, TxChainStatus::InMempool) => {
301 if !in_mempool {
302 self.db
303 .set_activate_txid_mempool_status(
304 dbtx.as_deref_mut(),
305 activated_id,
306 txid,
307 true,
308 )
309 .await?;
310 }
311 }
312 (None, TxChainStatus::NotPresent) => {
314 if in_mempool {
315 self.db
316 .set_activate_txid_mempool_status(
317 dbtx.as_deref_mut(),
318 activated_id,
319 txid,
320 false,
321 )
322 .await?;
323 }
324 }
325 (
327 _,
328 TxChainStatus::Confirmed {
329 block_height,
330 confirmations,
331 },
332 ) => {
333 if in_mempool {
334 self.db
335 .set_activate_txid_mempool_status(
336 dbtx.as_deref_mut(),
337 activated_id,
338 txid,
339 false,
340 )
341 .await?;
342 }
343
344 if seen_at_height != Some(block_height) {
345 self.db
346 .set_activate_txid_seen_at_height(
347 dbtx.as_deref_mut(),
348 activated_id,
349 txid,
350 Some(block_height),
351 )
352 .await?;
353 }
354
355 if confirmations >= finality {
356 self.db
357 .set_activate_txid_finalized(
358 dbtx.as_deref_mut(),
359 activated_id,
360 txid,
361 true,
362 )
363 .await?;
364 }
365 }
366 }
367 }
368
369 self.sync_outpoint_observations_via_rpc(dbtx, tip_height)
374 .await
375 }
376
377 async fn sync_outpoint_observations_via_rpc(
382 &self,
383 mut dbtx: Option<&mut TxSenderTransaction>,
384 start_tip_height: u32,
385 ) -> Result<(), BridgeError> {
386 let finality = self.finality_depth;
387
388 let end_tip_height = self
390 .rpc
391 .get_current_chain_height()
392 .await
393 .map_err(|e| BridgeError::Eyre(eyre::eyre!(e)))?;
394 let observed_tip_height = std::cmp::max(start_tip_height, end_tip_height);
395
396 async fn check_spent(
397 rpc: &clementine_extended_rpc::ExtendedBitcoinRpc,
398 outpoint: &OutPoint,
399 ) -> Result<Option<bool>, BitcoinRPCError> {
400 match rpc.is_utxo_spent(outpoint).await {
401 Ok(spent) => Ok(Some(spent)),
402 Err(BitcoinRPCError::TransactionNotConfirmed) => Ok(None),
403 Err(e) => Err(e),
404 }
405 }
406
407 for (cancelled_id, outpoint, seen_at_height) in self
408 .db
409 .list_unfinalized_cancel_outpoints(dbtx.as_deref_mut())
410 .await?
411 {
412 match check_spent(&self.rpc, &outpoint).await {
413 Ok(Some(true)) => {
414 if let Some(validated) = self.get_mempool_outspend(&outpoint).await {
415 if validated.confirmed {
416 let seen_height = validated.block_height.unwrap_or(observed_tip_height);
417 if seen_at_height != Some(seen_height) {
418 self.db
419 .set_cancel_outpoint_seen_at_height(
420 dbtx.as_deref_mut(),
421 cancelled_id,
422 outpoint,
423 Some(seen_height),
424 )
425 .await?;
426 }
427 if validated.confirmations >= finality {
428 self.db
429 .set_cancel_outpoint_finalized(
430 dbtx.as_deref_mut(),
431 cancelled_id,
432 outpoint,
433 true,
434 )
435 .await?;
436 }
437 } else if seen_at_height.is_some() {
438 self.db
439 .set_cancel_outpoint_seen_at_height(
440 dbtx.as_deref_mut(),
441 cancelled_id,
442 outpoint,
443 None,
444 )
445 .await?;
446 }
447 } else if seen_at_height.is_none() {
448 self.db
449 .set_cancel_outpoint_seen_at_height(
450 dbtx.as_deref_mut(),
451 cancelled_id,
452 outpoint,
453 Some(observed_tip_height),
454 )
455 .await?;
456 } else {
457 let blocks_since_seen =
460 observed_tip_height.saturating_sub(seen_at_height.unwrap()) + 1;
461 if blocks_since_seen >= finality {
462 self.db
463 .set_cancel_outpoint_finalized(
464 dbtx.as_deref_mut(),
465 cancelled_id,
466 outpoint,
467 true,
468 )
469 .await?;
470 }
471 }
472 }
473 Ok(Some(false)) | Ok(None) => {
474 if seen_at_height.is_some() {
476 self.db
477 .set_cancel_outpoint_seen_at_height(
478 dbtx.as_deref_mut(),
479 cancelled_id,
480 outpoint,
481 None,
482 )
483 .await?;
484 }
485 }
486 Err(e) => {
487 tracing::warn!(%outpoint, "Failed to check outpoint spent status: {e}");
488 }
489 }
490 }
491
492 for (activated_id, outpoint, seen_at_height) in self
493 .db
494 .list_unfinalized_activate_outpoints(dbtx.as_deref_mut())
495 .await?
496 {
497 match check_spent(&self.rpc, &outpoint).await {
498 Ok(Some(true)) => {
499 if let Some(validated) = self.get_mempool_outspend(&outpoint).await {
500 if validated.confirmed {
501 let seen_height = validated.block_height.unwrap_or(observed_tip_height);
502 if seen_at_height != Some(seen_height) {
503 self.db
504 .set_activate_outpoint_seen_at_height(
505 dbtx.as_deref_mut(),
506 activated_id,
507 outpoint,
508 Some(seen_height),
509 )
510 .await?;
511 }
512 if validated.confirmations >= finality {
513 self.db
514 .set_activate_outpoint_finalized(
515 dbtx.as_deref_mut(),
516 activated_id,
517 outpoint,
518 true,
519 )
520 .await?;
521 }
522 } else if seen_at_height.is_some() {
523 self.db
524 .set_activate_outpoint_seen_at_height(
525 dbtx.as_deref_mut(),
526 activated_id,
527 outpoint,
528 None,
529 )
530 .await?;
531 }
532 } else if seen_at_height.is_none() {
533 self.db
534 .set_activate_outpoint_seen_at_height(
535 dbtx.as_deref_mut(),
536 activated_id,
537 outpoint,
538 Some(observed_tip_height),
539 )
540 .await?;
541 } else {
542 let blocks_since_seen =
545 observed_tip_height.saturating_sub(seen_at_height.unwrap()) + 1;
546 if blocks_since_seen >= finality {
547 self.db
548 .set_activate_outpoint_finalized(
549 dbtx.as_deref_mut(),
550 activated_id,
551 outpoint,
552 true,
553 )
554 .await?;
555 }
556 }
557 }
558 Ok(Some(false)) | Ok(None) => {
559 if seen_at_height.is_some() {
560 self.db
561 .set_activate_outpoint_seen_at_height(
562 dbtx.as_deref_mut(),
563 activated_id,
564 outpoint,
565 None,
566 )
567 .await?;
568 }
569 }
570 Err(e) => {
571 tracing::warn!(%outpoint, "Failed to check outpoint spent status: {}", e);
572 }
573 }
574 }
575
576 Ok(())
577 }
578
579 async fn get_mempool_outspend(&self, outpoint: &OutPoint) -> Option<ValidatedOutspend> {
583 let host = self.mempool_config.host.as_ref()?;
584
585 let url = match mempool_outspend_url(host, self.network, outpoint) {
586 Ok(url) => url,
587 Err(e) => {
588 tracing::warn!(%outpoint, "Failed to build mempool outspend URL: {e}");
589 return None;
590 }
591 };
592
593 let response = match fetch_mempool_outspend_with_backoff(&self.http_client, &url).await {
594 Ok(response) => response,
595 Err(e) => {
596 tracing::warn!(%outpoint, "Mempool outspend request failed: {e}");
597 return None;
598 }
599 };
600
601 if !response.spent {
602 tracing::warn!(
603 %outpoint,
604 ?response,
605 "Mempool space outspend response says unspent after RPC detected spent"
606 );
607 return None;
608 }
609
610 let Some(txid) = response.txid else {
611 tracing::warn!(%outpoint, ?response, "Mempool outspend missing txid");
612 return None;
613 };
614
615 let tx_info = match self.rpc.get_raw_transaction_info(&txid, None).await {
616 Ok(info) => info,
617 Err(e) => {
618 tracing::warn!(
619 %outpoint,
620 %txid,
621 "Failed to fetch spending tx info from RPC: {e}"
622 );
623 return None;
624 }
625 };
626
627 let Some(vin_index) = response.vin else {
628 tracing::warn!(
629 %outpoint,
630 %txid,
631 "Mempool space outspend missing vin index"
632 );
633 return None;
634 };
635
636 let vin = tx_info.vin.get(vin_index as usize);
637
638 let Some(vin_response) = vin else {
639 tracing::warn!(
640 %outpoint,
641 %txid,
642 vin_index,
643 "Mempool space outspend vin does not match outpoint"
644 );
645 return None;
646 };
647 if !(vin_response.txid == Some(outpoint.txid) && vin_response.vout == Some(outpoint.vout)) {
648 tracing::warn!(
649 %outpoint,
650 %txid,
651 vin_index,
652 "Mempool space outspend vin does not match outpoint"
653 );
654 return None;
655 }
656
657 let confirmations = tx_info.confirmations.unwrap_or(0) as u32;
658 if confirmations > 0 {
659 let Some(rpc_block_hash) = tx_info.blockhash else {
660 tracing::warn!(
661 %outpoint,
662 %txid,
663 "Spending tx missing blockhash in RPC response"
664 );
665 return None;
666 };
667
668 let block_height = match response.status {
669 Some(status) if status.confirmed => {
670 let Some(block_hash) = status.block_hash else {
671 tracing::warn!(
672 %outpoint,
673 %txid,
674 "Mempool space outspend missing block_hash for confirmed spend"
675 );
676 return None;
677 };
678 let Some(block_height) = status.block_height else {
679 tracing::warn!(
680 %outpoint,
681 %txid,
682 "Mempool space outspend missing block_height for confirmed spend"
683 );
684 return None;
685 };
686
687 if rpc_block_hash != block_hash {
688 tracing::warn!(
689 %outpoint,
690 %txid,
691 ?block_hash,
692 ?rpc_block_hash,
693 "Mempool space outspend block hash does not match RPC"
694 );
695 return None;
696 }
697
698 block_height
699 }
700 _ => {
701 let block_info = match self.rpc.get_block_info(&rpc_block_hash).await {
702 Ok(info) => info,
703 Err(e) => {
704 tracing::warn!(
705 %outpoint,
706 %txid,
707 "Failed to fetch block info for spending tx: {e}"
708 );
709 return None;
710 }
711 };
712 block_info.height as u32
713 }
714 };
715
716 Some(ValidatedOutspend {
717 confirmed: true,
718 confirmations,
719 block_height: Some(block_height),
720 })
721 } else {
722 Some(ValidatedOutspend {
723 confirmed: false,
724 confirmations,
725 block_height: None,
726 })
727 }
728 }
729}
730
731fn mempool_outspend_url(
732 host: &str,
733 network: Network,
734 outpoint: &OutPoint,
735) -> Result<String, eyre::Report> {
736 let host = host.trim_end_matches('/');
737 let prefix = match network {
738 Network::Bitcoin => "",
739 Network::Testnet4 => "/testnet4",
740 Network::Signet => "", _ => {
742 return Err(eyre::eyre!(
743 "Unsupported network for mempool.space outspend: {network:?}"
744 ))
745 }
746 };
747
748 Ok(format!(
749 "{host}{prefix}/api/tx/{}/outspend/{}",
750 outpoint.txid, outpoint.vout
751 ))
752}
753
754async fn fetch_mempool_outspend_with_backoff(
755 client: &reqwest::Client,
756 url: &str,
757) -> Result<MempoolOutspendResponse, eyre::Report> {
758 fn always_retry(_: &eyre::Report) -> bool {
759 true
760 }
761
762 let retry_config = RetryConfig::new(250, Duration::from_secs(5), 4, 2, true);
763 let retry_strategy = retry_config.get_strategy();
764
765 RetryIf::spawn(
766 retry_strategy,
767 || {
768 let url = url.to_string();
769 let client = client.clone();
770 async move {
771 let resp = timeout(Duration::from_secs(20), client.get(&url).send())
772 .await
773 .map_err(|_| eyre::eyre!("Mempool outspend request timed out"))?
774 .map_err(|e| eyre::eyre!("Mempool outspend request failed: {e}"))?;
775
776 let status = resp.status();
777 if !status.is_success() {
778 return Err(eyre::eyre!("Mempool outspend HTTP status: {status}"));
779 }
780
781 let parsed = timeout(Duration::from_secs(5), resp.json())
782 .await
783 .map_err(|_| eyre::eyre!("Mempool outspend JSON timed out"))?
784 .map_err(|e| eyre::eyre!("Mempool outspend JSON error: {e}"))?;
785
786 Ok(parsed)
787 }
788 },
789 always_retry,
790 )
791 .await
792}
793
794async fn get_tx_status_cached(
797 rpc: &clementine_extended_rpc::ExtendedBitcoinRpc,
798 tx_cache: &mut HashMap<Txid, TxChainStatus>,
799 block_cache: &mut HashMap<BlockHash, (u32, u32)>,
800 txid: Txid,
801) -> Result<TxChainStatus, BridgeError> {
802 if let Some(status) = tx_cache.get(&txid) {
803 return Ok(*status);
804 }
805
806 let info = match rpc.get_raw_transaction_info(&txid, None).await {
807 Ok(info) => info,
808 Err(e) if is_not_found_error(&e) => {
809 tx_cache.insert(txid, TxChainStatus::NotPresent);
810 return Ok(TxChainStatus::NotPresent);
811 }
812 Err(e) => return Err(BridgeError::Eyre(eyre::eyre!(e))),
813 };
814
815 let status = match info.confirmations {
816 Some(c) if c > 0 => {
817 let blockhash = info.blockhash.ok_or_else(|| {
818 BridgeError::Eyre(eyre::eyre!(
819 "Confirmed transaction {txid} missing blockhash in RPC response"
820 ))
821 })?;
822
823 let (block_height, confirmations) =
824 if let Some((height, confs)) = block_cache.get(&blockhash) {
825 (*height, *confs)
826 } else {
827 let block_info = rpc
828 .get_block_info(&blockhash)
829 .await
830 .map_err(|e| BridgeError::Eyre(eyre::eyre!(e)))?;
831 let height_u32 = block_info.height as u32;
832 let confs_u32 = block_info.confirmations as u32;
833 block_cache.insert(blockhash, (height_u32, confs_u32));
834 (height_u32, confs_u32)
835 };
836
837 TxChainStatus::Confirmed {
838 block_height,
839 confirmations,
840 }
841 }
842 _ => match rpc.get_mempool_entry(&txid).await {
844 Ok(_) => TxChainStatus::InMempool,
845 Err(e) if is_mempool_not_found_error(&e) => TxChainStatus::NotPresent,
846 Err(e) => return Err(BridgeError::Eyre(eyre::eyre!(e))),
847 },
848 };
849 tx_cache.insert(txid, status);
850 Ok(status)
851}