1use crate::{ActivatedWithOutpoint, ActivatedWithTxid};
7use bitcoin::{OutPoint, Transaction, Txid};
8use clementine_errors::BridgeError;
9use clementine_utils::{FeePayingType, RbfSigningInfo, TxMetadata};
10use eyre::eyre;
11use std::collections::BTreeMap;
12
13#[cfg(feature = "citrea")]
14use crate::citrea::CitreaTxRequest;
15#[cfg(feature = "citrea")]
16use crate::citrea::TransactionKind;
17
18#[derive(Debug, Clone)]
19pub struct TxSenderClient {
20 pub db: crate::TxSenderDb,
21}
22
23impl TxSenderClient {
24 pub fn new(db: crate::TxSenderDb) -> Self {
25 Self { db }
26 }
27
28 #[tracing::instrument(err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE), skip_all, fields(?tx_metadata))]
58 #[allow(clippy::too_many_arguments)]
59 pub async fn insert_try_to_send(
60 &self,
61 dbtx: &mut crate::TxSenderTransaction,
62 tx_metadata: Option<TxMetadata>,
63 signed_tx: &Transaction,
64 fee_paying_type: FeePayingType,
65 rbf_signing_info: Option<RbfSigningInfo>,
66 cancel_outpoints: &[OutPoint],
67 cancel_txids: &[Txid],
68 activate_txids: &[ActivatedWithTxid],
69 activate_outpoints: &[ActivatedWithOutpoint],
70 ) -> Result<u32, BridgeError> {
71 let txid = signed_tx.compute_txid();
72
73 let tx_exists = self
75 .db
76 .check_if_tx_exists_on_txsender(Some(dbtx), txid)
77 .await?;
78 if let Some(try_to_send_id) = tx_exists {
79 return Ok(try_to_send_id);
80 }
81
82 tracing::info!(
83 "Added tx {} with txid {} to the queue",
84 tx_metadata
85 .as_ref()
86 .map(|data| format!("{:?}", data.tx_type))
87 .unwrap_or("N/A".to_string()),
88 txid
89 );
90
91 let try_to_send_id = self
92 .db
93 .save_tx(
94 dbtx,
95 tx_metadata,
96 signed_tx,
97 fee_paying_type,
98 txid,
99 rbf_signing_info,
100 )
101 .await?;
102
103 #[cfg(test)]
105 tracing::debug!(target: "ci", "Saved tx to database with try_to_send_id: {try_to_send_id}, metadata: {tx_metadata:?}, raw tx: {}", hex::encode(bitcoin::consensus::serialize(signed_tx)));
106
107 for input_outpoint in signed_tx.input.iter().map(|input| input.previous_output) {
108 self.db
109 .save_cancelled_outpoint(dbtx, try_to_send_id, input_outpoint)
110 .await?;
111 }
112
113 for outpoint in cancel_outpoints {
114 self.db
115 .save_cancelled_outpoint(dbtx, try_to_send_id, *outpoint)
116 .await?;
117 }
118
119 for txid in cancel_txids {
120 self.db
121 .save_cancelled_txid(dbtx, try_to_send_id, *txid)
122 .await?;
123 }
124
125 let mut max_timelock_of_activated_txids = BTreeMap::new();
126
127 for activated_txid in activate_txids {
128 let timelock = max_timelock_of_activated_txids
129 .entry(activated_txid.txid)
130 .or_insert(activated_txid.relative_block_height);
131 if *timelock < activated_txid.relative_block_height {
132 *timelock = activated_txid.relative_block_height;
133 }
134 }
135
136 for input in signed_tx.input.iter() {
137 let relative_block_height = if input.sequence.is_relative_lock_time() {
138 let relative_locktime = input
139 .sequence
140 .to_relative_lock_time()
141 .expect("Invalid relative locktime");
142 match relative_locktime {
143 bitcoin::relative::LockTime::Blocks(height) => height.value() as u32,
144 _ => {
145 return Err(BridgeError::Eyre(eyre!("Invalid relative locktime")));
146 }
147 }
148 } else {
149 0
150 };
151 let timelock = max_timelock_of_activated_txids
152 .entry(input.previous_output.txid)
153 .or_insert(relative_block_height);
154 if *timelock < relative_block_height {
155 *timelock = relative_block_height;
156 }
157 }
158
159 for (txid, timelock) in max_timelock_of_activated_txids {
160 self.db
161 .save_activated_txid(
162 dbtx,
163 try_to_send_id,
164 &ActivatedWithTxid {
165 txid,
166 relative_block_height: timelock,
167 },
168 )
169 .await?;
170 }
171
172 for activated_outpoint in activate_outpoints {
173 self.db
174 .save_activated_outpoint(dbtx, try_to_send_id, activated_outpoint)
175 .await?;
176 }
177
178 Ok(try_to_send_id)
179 }
180
181 #[cfg(feature = "citrea")]
182 pub async fn send_citrea_tx(&self, request: CitreaTxRequest) -> Result<i64, eyre::Report> {
183 use crate::citrea::data_serialization::DataOnDa;
184 use crate::citrea::MAX_CHUNK_SIZE;
185
186 let mut dbtx = self.db.begin_transaction().await?;
187
188 let insertion_id = match request {
189 CitreaTxRequest::BatchProof { bytes, chunk_size } => {
190 let full_body_hash = crate::citrea::calculate_sha256(&bytes);
193 let mut chunk_size = chunk_size.unwrap_or(MAX_CHUNK_SIZE);
194 if chunk_size == 0 {
195 chunk_size = MAX_CHUNK_SIZE;
196 }
197 if chunk_size > MAX_CHUNK_SIZE {
198 chunk_size = MAX_CHUNK_SIZE;
199 }
200 let chunk_size = chunk_size as usize;
201
202 if bytes.len() <= chunk_size {
203 let data = DataOnDa::Complete(bytes);
204 let blob = borsh::to_vec(&data).expect("zk::Proof serialize must not fail");
205 self.db
206 .insert_citrea_raw_tx_single_with_hash(
207 &mut dbtx,
208 TransactionKind::Complete,
209 &blob,
210 &full_body_hash,
211 )
212 .await?
213 } else {
214 let chunks: Vec<Vec<u8>> = bytes
215 .chunks(chunk_size)
216 .map(|chunk| {
217 borsh::to_vec(&DataOnDa::Chunk(chunk.to_vec()))
218 .expect("zk::Proof serialize must not fail")
219 })
220 .collect();
221 self.db
222 .insert_citrea_raw_tx_chunks(&mut dbtx, &chunks, &full_body_hash)
223 .await?
224 }
225 }
226 CitreaTxRequest::BatchProofMethodId(body) => {
227 if body.len() as u32 > MAX_CHUNK_SIZE {
228 return Err(eyre!(
229 "Citrea BatchProofMethodId DA payload body too large; max {} bytes",
230 MAX_CHUNK_SIZE,
231 ));
232 }
233 self.db
234 .insert_citrea_raw_tx_single(
235 &mut dbtx,
236 TransactionKind::BatchProofMethodId,
237 &body,
238 )
239 .await?
240 }
241 CitreaTxRequest::SequencerCommitment(body) => {
242 if body.len() as u32 > MAX_CHUNK_SIZE {
243 return Err(eyre!(
244 "Citrea SequencerCommitment DA payload body too large; max {} bytes",
245 MAX_CHUNK_SIZE,
246 ));
247 }
248 self.db
249 .insert_citrea_raw_tx_single(
250 &mut dbtx,
251 TransactionKind::SequencerCommitment,
252 &body,
253 )
254 .await?
255 }
256 };
257
258 self.db.commit_transaction(dbtx).await?;
259 Ok(insertion_id)
260 }
261}
262
263#[cfg(test)]
264mod tests {
265 use super::*;
266 use sqlx::Row;
267
268 #[cfg(feature = "citrea")]
269 #[tokio::test]
270 async fn test_send_citrea_tx_batch_proof() {
271 use crate::citrea::data_serialization::DataOnDa;
272 use crate::citrea::CitreaTxRequest;
273 use crate::test_utils::create_test_environment;
274
275 let db = create_test_environment(true, false).await.1.unwrap();
276 let client = TxSenderClient::new(db.clone());
277
278 let body = vec![1, 2, 3, 4, 5];
279 let expected_body_hash = crate::citrea::calculate_sha256(&body).to_vec();
280 let insertion_id = client
281 .send_citrea_tx(CitreaTxRequest::BatchProof {
282 bytes: body.clone(),
283 chunk_size: None,
284 })
285 .await
286 .expect("Should insert successfully");
287
288 let serialized_body =
289 borsh::to_vec(&DataOnDa::Complete(body)).expect("Serialization should not fail");
290
291 let row = sqlx::query(
293 "SELECT insertion_id, transaction_kind, body, body_hash FROM tx_sender_citrea_raw_tx_queue WHERE body = $1",
294 )
295 .bind(&serialized_body)
296 .fetch_one(db.pool())
297 .await
298 .expect("Should find inserted row");
299
300 assert_eq!(row.get::<i16, _>("transaction_kind"), 0); assert_eq!(row.get::<Vec<u8>, _>("body"), serialized_body);
302 assert_eq!(row.get::<Vec<u8>, _>("body_hash"), expected_body_hash);
303 assert_eq!(row.get::<i64, _>("insertion_id"), insertion_id);
304 }
305
306 #[cfg(feature = "citrea")]
307 #[tokio::test]
308 async fn test_send_citrea_tx_chunks() {
309 use crate::citrea::data_serialization::DataOnDa;
310 use crate::citrea::CitreaTxRequest;
311 use crate::test_utils::create_test_environment;
312
313 let db = create_test_environment(true, false).await.1.unwrap();
314 let client = TxSenderClient::new(db.clone());
315
316 let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
317 let expected_body_hash = crate::citrea::calculate_sha256(&bytes).to_vec();
318 let chunks: Vec<Vec<u8>> = bytes.chunks(3).map(|chunk| chunk.to_vec()).collect();
319 let insertion_id = client
320 .send_citrea_tx(CitreaTxRequest::BatchProof {
321 bytes,
322 chunk_size: Some(3),
323 })
324 .await
325 .expect("Should insert successfully");
326
327 let rows = sqlx::query(
329 "SELECT id, insertion_id, transaction_kind, body, body_hash FROM tx_sender_citrea_raw_tx_queue ORDER BY id ASC",
330 )
331 .fetch_all(db.pool())
332 .await
333 .expect("Should find inserted rows");
334
335 assert_eq!(rows.len(), 4); let aggregate_rows: Vec<_> = rows
338 .iter()
339 .filter(|row| row.get::<i16, _>("transaction_kind") == 1)
340 .collect();
341 assert_eq!(aggregate_rows.len(), 1);
342 assert_eq!(
343 aggregate_rows[0].get::<i64, _>("insertion_id"),
344 insertion_id
345 );
346 assert_eq!(aggregate_rows[0].get::<Option<Vec<u8>>, _>("body"), None);
347 assert_eq!(
348 aggregate_rows[0].get::<Option<Vec<u8>>, _>("body_hash"),
349 Some(expected_body_hash.clone())
350 );
351
352 let chunk_rows: Vec<_> = rows
353 .iter()
354 .filter(|row| row.get::<i16, _>("transaction_kind") == 2)
355 .collect();
356 assert_eq!(chunk_rows.len(), chunks.len());
357 for (idx, row) in chunk_rows.iter().enumerate() {
358 assert_eq!(row.get::<i64, _>("insertion_id"), insertion_id);
359 assert_eq!(
360 row.get::<Option<Vec<u8>>, _>("body"),
361 Some(
362 borsh::to_vec(&DataOnDa::Chunk(chunks[idx].clone()))
363 .expect("Serialization should not fail")
364 )
365 );
366 assert_eq!(row.get::<Option<Vec<u8>>, _>("body_hash"), None);
367 }
368
369 let aggregate_id = aggregate_rows[0].get::<i64, _>("id");
370 db.update_citrea_aggregate_body_and_reset(None, aggregate_id, b"aggregate-body")
371 .await
372 .expect("Aggregate body update should succeed");
373
374 let body_hash_after_update: Option<Vec<u8>> =
375 sqlx::query_scalar("SELECT body_hash FROM tx_sender_citrea_raw_tx_queue WHERE id = $1")
376 .bind(aggregate_id)
377 .fetch_one(db.pool())
378 .await
379 .expect("Should fetch aggregate body hash");
380
381 assert_eq!(body_hash_after_update, Some(expected_body_hash));
382 }
383
384 #[cfg(feature = "citrea")]
385 #[tokio::test]
386 async fn test_send_citrea_tx_batch_proof_dedupes_by_full_body() {
387 use crate::citrea::CitreaTxRequest;
388 use crate::test_utils::create_test_environment;
389
390 let db = create_test_environment(true, false).await.1.unwrap();
391 let client = TxSenderClient::new(db.clone());
392
393 let bytes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9];
394 let first_insertion_id = client
395 .send_citrea_tx(CitreaTxRequest::BatchProof {
396 bytes: bytes.clone(),
397 chunk_size: Some(3),
398 })
399 .await
400 .expect("First insert should succeed");
401
402 let second_insertion_id = client
403 .send_citrea_tx(CitreaTxRequest::BatchProof {
404 bytes: bytes.clone(),
405 chunk_size: Some(4),
406 })
407 .await
408 .expect("Second insert should return existing insertion_id");
409
410 let complete_insertion_id = client
411 .send_citrea_tx(CitreaTxRequest::BatchProof {
412 bytes,
413 chunk_size: None,
414 })
415 .await
416 .expect("Complete insert should return existing insertion_id");
417
418 assert_eq!(first_insertion_id, second_insertion_id);
419 assert_eq!(first_insertion_id, complete_insertion_id);
420
421 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM tx_sender_citrea_raw_tx_queue")
422 .fetch_one(db.pool())
423 .await
424 .expect("Should count rows");
425
426 assert_eq!(count, 4, "Should keep one aggregate row and three chunks");
427 }
428
429 #[cfg(feature = "citrea")]
430 #[tokio::test]
431 async fn test_send_citrea_tx_duplicate_body() {
432 use crate::citrea::CitreaTxRequest;
433 use crate::test_utils::create_test_environment;
434
435 let db = create_test_environment(true, false).await.1.unwrap();
436 let client = TxSenderClient::new(db.clone());
437
438 let body = vec![10, 20, 30];
439 let first_insertion_id = client
440 .send_citrea_tx(CitreaTxRequest::BatchProofMethodId(body.clone()))
441 .await
442 .expect("First insert should succeed");
443
444 let second_insertion_id = client
446 .send_citrea_tx(CitreaTxRequest::BatchProofMethodId(body))
447 .await
448 .expect("Second insert should return existing insertion_id");
449
450 assert_eq!(first_insertion_id, second_insertion_id);
451
452 let count: i64 = sqlx::query_scalar(
454 "SELECT COUNT(*) FROM tx_sender_citrea_raw_tx_queue WHERE body = $1",
455 )
456 .bind(vec![10u8, 20, 30])
457 .fetch_one(db.pool())
458 .await
459 .expect("Should count rows");
460
461 assert_eq!(count, 1, "Should have exactly one row with this body");
462 }
463
464 #[cfg(feature = "citrea")]
465 #[tokio::test]
466 #[ignore = "Think about duplicate body possibility first"]
467 async fn test_send_citrea_tx_transaction_rollback() {
468 use crate::citrea::CitreaTxRequest;
469 use crate::test_utils::create_test_environment;
470
471 let db = create_test_environment(true, false).await.1.unwrap();
472 let client = TxSenderClient::new(db.clone());
473
474 let body1 = vec![100, 200];
475 client
477 .send_citrea_tx(CitreaTxRequest::SequencerCommitment(body1))
478 .await
479 .expect("First insert should succeed");
480
481 let bytes = vec![1, 2, 100, 200, 4, 5, 6];
484 let result = client
485 .send_citrea_tx(CitreaTxRequest::BatchProof {
486 bytes,
487 chunk_size: Some(2),
488 })
489 .await;
490
491 assert!(result.is_err(), "Should fail due to duplicate body");
492
493 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM tx_sender_citrea_raw_tx_queue")
495 .fetch_one(db.pool())
496 .await
497 .expect("Should count rows");
498
499 assert_eq!(
500 count, 1,
501 "Should have only the first row, no partial chunk inserts"
502 );
503 }
504}