clementine_tx_sender/citrea/
sync.rs

1use crate::citrea::data_serialization::DataOnDa;
2use crate::citrea::reveal_scripts::CitreaSigningData;
3use crate::citrea::TransactionKind;
4use crate::db::citrea::CitreaRawTxRow;
5use crate::rpc_errors::{is_mempool_not_found_error, is_not_found_error};
6use crate::TxSender;
7use bitcoin::hashes::Hash;
8use bitcoin::{Amount, TapSighashType};
9use bitcoincore_rpc::json::FundRawTransactionOptions;
10use bitcoincore_rpc::RpcApi;
11use clementine_primitives::FeeRateKvb;
12use clementine_utils::RbfSigningInfo;
13use eyre::{Context, OptionExt};
14use std::collections::{BTreeMap, BTreeSet, HashMap};
15
16type BlockTxPositionCache = HashMap<bitcoin::BlockHash, (u32, Vec<bitcoin::Txid>)>;
17
18/// Chain position and witness id for a confirmed transaction.
19///
20/// Aggregate bodies need both the legacy txid and wtxid of each confirmed
21/// chunk reveal. The block position is used to reject aggregate reveals that
22/// appear before a chunk reveal they reference.
23#[derive(Debug, Clone, Copy)]
24struct ConfirmedTxInfo {
25    txid: bitcoin::Txid,
26    wtxid: bitcoin::Wtxid,
27    block_height: u32,
28    tx_index: usize,
29}
30
31impl ConfirmedTxInfo {
32    /// Returns true when `self` is strictly after `other` in chain order.
33    fn is_after(&self, other: &Self) -> bool {
34        self.block_height > other.block_height
35            || (self.block_height == other.block_height && self.tx_index > other.tx_index)
36    }
37}
38
39/// Confirmed chunk reveal ids in Citrea aggregate-body order.
40struct ConfirmedChunkReveals {
41    reveal_txids: Vec<[u8; 32]>,
42    reveal_wtxids: Vec<[u8; 32]>,
43    confirmed_txs: Vec<ConfirmedTxInfo>,
44}
45
46/// Returns a Citrea row's optional try-to-send id as a checked unsigned id.
47fn optional_citrea_try_to_send_id(row: &CitreaRawTxRow) -> Result<Option<u32>, eyre::Report> {
48    row.try_to_send_id
49        .map(u32::try_from)
50        .transpose()
51        .wrap_err("Failed to convert citrea try_to_send_id to u32")
52}
53
54/// Returns a Citrea row's try-to-send id as a checked unsigned id.
55fn citrea_try_to_send_id(row: &CitreaRawTxRow) -> Result<u32, eyre::Report> {
56    optional_citrea_try_to_send_id(row)?.ok_or_eyre("Expected citrea try_to_send_id to be present")
57}
58
59impl TxSender {
60    /// Syncs citrea transactions, creating commit transactions for txs without it.
61    /// After creating commit tx, the reveal txs are added to the core txsender queue using insert_try_to_send as RBF txs.
62    pub async fn sync_citrea_txs(&self, fee_rate: FeeRateKvb) -> Result<(), eyre::Report> {
63        // First, check existing commit txids for eviction.
64        self.check_evicted_commit_txs().await?;
65
66        // First get all citrea rows (except aggregate tx) with commit_outpoint IS NULL.
67        // For all of these we will try to fund and create a tx that creates commit utxos.
68        let citrea_rows = self
69            .db
70            .get_citrea_txs_with_null_commit_outpoint(None)
71            .await?;
72
73        // Group rows by insertion_id since all chunk rows share the same eventual commit tx/outpoint.
74        let mut by_insertion_id: BTreeMap<i64, Vec<CitreaRawTxRow>> = BTreeMap::new();
75
76        for row in citrea_rows {
77            by_insertion_id
78                .entry(row.insertion_id)
79                .or_default()
80                .push(row);
81        }
82
83        if !by_insertion_id.is_empty() {
84            tracing::info!(
85                "Found {} pending non-aggregate citrea rows across {} insertion_id groups",
86                by_insertion_id.values().map(|v| v.len()).sum::<usize>(),
87                by_insertion_id.len()
88            );
89        }
90
91        // For each insertion_id group, create a single commit tx/outpoint shared by all rows.
92        for (insertion_id, rows) in by_insertion_id {
93            tracing::debug!(insertion_id, group_len = rows.len(), "Pending citrea group");
94
95            // Build reveal scripts and collect commit addresses for all rows in this group.
96            let mut rows_with_scripts = Vec::with_capacity(rows.len());
97
98            for row in rows {
99                let signing_data = self.create_reveal_script(row.transaction_kind, &row.body);
100                rows_with_scripts.push((row, signing_data));
101            }
102
103            let _ = self
104                .create_commit_outpoints_for_rows(fee_rate, insertion_id, rows_with_scripts)
105                .await?;
106        }
107
108        // Now we check for all rows (except aggregate again) that has commit_outpoint but try_to_send_id is NULL.
109        // For each of these, we will use the commit outpoint to create a tx that spends the commit utxo, revealing the data.
110        // All inserted txs will have RBF feepaying type.
111        let reveal_rows = self
112            .db
113            .get_citrea_txs_with_commit_outpoint_no_try_to_send(None)
114            .await?;
115
116        if !reveal_rows.is_empty() {
117            tracing::info!(
118                "Found {} citrea rows with commit_outpoint but no try_to_send_id",
119                reveal_rows.len()
120            );
121            for row in reveal_rows {
122                let commit_outpoint = row
123                    .commit_outpoint
124                    .ok_or_eyre("Expected commit_outpoint to be present")?;
125                let signing_data = self.create_reveal_script(row.transaction_kind, &row.body);
126
127                let try_to_send_id = self
128                    .insert_reveal_try_to_send(row.id, commit_outpoint, signing_data)
129                    .await?;
130
131                tracing::debug!(
132				"Created reveal tx for citrea row id={}, try_to_send_id={}, commit_outpoint={}:{}",
133				row.id,
134				try_to_send_id,
135				commit_outpoint.txid,
136				commit_outpoint.vout
137			);
138            }
139        }
140
141        // check for aggregate tx sending eligibility (if all chunks are confirmed), then send it
142        self.send_aggregate_txs(fee_rate).await?;
143
144        Ok(())
145    }
146
147    /// Checks existing commit txids for eviction based on citrea rows whose
148    /// try_to_send_id is NULL or not seen yet.
149    /// If evicted (not in mempool and never seen), clear commit_outpoint and delete
150    /// any reveal RBF entries tied to it.
151    /// These check is only required because commit_tx creation uses include_unsafe = true, if some unsafe input is
152    /// spent in another way, commit tx will become invalid.
153    async fn check_evicted_commit_txs(&self) -> Result<(), eyre::Report> {
154        let committed_rows = self
155            .db
156            .get_citrea_txs_with_commit_outpoint_unseen_try_to_send(None)
157            .await?;
158
159        let mut committed_by_commit_txid: BTreeMap<bitcoin::Txid, Vec<CitreaRawTxRow>> =
160            BTreeMap::new();
161        for row in committed_rows {
162            let commit_txid = row
163                .commit_outpoint
164                .ok_or_eyre("Expected commit_outpoint to be present")?
165                .txid;
166            committed_by_commit_txid
167                .entry(commit_txid)
168                .or_default()
169                .push(row);
170        }
171
172        for (commit_txid, rows) in committed_by_commit_txid {
173            let insertion_ids = rows
174                .iter()
175                .map(|row| row.insertion_id)
176                .collect::<BTreeSet<_>>();
177
178            let Some((in_mempool, seen_at_height)) =
179                self.db.get_activate_txid_status(None, commit_txid).await?
180            else {
181                continue;
182            };
183
184            if in_mempool || seen_at_height.is_some() {
185                continue;
186            }
187            // We don't want to mark commit tx as evicted if it is present in rpc (mempool or confirmed).
188            // To be sure if it shows as evicted in db check rpc for it too.
189
190            let rpc_present = match self.rpc.get_raw_transaction_info(&commit_txid, None).await {
191                Ok(info) => {
192                    if info.confirmations.unwrap_or(0) > 0 {
193                        true
194                    } else {
195                        match self.rpc.get_mempool_entry(&commit_txid).await {
196                            Ok(_) => true,
197                            Err(e) if is_mempool_not_found_error(&e) => false,
198                            Err(e) => {
199                                tracing::warn!(
200                                    ?insertion_ids,
201                                    %commit_txid,
202                                    error = %e,
203                                    "RPC mempool check failed; skipping eviction"
204                                );
205                                continue;
206                            }
207                        }
208                    }
209                }
210                Err(e) if is_not_found_error(&e) => false,
211                Err(e) => {
212                    tracing::warn!(
213                        ?insertion_ids,
214                        %commit_txid,
215                        error = %e,
216                        "RPC tx lookup failed; skipping eviction"
217                    );
218                    continue;
219                }
220            };
221
222            if rpc_present {
223                tracing::debug!(
224                    ?insertion_ids,
225                    %commit_txid,
226                    "Commit tx present according to RPC; skipping eviction"
227                );
228                continue;
229            }
230            tracing::warn!(
231                ?insertion_ids,
232                %commit_txid,
233                "Commit tx evicted; clearing commit_outpoint and deleting reveal RBF entries"
234            );
235
236            let mut dbtx = self.db.begin_transaction().await?;
237
238            let row_ids = rows.iter().map(|row| row.id).collect::<Vec<_>>();
239            let try_to_send_ids = rows
240                .iter()
241                .filter_map(|row| row.try_to_send_id)
242                .map(u32::try_from)
243                .collect::<Result<BTreeSet<_>, _>>()
244                .wrap_err("Failed to convert citrea try_to_send_id to u32")?;
245
246            self.db
247                .clear_citrea_commit_and_try_to_send_by_ids(Some(&mut dbtx), &row_ids)
248                .await?;
249
250            for try_to_send_id in try_to_send_ids {
251                self.db
252                    .delete_try_to_send_tx(Some(&mut dbtx), try_to_send_id)
253                    .await?;
254            }
255
256            self.db.commit_transaction(dbtx).await?;
257        }
258        Ok(())
259    }
260
261    /// Build and send aggregate reveal transactions once all chunk reveals are confirmed.
262    ///
263    /// Computes the aggregate body from confirmed chunk reveal txids/wtxids (ordered by chunk row id),
264    /// updates/reset aggregate rows on mismatch or invalid post-reorg ordering, then runs the same
265    /// commit/reveal flow. Marks aggregate rows as finalized only after the aggregate reveal and all
266    /// chunk reveals are finalized.
267    async fn send_aggregate_txs(&self, fee_rate: FeeRateKvb) -> Result<(), eyre::Report> {
268        let aggregate_rows = self.db.get_citrea_aggregate_rows_pending(None).await?;
269        if aggregate_rows.is_empty() {
270            return Ok(());
271        }
272
273        for aggregate_row in aggregate_rows {
274            let insertion_id = aggregate_row.insertion_id;
275            let chunk_rows = self
276                .db
277                .get_citrea_chunk_rows_by_insertion_id(None, insertion_id)
278                .await?;
279
280            if chunk_rows.is_empty() {
281                continue;
282            }
283
284            if chunk_rows.iter().any(|row| row.try_to_send_id.is_none()) {
285                continue;
286            }
287            let chunk_try_to_send_ids = chunk_rows
288                .iter()
289                .map(citrea_try_to_send_id)
290                .collect::<Result<Vec<_>, _>>()?;
291
292            let statuses = self
293                .db
294                .list_try_to_send_statuses_by_ids(None, &chunk_try_to_send_ids)
295                .await?;
296
297            let all_seen = chunk_try_to_send_ids
298                .iter()
299                .all(|id| statuses.get(id).and_then(|(seen, _)| *seen).is_some());
300
301            // Aggregate can only reference chunk reveals once every chunk is seen.
302            if !all_seen {
303                continue;
304            }
305
306            // Finalizing the aggregate row also requires all referenced chunks to be final.
307            let all_chunks_finalized = chunk_try_to_send_ids
308                .iter()
309                .all(|id| statuses.get(id).is_some_and(|(_, finalized)| *finalized));
310
311            let rbf_txids = self
312                .db
313                .list_rbf_txids_for_ids(None, &chunk_try_to_send_ids)
314                .await?;
315
316            let mut rbf_txids_by_id: HashMap<u32, Vec<bitcoin::Txid>> = HashMap::new();
317            for (id, txid) in rbf_txids {
318                rbf_txids_by_id.entry(id).or_default().push(txid);
319            }
320
321            let mut block_tx_position_cache = BlockTxPositionCache::new();
322            let Some(confirmed_chunk_reveals) = self
323                .collect_confirmed_chunk_reveals(
324                    &chunk_rows,
325                    &rbf_txids_by_id,
326                    &mut block_tx_position_cache,
327                )
328                .await?
329            else {
330                continue;
331            };
332            let ConfirmedChunkReveals {
333                reveal_txids,
334                reveal_wtxids,
335                confirmed_txs: confirmed_chunk_txs,
336            } = confirmed_chunk_reveals;
337
338            let aggregate = DataOnDa::Aggregate(reveal_txids, reveal_wtxids);
339            let aggregate_body: Vec<u8> =
340                borsh::to_vec(&aggregate).wrap_err("Failed to serialize aggregate body")?;
341
342            let body_matches = aggregate_row.body == aggregate_body;
343            let aggregate_try_to_send_id = optional_citrea_try_to_send_id(&aggregate_row)?;
344            let mut commit_outpoint = if body_matches {
345                aggregate_row.commit_outpoint
346            } else {
347                None
348            };
349            let mut try_to_send_id = if body_matches {
350                aggregate_try_to_send_id
351            } else {
352                None
353            };
354
355            // A changed aggregate body means chunk reveal txids/wtxids changed under us.
356            if !body_matches {
357                self.reset_citrea_aggregate_and_delete_try_to_send(
358                    aggregate_row.id,
359                    &aggregate_body,
360                    aggregate_try_to_send_id,
361                )
362                .await?;
363            } else if let Some(existing_try_to_send_id) = try_to_send_id {
364                let aggregate_rbf_txids = self
365                    .db
366                    .list_rbf_txids_for_id(None, existing_try_to_send_id)
367                    .await?;
368
369                let aggregate_confirmed_tx = self
370                    .select_confirmed_tx_info(&aggregate_rbf_txids, &mut block_tx_position_cache)
371                    .await?;
372
373                // Only validate ordering/finality once an aggregate reveal is on-chain.
374                if let Some(aggregate_confirmed_tx) = aggregate_confirmed_tx {
375                    let status = self
376                        .db
377                        .list_try_to_send_statuses_by_ids(None, &[existing_try_to_send_id])
378                        .await?;
379
380                    if let Some((aggregate_seen_at_height, is_aggregate_finalized)) =
381                        status.get(&existing_try_to_send_id)
382                    {
383                        // No seen height yet means confirmation sync has not caught this tx.
384                        if aggregate_seen_at_height.is_some() {
385                            let aggregate_after_chunks = confirmed_chunk_txs
386                                .iter()
387                                .all(|chunk_tx| aggregate_confirmed_tx.is_after(chunk_tx));
388
389                            // Aggregate reveal must be strictly after every chunk reveal it names.
390                            if !aggregate_after_chunks {
391                                tracing::warn!(
392                                    insertion_id,
393                                    aggregate_try_to_send_id = existing_try_to_send_id,
394                                    aggregate_txid = %aggregate_confirmed_tx.txid,
395                                    aggregate_height = aggregate_confirmed_tx.block_height,
396                                    aggregate_index = aggregate_confirmed_tx.tx_index,
397                                    "Aggregate reveal confirmed before at least one chunk reveal; resetting aggregate send state"
398                                );
399                                self.reset_citrea_aggregate_and_delete_try_to_send(
400                                    aggregate_row.id,
401                                    &aggregate_body,
402                                    Some(existing_try_to_send_id),
403                                )
404                                .await?;
405                                commit_outpoint = None;
406                                try_to_send_id = None;
407                            } else if all_chunks_finalized && *is_aggregate_finalized {
408                                // Safe to stop processing only when chunks and aggregate are final.
409                                self.db
410                                    .set_citrea_aggregate_finalized(None, aggregate_row.id)
411                                    .await?;
412                                continue;
413                            }
414                        }
415                    }
416                }
417            }
418
419            let signing_data =
420                self.create_reveal_script(TransactionKind::Aggregate, &aggregate_body);
421
422            // Missing/stale commit state is recreated after a body/order reset.
423            if commit_outpoint.is_none() {
424                let rows_with_scripts = vec![(aggregate_row.clone(), signing_data.clone())];
425                let Some(commit_txid) = self
426                    .create_commit_outpoints_for_rows(fee_rate, insertion_id, rows_with_scripts)
427                    .await?
428                else {
429                    continue;
430                };
431                commit_outpoint = Some(bitcoin::OutPoint {
432                    txid: commit_txid,
433                    vout: 0,
434                });
435            }
436
437            // Missing/stale reveal state is recreated after commit state is available.
438            if try_to_send_id.is_none() {
439                let commit_outpoint = commit_outpoint.expect("commit_outpoint must be set");
440                let _new_try_to_send_id = self
441                    .insert_reveal_try_to_send(aggregate_row.id, commit_outpoint, signing_data)
442                    .await?;
443            }
444        }
445
446        Ok(())
447    }
448
449    async fn create_commit_outpoints_for_rows(
450        &self,
451        fee_rate: FeeRateKvb,
452        insertion_id: i64,
453        rows_with_scripts: Vec<(CitreaRawTxRow, CitreaSigningData)>,
454    ) -> Result<Option<bitcoin::Txid>, eyre::Report> {
455        if rows_with_scripts.is_empty() {
456            return Ok(None);
457        }
458
459        let recipients: Vec<_> = rows_with_scripts
460            .iter()
461            .map(|(_row, signing_data)| signing_data.commit_address.clone())
462            .collect();
463
464        let unsigned_commit_tx = crate::citrea::build_commit_transaction(&recipients);
465        let raw_bytes = crate::serialize_tx_for_fund_raw(&unsigned_commit_tx);
466
467        let funded_hex = match self
468            .rpc
469            .fund_raw_transaction(
470                &raw_bytes,
471                Some(&FundRawTransactionOptions {
472                    add_inputs: Some(true),
473                    include_unsafe: Some(self.include_unsafe),
474                    change_address: None,
475                    change_position: Some(unsigned_commit_tx.output.len() as u32),
476                    change_type: None,
477                    include_watching: None,
478                    lock_unspents: None,
479                    fee_rate: Some(Amount::from_sat(fee_rate.to_sat_per_kvb())),
480                    subtract_fee_from_outputs: None,
481                    replaceable: Some(true),
482                    conf_target: None,
483                    estimate_mode: None,
484                }),
485                None,
486            )
487            .await
488        {
489            Ok(result) => result.hex,
490            Err(e) => {
491                tracing::error!(
492                    insertion_id,
493                    error = %e,
494                    "Failed to fund commit transaction, skipping group"
495                );
496                return Ok(None);
497            }
498        };
499
500        let signed_commit_tx = self
501            .rpc
502            .sign_raw_transaction_with_wallet(&funded_hex, None, None)
503            .await
504            .wrap_err("Failed to sign commit transaction")?
505            .transaction()
506            .wrap_err(
507                "Failed to convert result of sign_raw_transaction_with_wallet to btc transaction",
508            )?;
509
510        let commit_txid = signed_commit_tx.compute_txid();
511
512        if let Err(e) = self.rpc.send_raw_transaction(&signed_commit_tx).await {
513            tracing::warn!(
514                insertion_id,
515                commit_txid = %commit_txid,
516                error = %e,
517                "Failed to broadcast commit transaction, skipping group"
518            );
519            return Ok(None);
520        }
521
522        for (vout, (row, _signing_data)) in rows_with_scripts.into_iter().enumerate() {
523            let outpoint = bitcoin::OutPoint {
524                txid: commit_txid,
525                vout: vout as u32,
526            };
527
528            self.db
529                .set_citrea_commit_outpoint(None, row.id, outpoint)
530                .await?;
531        }
532
533        Ok(Some(commit_txid))
534    }
535
536    async fn insert_reveal_try_to_send(
537        &self,
538        row_id: i64,
539        commit_outpoint: bitcoin::OutPoint,
540        signing_data: CitreaSigningData,
541    ) -> Result<u32, eyre::Report> {
542        let reveal_tx =
543            crate::citrea::build_reveal_transaction(commit_outpoint.txid, commit_outpoint.vout);
544
545        let mut dbtx = self.db.begin_transaction().await?;
546        let try_to_send_id = self
547            .client
548            .insert_try_to_send(
549                &mut dbtx,
550                None,
551                &reveal_tx,
552                clementine_utils::FeePayingType::RbfWtxidGrind,
553                Some(RbfSigningInfo::new(
554                    0,
555                    clementine_utils::RbfSigningSpendPath::ScriptPath {
556                        control_block: signing_data.control_block.serialize(),
557                        script: signing_data.reveal_script.into_bytes(),
558                    },
559                    TapSighashType::Default,
560                )),
561                &[],
562                &[],
563                &[],
564                &[],
565            )
566            .await?;
567
568        self.db
569            .set_citrea_try_to_send_id(&mut dbtx, row_id, try_to_send_id as i32)
570            .await?;
571
572        self.db.commit_transaction(dbtx).await?;
573        Ok(try_to_send_id)
574    }
575
576    /// Resets an aggregate row to the supplied body and removes its stale reveal
577    /// tracking row in one DB transaction.
578    ///
579    /// `update_citrea_aggregate_body_and_reset` clears the aggregate row's
580    /// foreign-key reference before `delete_try_to_send_tx` removes the linked
581    /// tx-sender rows.
582    async fn reset_citrea_aggregate_and_delete_try_to_send(
583        &self,
584        aggregate_row_id: i64,
585        aggregate_body: &[u8],
586        try_to_send_id: Option<u32>,
587    ) -> Result<(), eyre::Report> {
588        let mut dbtx = self.db.begin_transaction().await?;
589
590        self.db
591            .update_citrea_aggregate_body_and_reset(
592                Some(&mut dbtx),
593                aggregate_row_id,
594                aggregate_body,
595            )
596            .await?;
597
598        if let Some(try_to_send_id) = try_to_send_id {
599            self.db
600                .delete_try_to_send_tx(Some(&mut dbtx), try_to_send_id)
601                .await?;
602        }
603
604        self.db.commit_transaction(dbtx).await?;
605        Ok(())
606    }
607
608    /// Returns confirmed chunk reveal information in chunk row order.
609    ///
610    /// `None` means the database has enough seen state to consider the chunks,
611    /// but the current Bitcoin RPC view does not have a confirmed RBF member for
612    /// at least one chunk. That can happen transiently around reorgs or before
613    /// confirmation sync catches up.
614    async fn collect_confirmed_chunk_reveals(
615        &self,
616        chunk_rows: &[CitreaRawTxRow],
617        rbf_txids_by_id: &HashMap<u32, Vec<bitcoin::Txid>>,
618        block_tx_position_cache: &mut BlockTxPositionCache,
619    ) -> Result<Option<ConfirmedChunkReveals>, eyre::Report> {
620        let mut reveal_txids = Vec::with_capacity(chunk_rows.len());
621        let mut reveal_wtxids = Vec::with_capacity(chunk_rows.len());
622        let mut confirmed_txs = Vec::with_capacity(chunk_rows.len());
623
624        for row in chunk_rows {
625            let try_to_send_id = citrea_try_to_send_id(row)?;
626            let txids = rbf_txids_by_id.get(&try_to_send_id).map(Vec::as_slice);
627            let Some(confirmed_tx) = self
628                .select_confirmed_tx_info(txids.unwrap_or_default(), block_tx_position_cache)
629                .await?
630            else {
631                return Ok(None);
632            };
633
634            reveal_txids.push(confirmed_tx.txid.to_byte_array());
635            reveal_wtxids.push(confirmed_tx.wtxid.to_byte_array());
636            confirmed_txs.push(confirmed_tx);
637        }
638
639        Ok(Some(ConfirmedChunkReveals {
640            reveal_txids,
641            reveal_wtxids,
642            confirmed_txs,
643        }))
644    }
645
646    /// Selects the newest confirmed member from an RBF txid history.
647    ///
648    /// The input is expected in newest-first insertion order, matching the
649    /// tx-sender RBF query helpers. Confirmed transactions include their wtxid
650    /// and block position so aggregate bodies can reference chunk reveals and
651    /// aggregate ordering can be validated after reorgs.
652    async fn select_confirmed_tx_info(
653        &self,
654        txids: &[bitcoin::Txid],
655        block_tx_position_cache: &mut BlockTxPositionCache,
656    ) -> Result<Option<ConfirmedTxInfo>, eyre::Report> {
657        for txid in txids {
658            let tx_info = match self.rpc.get_raw_transaction_info(txid, None).await {
659                Ok(info) => info,
660                Err(e) if is_not_found_error(&e) => continue,
661                Err(e) => return Err(eyre::eyre!(e)),
662            };
663
664            if tx_info
665                .confirmations
666                .is_none_or(|confirmations| confirmations == 0)
667            {
668                continue;
669            }
670
671            let blockhash = tx_info.blockhash.ok_or_eyre(format!(
672                "Confirmed transaction {txid} missing blockhash in RPC response"
673            ))?;
674
675            match block_tx_position_cache.get(&blockhash) {
676                Some(_) => {}
677                None => {
678                    let block_info = self
679                        .rpc
680                        .get_block_info(&blockhash)
681                        .await
682                        .wrap_err("Failed to fetch confirmed transaction block info")?;
683                    let block_height = u32::try_from(block_info.height)
684                        .wrap_err("Failed to convert confirmed transaction block height to u32")?;
685                    block_tx_position_cache.insert(blockhash, (block_height, block_info.tx));
686                }
687            }
688
689            let (block_height, block_txids) = block_tx_position_cache
690                .get(&blockhash)
691                .expect("block info was inserted above");
692            let tx_index = block_txids
693                .iter()
694                .position(|block_txid| block_txid == txid)
695                .ok_or_eyre(format!(
696                    "Confirmed transaction {txid} missing from block {blockhash}"
697                ))?;
698
699            return Ok(Some(ConfirmedTxInfo {
700                txid: *txid,
701                wtxid: tx_info.hash,
702                block_height: *block_height,
703                tx_index,
704            }));
705        }
706
707        Ok(None)
708    }
709}