1use crate::citrea::data_serialization::DataOnDa;
2use crate::citrea::reveal_scripts::CitreaSigningData;
3use crate::citrea::TransactionKind;
4use crate::db::citrea::CitreaRawTxRow;
5use crate::rpc_errors::{is_mempool_not_found_error, is_not_found_error};
6use crate::TxSender;
7use bitcoin::hashes::Hash;
8use bitcoin::{Amount, TapSighashType};
9use bitcoincore_rpc::json::FundRawTransactionOptions;
10use bitcoincore_rpc::RpcApi;
11use clementine_primitives::FeeRateKvb;
12use clementine_utils::RbfSigningInfo;
13use eyre::{Context, OptionExt};
14use std::collections::{BTreeMap, BTreeSet, HashMap};
15
16type BlockTxPositionCache = HashMap<bitcoin::BlockHash, (u32, Vec<bitcoin::Txid>)>;
17
18#[derive(Debug, Clone, Copy)]
24struct ConfirmedTxInfo {
25 txid: bitcoin::Txid,
26 wtxid: bitcoin::Wtxid,
27 block_height: u32,
28 tx_index: usize,
29}
30
31impl ConfirmedTxInfo {
32 fn is_after(&self, other: &Self) -> bool {
34 self.block_height > other.block_height
35 || (self.block_height == other.block_height && self.tx_index > other.tx_index)
36 }
37}
38
39struct ConfirmedChunkReveals {
41 reveal_txids: Vec<[u8; 32]>,
42 reveal_wtxids: Vec<[u8; 32]>,
43 confirmed_txs: Vec<ConfirmedTxInfo>,
44}
45
46fn optional_citrea_try_to_send_id(row: &CitreaRawTxRow) -> Result<Option<u32>, eyre::Report> {
48 row.try_to_send_id
49 .map(u32::try_from)
50 .transpose()
51 .wrap_err("Failed to convert citrea try_to_send_id to u32")
52}
53
54fn citrea_try_to_send_id(row: &CitreaRawTxRow) -> Result<u32, eyre::Report> {
56 optional_citrea_try_to_send_id(row)?.ok_or_eyre("Expected citrea try_to_send_id to be present")
57}
58
59impl TxSender {
60 pub async fn sync_citrea_txs(&self, fee_rate: FeeRateKvb) -> Result<(), eyre::Report> {
63 self.check_evicted_commit_txs().await?;
65
66 let citrea_rows = self
69 .db
70 .get_citrea_txs_with_null_commit_outpoint(None)
71 .await?;
72
73 let mut by_insertion_id: BTreeMap<i64, Vec<CitreaRawTxRow>> = BTreeMap::new();
75
76 for row in citrea_rows {
77 by_insertion_id
78 .entry(row.insertion_id)
79 .or_default()
80 .push(row);
81 }
82
83 if !by_insertion_id.is_empty() {
84 tracing::info!(
85 "Found {} pending non-aggregate citrea rows across {} insertion_id groups",
86 by_insertion_id.values().map(|v| v.len()).sum::<usize>(),
87 by_insertion_id.len()
88 );
89 }
90
91 for (insertion_id, rows) in by_insertion_id {
93 tracing::debug!(insertion_id, group_len = rows.len(), "Pending citrea group");
94
95 let mut rows_with_scripts = Vec::with_capacity(rows.len());
97
98 for row in rows {
99 let signing_data = self.create_reveal_script(row.transaction_kind, &row.body);
100 rows_with_scripts.push((row, signing_data));
101 }
102
103 let _ = self
104 .create_commit_outpoints_for_rows(fee_rate, insertion_id, rows_with_scripts)
105 .await?;
106 }
107
108 let reveal_rows = self
112 .db
113 .get_citrea_txs_with_commit_outpoint_no_try_to_send(None)
114 .await?;
115
116 if !reveal_rows.is_empty() {
117 tracing::info!(
118 "Found {} citrea rows with commit_outpoint but no try_to_send_id",
119 reveal_rows.len()
120 );
121 for row in reveal_rows {
122 let commit_outpoint = row
123 .commit_outpoint
124 .ok_or_eyre("Expected commit_outpoint to be present")?;
125 let signing_data = self.create_reveal_script(row.transaction_kind, &row.body);
126
127 let try_to_send_id = self
128 .insert_reveal_try_to_send(row.id, commit_outpoint, signing_data)
129 .await?;
130
131 tracing::debug!(
132 "Created reveal tx for citrea row id={}, try_to_send_id={}, commit_outpoint={}:{}",
133 row.id,
134 try_to_send_id,
135 commit_outpoint.txid,
136 commit_outpoint.vout
137 );
138 }
139 }
140
141 self.send_aggregate_txs(fee_rate).await?;
143
144 Ok(())
145 }
146
147 async fn check_evicted_commit_txs(&self) -> Result<(), eyre::Report> {
154 let committed_rows = self
155 .db
156 .get_citrea_txs_with_commit_outpoint_unseen_try_to_send(None)
157 .await?;
158
159 let mut committed_by_commit_txid: BTreeMap<bitcoin::Txid, Vec<CitreaRawTxRow>> =
160 BTreeMap::new();
161 for row in committed_rows {
162 let commit_txid = row
163 .commit_outpoint
164 .ok_or_eyre("Expected commit_outpoint to be present")?
165 .txid;
166 committed_by_commit_txid
167 .entry(commit_txid)
168 .or_default()
169 .push(row);
170 }
171
172 for (commit_txid, rows) in committed_by_commit_txid {
173 let insertion_ids = rows
174 .iter()
175 .map(|row| row.insertion_id)
176 .collect::<BTreeSet<_>>();
177
178 let Some((in_mempool, seen_at_height)) =
179 self.db.get_activate_txid_status(None, commit_txid).await?
180 else {
181 continue;
182 };
183
184 if in_mempool || seen_at_height.is_some() {
185 continue;
186 }
187 let rpc_present = match self.rpc.get_raw_transaction_info(&commit_txid, None).await {
191 Ok(info) => {
192 if info.confirmations.unwrap_or(0) > 0 {
193 true
194 } else {
195 match self.rpc.get_mempool_entry(&commit_txid).await {
196 Ok(_) => true,
197 Err(e) if is_mempool_not_found_error(&e) => false,
198 Err(e) => {
199 tracing::warn!(
200 ?insertion_ids,
201 %commit_txid,
202 error = %e,
203 "RPC mempool check failed; skipping eviction"
204 );
205 continue;
206 }
207 }
208 }
209 }
210 Err(e) if is_not_found_error(&e) => false,
211 Err(e) => {
212 tracing::warn!(
213 ?insertion_ids,
214 %commit_txid,
215 error = %e,
216 "RPC tx lookup failed; skipping eviction"
217 );
218 continue;
219 }
220 };
221
222 if rpc_present {
223 tracing::debug!(
224 ?insertion_ids,
225 %commit_txid,
226 "Commit tx present according to RPC; skipping eviction"
227 );
228 continue;
229 }
230 tracing::warn!(
231 ?insertion_ids,
232 %commit_txid,
233 "Commit tx evicted; clearing commit_outpoint and deleting reveal RBF entries"
234 );
235
236 let mut dbtx = self.db.begin_transaction().await?;
237
238 let row_ids = rows.iter().map(|row| row.id).collect::<Vec<_>>();
239 let try_to_send_ids = rows
240 .iter()
241 .filter_map(|row| row.try_to_send_id)
242 .map(u32::try_from)
243 .collect::<Result<BTreeSet<_>, _>>()
244 .wrap_err("Failed to convert citrea try_to_send_id to u32")?;
245
246 self.db
247 .clear_citrea_commit_and_try_to_send_by_ids(Some(&mut dbtx), &row_ids)
248 .await?;
249
250 for try_to_send_id in try_to_send_ids {
251 self.db
252 .delete_try_to_send_tx(Some(&mut dbtx), try_to_send_id)
253 .await?;
254 }
255
256 self.db.commit_transaction(dbtx).await?;
257 }
258 Ok(())
259 }
260
261 async fn send_aggregate_txs(&self, fee_rate: FeeRateKvb) -> Result<(), eyre::Report> {
268 let aggregate_rows = self.db.get_citrea_aggregate_rows_pending(None).await?;
269 if aggregate_rows.is_empty() {
270 return Ok(());
271 }
272
273 for aggregate_row in aggregate_rows {
274 let insertion_id = aggregate_row.insertion_id;
275 let chunk_rows = self
276 .db
277 .get_citrea_chunk_rows_by_insertion_id(None, insertion_id)
278 .await?;
279
280 if chunk_rows.is_empty() {
281 continue;
282 }
283
284 if chunk_rows.iter().any(|row| row.try_to_send_id.is_none()) {
285 continue;
286 }
287 let chunk_try_to_send_ids = chunk_rows
288 .iter()
289 .map(citrea_try_to_send_id)
290 .collect::<Result<Vec<_>, _>>()?;
291
292 let statuses = self
293 .db
294 .list_try_to_send_statuses_by_ids(None, &chunk_try_to_send_ids)
295 .await?;
296
297 let all_seen = chunk_try_to_send_ids
298 .iter()
299 .all(|id| statuses.get(id).and_then(|(seen, _)| *seen).is_some());
300
301 if !all_seen {
303 continue;
304 }
305
306 let all_chunks_finalized = chunk_try_to_send_ids
308 .iter()
309 .all(|id| statuses.get(id).is_some_and(|(_, finalized)| *finalized));
310
311 let rbf_txids = self
312 .db
313 .list_rbf_txids_for_ids(None, &chunk_try_to_send_ids)
314 .await?;
315
316 let mut rbf_txids_by_id: HashMap<u32, Vec<bitcoin::Txid>> = HashMap::new();
317 for (id, txid) in rbf_txids {
318 rbf_txids_by_id.entry(id).or_default().push(txid);
319 }
320
321 let mut block_tx_position_cache = BlockTxPositionCache::new();
322 let Some(confirmed_chunk_reveals) = self
323 .collect_confirmed_chunk_reveals(
324 &chunk_rows,
325 &rbf_txids_by_id,
326 &mut block_tx_position_cache,
327 )
328 .await?
329 else {
330 continue;
331 };
332 let ConfirmedChunkReveals {
333 reveal_txids,
334 reveal_wtxids,
335 confirmed_txs: confirmed_chunk_txs,
336 } = confirmed_chunk_reveals;
337
338 let aggregate = DataOnDa::Aggregate(reveal_txids, reveal_wtxids);
339 let aggregate_body: Vec<u8> =
340 borsh::to_vec(&aggregate).wrap_err("Failed to serialize aggregate body")?;
341
342 let body_matches = aggregate_row.body == aggregate_body;
343 let aggregate_try_to_send_id = optional_citrea_try_to_send_id(&aggregate_row)?;
344 let mut commit_outpoint = if body_matches {
345 aggregate_row.commit_outpoint
346 } else {
347 None
348 };
349 let mut try_to_send_id = if body_matches {
350 aggregate_try_to_send_id
351 } else {
352 None
353 };
354
355 if !body_matches {
357 self.reset_citrea_aggregate_and_delete_try_to_send(
358 aggregate_row.id,
359 &aggregate_body,
360 aggregate_try_to_send_id,
361 )
362 .await?;
363 } else if let Some(existing_try_to_send_id) = try_to_send_id {
364 let aggregate_rbf_txids = self
365 .db
366 .list_rbf_txids_for_id(None, existing_try_to_send_id)
367 .await?;
368
369 let aggregate_confirmed_tx = self
370 .select_confirmed_tx_info(&aggregate_rbf_txids, &mut block_tx_position_cache)
371 .await?;
372
373 if let Some(aggregate_confirmed_tx) = aggregate_confirmed_tx {
375 let status = self
376 .db
377 .list_try_to_send_statuses_by_ids(None, &[existing_try_to_send_id])
378 .await?;
379
380 if let Some((aggregate_seen_at_height, is_aggregate_finalized)) =
381 status.get(&existing_try_to_send_id)
382 {
383 if aggregate_seen_at_height.is_some() {
385 let aggregate_after_chunks = confirmed_chunk_txs
386 .iter()
387 .all(|chunk_tx| aggregate_confirmed_tx.is_after(chunk_tx));
388
389 if !aggregate_after_chunks {
391 tracing::warn!(
392 insertion_id,
393 aggregate_try_to_send_id = existing_try_to_send_id,
394 aggregate_txid = %aggregate_confirmed_tx.txid,
395 aggregate_height = aggregate_confirmed_tx.block_height,
396 aggregate_index = aggregate_confirmed_tx.tx_index,
397 "Aggregate reveal confirmed before at least one chunk reveal; resetting aggregate send state"
398 );
399 self.reset_citrea_aggregate_and_delete_try_to_send(
400 aggregate_row.id,
401 &aggregate_body,
402 Some(existing_try_to_send_id),
403 )
404 .await?;
405 commit_outpoint = None;
406 try_to_send_id = None;
407 } else if all_chunks_finalized && *is_aggregate_finalized {
408 self.db
410 .set_citrea_aggregate_finalized(None, aggregate_row.id)
411 .await?;
412 continue;
413 }
414 }
415 }
416 }
417 }
418
419 let signing_data =
420 self.create_reveal_script(TransactionKind::Aggregate, &aggregate_body);
421
422 if commit_outpoint.is_none() {
424 let rows_with_scripts = vec![(aggregate_row.clone(), signing_data.clone())];
425 let Some(commit_txid) = self
426 .create_commit_outpoints_for_rows(fee_rate, insertion_id, rows_with_scripts)
427 .await?
428 else {
429 continue;
430 };
431 commit_outpoint = Some(bitcoin::OutPoint {
432 txid: commit_txid,
433 vout: 0,
434 });
435 }
436
437 if try_to_send_id.is_none() {
439 let commit_outpoint = commit_outpoint.expect("commit_outpoint must be set");
440 let _new_try_to_send_id = self
441 .insert_reveal_try_to_send(aggregate_row.id, commit_outpoint, signing_data)
442 .await?;
443 }
444 }
445
446 Ok(())
447 }
448
449 async fn create_commit_outpoints_for_rows(
450 &self,
451 fee_rate: FeeRateKvb,
452 insertion_id: i64,
453 rows_with_scripts: Vec<(CitreaRawTxRow, CitreaSigningData)>,
454 ) -> Result<Option<bitcoin::Txid>, eyre::Report> {
455 if rows_with_scripts.is_empty() {
456 return Ok(None);
457 }
458
459 let recipients: Vec<_> = rows_with_scripts
460 .iter()
461 .map(|(_row, signing_data)| signing_data.commit_address.clone())
462 .collect();
463
464 let unsigned_commit_tx = crate::citrea::build_commit_transaction(&recipients);
465 let raw_bytes = crate::serialize_tx_for_fund_raw(&unsigned_commit_tx);
466
467 let funded_hex = match self
468 .rpc
469 .fund_raw_transaction(
470 &raw_bytes,
471 Some(&FundRawTransactionOptions {
472 add_inputs: Some(true),
473 include_unsafe: Some(self.include_unsafe),
474 change_address: None,
475 change_position: Some(unsigned_commit_tx.output.len() as u32),
476 change_type: None,
477 include_watching: None,
478 lock_unspents: None,
479 fee_rate: Some(Amount::from_sat(fee_rate.to_sat_per_kvb())),
480 subtract_fee_from_outputs: None,
481 replaceable: Some(true),
482 conf_target: None,
483 estimate_mode: None,
484 }),
485 None,
486 )
487 .await
488 {
489 Ok(result) => result.hex,
490 Err(e) => {
491 tracing::error!(
492 insertion_id,
493 error = %e,
494 "Failed to fund commit transaction, skipping group"
495 );
496 return Ok(None);
497 }
498 };
499
500 let signed_commit_tx = self
501 .rpc
502 .sign_raw_transaction_with_wallet(&funded_hex, None, None)
503 .await
504 .wrap_err("Failed to sign commit transaction")?
505 .transaction()
506 .wrap_err(
507 "Failed to convert result of sign_raw_transaction_with_wallet to btc transaction",
508 )?;
509
510 let commit_txid = signed_commit_tx.compute_txid();
511
512 if let Err(e) = self.rpc.send_raw_transaction(&signed_commit_tx).await {
513 tracing::warn!(
514 insertion_id,
515 commit_txid = %commit_txid,
516 error = %e,
517 "Failed to broadcast commit transaction, skipping group"
518 );
519 return Ok(None);
520 }
521
522 for (vout, (row, _signing_data)) in rows_with_scripts.into_iter().enumerate() {
523 let outpoint = bitcoin::OutPoint {
524 txid: commit_txid,
525 vout: vout as u32,
526 };
527
528 self.db
529 .set_citrea_commit_outpoint(None, row.id, outpoint)
530 .await?;
531 }
532
533 Ok(Some(commit_txid))
534 }
535
536 async fn insert_reveal_try_to_send(
537 &self,
538 row_id: i64,
539 commit_outpoint: bitcoin::OutPoint,
540 signing_data: CitreaSigningData,
541 ) -> Result<u32, eyre::Report> {
542 let reveal_tx =
543 crate::citrea::build_reveal_transaction(commit_outpoint.txid, commit_outpoint.vout);
544
545 let mut dbtx = self.db.begin_transaction().await?;
546 let try_to_send_id = self
547 .client
548 .insert_try_to_send(
549 &mut dbtx,
550 None,
551 &reveal_tx,
552 clementine_utils::FeePayingType::RbfWtxidGrind,
553 Some(RbfSigningInfo::new(
554 0,
555 clementine_utils::RbfSigningSpendPath::ScriptPath {
556 control_block: signing_data.control_block.serialize(),
557 script: signing_data.reveal_script.into_bytes(),
558 },
559 TapSighashType::Default,
560 )),
561 &[],
562 &[],
563 &[],
564 &[],
565 )
566 .await?;
567
568 self.db
569 .set_citrea_try_to_send_id(&mut dbtx, row_id, try_to_send_id as i32)
570 .await?;
571
572 self.db.commit_transaction(dbtx).await?;
573 Ok(try_to_send_id)
574 }
575
576 async fn reset_citrea_aggregate_and_delete_try_to_send(
583 &self,
584 aggregate_row_id: i64,
585 aggregate_body: &[u8],
586 try_to_send_id: Option<u32>,
587 ) -> Result<(), eyre::Report> {
588 let mut dbtx = self.db.begin_transaction().await?;
589
590 self.db
591 .update_citrea_aggregate_body_and_reset(
592 Some(&mut dbtx),
593 aggregate_row_id,
594 aggregate_body,
595 )
596 .await?;
597
598 if let Some(try_to_send_id) = try_to_send_id {
599 self.db
600 .delete_try_to_send_tx(Some(&mut dbtx), try_to_send_id)
601 .await?;
602 }
603
604 self.db.commit_transaction(dbtx).await?;
605 Ok(())
606 }
607
608 async fn collect_confirmed_chunk_reveals(
615 &self,
616 chunk_rows: &[CitreaRawTxRow],
617 rbf_txids_by_id: &HashMap<u32, Vec<bitcoin::Txid>>,
618 block_tx_position_cache: &mut BlockTxPositionCache,
619 ) -> Result<Option<ConfirmedChunkReveals>, eyre::Report> {
620 let mut reveal_txids = Vec::with_capacity(chunk_rows.len());
621 let mut reveal_wtxids = Vec::with_capacity(chunk_rows.len());
622 let mut confirmed_txs = Vec::with_capacity(chunk_rows.len());
623
624 for row in chunk_rows {
625 let try_to_send_id = citrea_try_to_send_id(row)?;
626 let txids = rbf_txids_by_id.get(&try_to_send_id).map(Vec::as_slice);
627 let Some(confirmed_tx) = self
628 .select_confirmed_tx_info(txids.unwrap_or_default(), block_tx_position_cache)
629 .await?
630 else {
631 return Ok(None);
632 };
633
634 reveal_txids.push(confirmed_tx.txid.to_byte_array());
635 reveal_wtxids.push(confirmed_tx.wtxid.to_byte_array());
636 confirmed_txs.push(confirmed_tx);
637 }
638
639 Ok(Some(ConfirmedChunkReveals {
640 reveal_txids,
641 reveal_wtxids,
642 confirmed_txs,
643 }))
644 }
645
646 async fn select_confirmed_tx_info(
653 &self,
654 txids: &[bitcoin::Txid],
655 block_tx_position_cache: &mut BlockTxPositionCache,
656 ) -> Result<Option<ConfirmedTxInfo>, eyre::Report> {
657 for txid in txids {
658 let tx_info = match self.rpc.get_raw_transaction_info(txid, None).await {
659 Ok(info) => info,
660 Err(e) if is_not_found_error(&e) => continue,
661 Err(e) => return Err(eyre::eyre!(e)),
662 };
663
664 if tx_info
665 .confirmations
666 .is_none_or(|confirmations| confirmations == 0)
667 {
668 continue;
669 }
670
671 let blockhash = tx_info.blockhash.ok_or_eyre(format!(
672 "Confirmed transaction {txid} missing blockhash in RPC response"
673 ))?;
674
675 match block_tx_position_cache.get(&blockhash) {
676 Some(_) => {}
677 None => {
678 let block_info = self
679 .rpc
680 .get_block_info(&blockhash)
681 .await
682 .wrap_err("Failed to fetch confirmed transaction block info")?;
683 let block_height = u32::try_from(block_info.height)
684 .wrap_err("Failed to convert confirmed transaction block height to u32")?;
685 block_tx_position_cache.insert(blockhash, (block_height, block_info.tx));
686 }
687 }
688
689 let (block_height, block_txids) = block_tx_position_cache
690 .get(&blockhash)
691 .expect("block info was inserted above");
692 let tx_index = block_txids
693 .iter()
694 .position(|block_txid| block_txid == txid)
695 .ok_or_eyre(format!(
696 "Confirmed transaction {txid} missing from block {blockhash}"
697 ))?;
698
699 return Ok(Some(ConfirmedTxInfo {
700 txid: *txid,
701 wtxid: tx_info.hash,
702 block_height: *block_height,
703 tx_index,
704 }));
705 }
706
707 Ok(None)
708 }
709}