clementine_tx_sender/
client.rs

1//! # Transaction Sender Client
2//!
3//! This module is provides a client which is responsible for inserting
4//! transactions into the sending queue.
5
6use crate::{ActivatedWithOutpoint, ActivatedWithTxid};
7use bitcoin::{OutPoint, Transaction, Txid};
8use clementine_errors::BridgeError;
9use clementine_utils::{FeePayingType, RbfSigningInfo, TxMetadata};
10use eyre::eyre;
11use std::collections::BTreeMap;
12
13#[cfg(feature = "citrea")]
14use crate::citrea::CitreaTxRequest;
15#[cfg(feature = "citrea")]
16use crate::citrea::TransactionKind;
17
18#[derive(Debug, Clone)]
19pub struct TxSenderClient {
20    pub db: crate::TxSenderDb,
21}
22
23impl TxSenderClient {
24    pub fn new(db: crate::TxSenderDb) -> Self {
25        Self { db }
26    }
27
28    /// Saves a transaction to the database queue for sending/fee bumping.
29    ///
30    /// This function determines the initial parameters for a transaction send attempt,
31    /// including its [`FeePayingType`], associated metadata, and dependencies (cancellations/activations).
32    /// It then persists this information in the database via [`Database::save_tx`] and related functions.
33    /// The actual sending logic (CPFP/RBF) is handled later by the transaction sender's task loop.
34    ///
35    /// # Default Activation and Cancellation Conditions
36    ///
37    /// By default, this function automatically adds cancellation conditions for all outpoints
38    /// spent by the `signed_tx` itself. If `signed_tx` confirms, these input outpoints
39    /// are marked as spent/cancelled in the database.
40    ///
41    /// There are no default activation conditions added implicitly; all activation prerequisites
42    /// must be explicitly provided via the `activate_txids` and `activate_outpoints` arguments.
43    ///
44    /// # Arguments
45    /// * `dbtx` - An active database transaction.
46    /// * `tx_metadata` - Optional metadata about the transaction's purpose.
47    /// * `signed_tx` - The transaction to be potentially sent.
48    /// * `fee_paying_type` - Whether to use CPFP or RBF for fee management.
49    /// * `cancel_outpoints` - Outpoints that should be marked invalid if this tx confirms (in addition to the tx's own inputs).
50    /// * `cancel_txids` - Txids that should be marked invalid if this tx confirms.
51    /// * `activate_txids` - Txids that are prerequisites for this tx, potentially with a relative timelock.
52    /// * `activate_outpoints` - Outpoints that are prerequisites for this tx, potentially with a relative timelock.
53    ///
54    /// # Returns
55    ///
56    /// - [`u32`]: The database ID (`try_to_send_id`) assigned to this send attempt.
57    #[tracing::instrument(err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE), skip_all, fields(?tx_metadata))]
58    #[allow(clippy::too_many_arguments)]
59    pub async fn insert_try_to_send(
60        &self,
61        dbtx: &mut crate::TxSenderTransaction,
62        tx_metadata: Option<TxMetadata>,
63        signed_tx: &Transaction,
64        fee_paying_type: FeePayingType,
65        rbf_signing_info: Option<RbfSigningInfo>,
66        cancel_outpoints: &[OutPoint],
67        cancel_txids: &[Txid],
68        activate_txids: &[ActivatedWithTxid],
69        activate_outpoints: &[ActivatedWithOutpoint],
70    ) -> Result<u32, BridgeError> {
71        let txid = signed_tx.compute_txid();
72
73        // do not add duplicate transactions to the txsender
74        let tx_exists = self
75            .db
76            .check_if_tx_exists_on_txsender(Some(dbtx), txid)
77            .await?;
78        if let Some(try_to_send_id) = tx_exists {
79            return Ok(try_to_send_id);
80        }
81
82        tracing::info!(
83            "Added tx {} with txid {} to the queue",
84            tx_metadata
85                .as_ref()
86                .map(|data| format!("{:?}", data.tx_type))
87                .unwrap_or("N/A".to_string()),
88            txid
89        );
90
91        let try_to_send_id = self
92            .db
93            .save_tx(
94                dbtx,
95                tx_metadata,
96                signed_tx,
97                fee_paying_type,
98                txid,
99                rbf_signing_info,
100            )
101            .await?;
102
103        // only log the raw tx in tests so that logs do not contain sensitive information
104        #[cfg(test)]
105        tracing::debug!(target: "ci", "Saved tx to database with try_to_send_id: {try_to_send_id}, metadata: {tx_metadata:?}, raw tx: {}", hex::encode(bitcoin::consensus::serialize(signed_tx)));
106
107        for input_outpoint in signed_tx.input.iter().map(|input| input.previous_output) {
108            self.db
109                .save_cancelled_outpoint(dbtx, try_to_send_id, input_outpoint)
110                .await?;
111        }
112
113        for outpoint in cancel_outpoints {
114            self.db
115                .save_cancelled_outpoint(dbtx, try_to_send_id, *outpoint)
116                .await?;
117        }
118
119        for txid in cancel_txids {
120            self.db
121                .save_cancelled_txid(dbtx, try_to_send_id, *txid)
122                .await?;
123        }
124
125        let mut max_timelock_of_activated_txids = BTreeMap::new();
126
127        for activated_txid in activate_txids {
128            let timelock = max_timelock_of_activated_txids
129                .entry(activated_txid.txid)
130                .or_insert(activated_txid.relative_block_height);
131            if *timelock < activated_txid.relative_block_height {
132                *timelock = activated_txid.relative_block_height;
133            }
134        }
135
136        for input in signed_tx.input.iter() {
137            let relative_block_height = if input.sequence.is_relative_lock_time() {
138                let relative_locktime = input
139                    .sequence
140                    .to_relative_lock_time()
141                    .expect("Invalid relative locktime");
142                match relative_locktime {
143                    bitcoin::relative::LockTime::Blocks(height) => height.value() as u32,
144                    _ => {
145                        return Err(BridgeError::Eyre(eyre!("Invalid relative locktime")));
146                    }
147                }
148            } else {
149                0
150            };
151            let timelock = max_timelock_of_activated_txids
152                .entry(input.previous_output.txid)
153                .or_insert(relative_block_height);
154            if *timelock < relative_block_height {
155                *timelock = relative_block_height;
156            }
157        }
158
159        for (txid, timelock) in max_timelock_of_activated_txids {
160            self.db
161                .save_activated_txid(
162                    dbtx,
163                    try_to_send_id,
164                    &ActivatedWithTxid {
165                        txid,
166                        relative_block_height: timelock,
167                    },
168                )
169                .await?;
170        }
171
172        for activated_outpoint in activate_outpoints {
173            self.db
174                .save_activated_outpoint(dbtx, try_to_send_id, activated_outpoint)
175                .await?;
176        }
177
178        Ok(try_to_send_id)
179    }
180
181    #[cfg(feature = "citrea")]
182    pub async fn send_citrea_tx(&self, request: CitreaTxRequest) -> Result<i64, eyre::Report> {
183        use crate::citrea::data_serialization::DataOnDa;
184        use crate::citrea::MAX_CHUNK_SIZE;
185
186        let mut dbtx = self.db.begin_transaction().await?;
187
188        let insertion_id = match request {
189            CitreaTxRequest::BatchProof { bytes, chunk_size } => {
190                // Hash the original proof bytes so the same proof dedupes even if callers
191                // retry it with a different chunk_size or as a non-chunked Complete body.
192                let full_body_hash = crate::citrea::calculate_sha256(&bytes);
193                let mut chunk_size = chunk_size.unwrap_or(MAX_CHUNK_SIZE);
194                if chunk_size == 0 {
195                    chunk_size = MAX_CHUNK_SIZE;
196                }
197                if chunk_size > MAX_CHUNK_SIZE {
198                    chunk_size = MAX_CHUNK_SIZE;
199                }
200                let chunk_size = chunk_size as usize;
201
202                if bytes.len() <= chunk_size {
203                    let data = DataOnDa::Complete(bytes);
204                    let blob = borsh::to_vec(&data).expect("zk::Proof serialize must not fail");
205                    self.db
206                        .insert_citrea_raw_tx_single_with_hash(
207                            &mut dbtx,
208                            TransactionKind::Complete,
209                            &blob,
210                            &full_body_hash,
211                        )
212                        .await?
213                } else {
214                    let chunks: Vec<Vec<u8>> = bytes
215                        .chunks(chunk_size)
216                        .map(|chunk| {
217                            borsh::to_vec(&DataOnDa::Chunk(chunk.to_vec()))
218                                .expect("zk::Proof serialize must not fail")
219                        })
220                        .collect();
221                    self.db
222                        .insert_citrea_raw_tx_chunks(&mut dbtx, &chunks, &full_body_hash)
223                        .await?
224                }
225            }
226            CitreaTxRequest::BatchProofMethodId(body) => {
227                if body.len() as u32 > MAX_CHUNK_SIZE {
228                    return Err(eyre!(
229                        "Citrea BatchProofMethodId DA payload body too large; max {} bytes",
230                        MAX_CHUNK_SIZE,
231                    ));
232                }
233                self.db
234                    .insert_citrea_raw_tx_single(
235                        &mut dbtx,
236                        TransactionKind::BatchProofMethodId,
237                        &body,
238                    )
239                    .await?
240            }
241            CitreaTxRequest::SequencerCommitment(body) => {
242                if body.len() as u32 > MAX_CHUNK_SIZE {
243                    return Err(eyre!(
244                        "Citrea SequencerCommitment DA payload body too large; max {} bytes",
245                        MAX_CHUNK_SIZE,
246                    ));
247                }
248                self.db
249                    .insert_citrea_raw_tx_single(
250                        &mut dbtx,
251                        TransactionKind::SequencerCommitment,
252                        &body,
253                    )
254                    .await?
255            }
256        };
257
258        self.db.commit_transaction(dbtx).await?;
259        Ok(insertion_id)
260    }
261}
262
263#[cfg(test)]
264mod tests {
265    use super::*;
266    use sqlx::Row;
267
268    #[cfg(feature = "citrea")]
269    #[tokio::test]
270    async fn test_send_citrea_tx_batch_proof() {
271        use crate::citrea::data_serialization::DataOnDa;
272        use crate::citrea::CitreaTxRequest;
273        use crate::test_utils::create_test_environment;
274
275        let db = create_test_environment(true, false).await.1.unwrap();
276        let client = TxSenderClient::new(db.clone());
277
278        let body = vec![1, 2, 3, 4, 5];
279        let expected_body_hash = crate::citrea::calculate_sha256(&body).to_vec();
280        let insertion_id = client
281            .send_citrea_tx(CitreaTxRequest::BatchProof {
282                bytes: body.clone(),
283                chunk_size: None,
284            })
285            .await
286            .expect("Should insert successfully");
287
288        let serialized_body =
289            borsh::to_vec(&DataOnDa::Complete(body)).expect("Serialization should not fail");
290
291        // Verify row was inserted
292        let row = sqlx::query(
293            "SELECT insertion_id, transaction_kind, body, body_hash FROM tx_sender_citrea_raw_tx_queue WHERE body = $1",
294        )
295        .bind(&serialized_body)
296        .fetch_one(db.pool())
297        .await
298        .expect("Should find inserted row");
299
300        assert_eq!(row.get::<i16, _>("transaction_kind"), 0); // Complete
301        assert_eq!(row.get::<Vec<u8>, _>("body"), serialized_body);
302        assert_eq!(row.get::<Vec<u8>, _>("body_hash"), expected_body_hash);
303        assert_eq!(row.get::<i64, _>("insertion_id"), insertion_id);
304    }
305
306    #[cfg(feature = "citrea")]
307    #[tokio::test]
308    async fn test_send_citrea_tx_chunks() {
309        use crate::citrea::data_serialization::DataOnDa;
310        use crate::citrea::CitreaTxRequest;
311        use crate::test_utils::create_test_environment;
312
313        let db = create_test_environment(true, false).await.1.unwrap();
314        let client = TxSenderClient::new(db.clone());
315
316        let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
317        let expected_body_hash = crate::citrea::calculate_sha256(&bytes).to_vec();
318        let chunks: Vec<Vec<u8>> = bytes.chunks(3).map(|chunk| chunk.to_vec()).collect();
319        let insertion_id = client
320            .send_citrea_tx(CitreaTxRequest::BatchProof {
321                bytes,
322                chunk_size: Some(3),
323            })
324            .await
325            .expect("Should insert successfully");
326
327        // Verify all chunk rows + aggregate row were inserted with same insertion_id
328        let rows = sqlx::query(
329            "SELECT id, insertion_id, transaction_kind, body, body_hash FROM tx_sender_citrea_raw_tx_queue ORDER BY id ASC",
330        )
331        .fetch_all(db.pool())
332        .await
333        .expect("Should find inserted rows");
334
335        assert_eq!(rows.len(), 4); // 3 chunks + 1 aggregate
336
337        let aggregate_rows: Vec<_> = rows
338            .iter()
339            .filter(|row| row.get::<i16, _>("transaction_kind") == 1)
340            .collect();
341        assert_eq!(aggregate_rows.len(), 1);
342        assert_eq!(
343            aggregate_rows[0].get::<i64, _>("insertion_id"),
344            insertion_id
345        );
346        assert_eq!(aggregate_rows[0].get::<Option<Vec<u8>>, _>("body"), None);
347        assert_eq!(
348            aggregate_rows[0].get::<Option<Vec<u8>>, _>("body_hash"),
349            Some(expected_body_hash.clone())
350        );
351
352        let chunk_rows: Vec<_> = rows
353            .iter()
354            .filter(|row| row.get::<i16, _>("transaction_kind") == 2)
355            .collect();
356        assert_eq!(chunk_rows.len(), chunks.len());
357        for (idx, row) in chunk_rows.iter().enumerate() {
358            assert_eq!(row.get::<i64, _>("insertion_id"), insertion_id);
359            assert_eq!(
360                row.get::<Option<Vec<u8>>, _>("body"),
361                Some(
362                    borsh::to_vec(&DataOnDa::Chunk(chunks[idx].clone()))
363                        .expect("Serialization should not fail")
364                )
365            );
366            assert_eq!(row.get::<Option<Vec<u8>>, _>("body_hash"), None);
367        }
368
369        let aggregate_id = aggregate_rows[0].get::<i64, _>("id");
370        db.update_citrea_aggregate_body_and_reset(None, aggregate_id, b"aggregate-body")
371            .await
372            .expect("Aggregate body update should succeed");
373
374        let body_hash_after_update: Option<Vec<u8>> =
375            sqlx::query_scalar("SELECT body_hash FROM tx_sender_citrea_raw_tx_queue WHERE id = $1")
376                .bind(aggregate_id)
377                .fetch_one(db.pool())
378                .await
379                .expect("Should fetch aggregate body hash");
380
381        assert_eq!(body_hash_after_update, Some(expected_body_hash));
382    }
383
384    #[cfg(feature = "citrea")]
385    #[tokio::test]
386    async fn test_send_citrea_tx_batch_proof_dedupes_by_full_body() {
387        use crate::citrea::CitreaTxRequest;
388        use crate::test_utils::create_test_environment;
389
390        let db = create_test_environment(true, false).await.1.unwrap();
391        let client = TxSenderClient::new(db.clone());
392
393        let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
394        let first_insertion_id = client
395            .send_citrea_tx(CitreaTxRequest::BatchProof {
396                bytes: bytes.clone(),
397                chunk_size: Some(3),
398            })
399            .await
400            .expect("First insert should succeed");
401
402        let second_insertion_id = client
403            .send_citrea_tx(CitreaTxRequest::BatchProof {
404                bytes: bytes.clone(),
405                chunk_size: Some(4),
406            })
407            .await
408            .expect("Second insert should return existing insertion_id");
409
410        let complete_insertion_id = client
411            .send_citrea_tx(CitreaTxRequest::BatchProof {
412                bytes,
413                chunk_size: None,
414            })
415            .await
416            .expect("Complete insert should return existing insertion_id");
417
418        assert_eq!(first_insertion_id, second_insertion_id);
419        assert_eq!(first_insertion_id, complete_insertion_id);
420
421        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM tx_sender_citrea_raw_tx_queue")
422            .fetch_one(db.pool())
423            .await
424            .expect("Should count rows");
425
426        assert_eq!(count, 4, "Should keep one aggregate row and three chunks");
427    }
428
429    #[cfg(feature = "citrea")]
430    #[tokio::test]
431    async fn test_send_citrea_tx_duplicate_body() {
432        use crate::citrea::CitreaTxRequest;
433        use crate::test_utils::create_test_environment;
434
435        let db = create_test_environment(true, false).await.1.unwrap();
436        let client = TxSenderClient::new(db.clone());
437
438        let body = vec![10, 20, 30];
439        let first_insertion_id = client
440            .send_citrea_tx(CitreaTxRequest::BatchProofMethodId(body.clone()))
441            .await
442            .expect("First insert should succeed");
443
444        // Try to insert duplicate body - should return existing insertion_id
445        let second_insertion_id = client
446            .send_citrea_tx(CitreaTxRequest::BatchProofMethodId(body))
447            .await
448            .expect("Second insert should return existing insertion_id");
449
450        assert_eq!(first_insertion_id, second_insertion_id);
451
452        // Verify only one row exists
453        let count: i64 = sqlx::query_scalar(
454            "SELECT COUNT(*) FROM tx_sender_citrea_raw_tx_queue WHERE body = $1",
455        )
456        .bind(vec![10u8, 20, 30])
457        .fetch_one(db.pool())
458        .await
459        .expect("Should count rows");
460
461        assert_eq!(count, 1, "Should have exactly one row with this body");
462    }
463
464    #[cfg(feature = "citrea")]
465    #[tokio::test]
466    #[ignore = "Think about duplicate body possibility first"]
467    async fn test_send_citrea_tx_transaction_rollback() {
468        use crate::citrea::CitreaTxRequest;
469        use crate::test_utils::create_test_environment;
470
471        let db = create_test_environment(true, false).await.1.unwrap();
472        let client = TxSenderClient::new(db.clone());
473
474        let body1 = vec![100, 200];
475        // Insert first body
476        client
477            .send_citrea_tx(CitreaTxRequest::SequencerCommitment(body1))
478            .await
479            .expect("First insert should succeed");
480
481        // Try to insert chunks where one chunk body duplicates body1
482        // This should cause transaction rollback, so no rows should be inserted
483        let bytes = vec![1, 2, 100, 200, 4, 5, 6];
484        let result = client
485            .send_citrea_tx(CitreaTxRequest::BatchProof {
486                bytes,
487                chunk_size: Some(2),
488            })
489            .await;
490
491        assert!(result.is_err(), "Should fail due to duplicate body");
492
493        // Verify no partial insert happened - count should be 1 (only the first insert)
494        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM tx_sender_citrea_raw_tx_queue")
495            .fetch_one(db.pool())
496            .await
497            .expect("Should count rows");
498
499        assert_eq!(
500            count, 1,
501            "Should have only the first row, no partial chunk inserts"
502        );
503    }
504}