1use super::wrapper::OutPointDB;
4use super::{TxSenderDb, TxSenderDbTx};
5use crate::txsender_execute_query_with_tx;
6use bitcoin::OutPoint;
7use clementine_errors::BridgeError;
8
9use crate::citrea::{calculate_sha256, TransactionKind};
10
#[derive(Debug, Clone)]
/// A decoded row of the `tx_sender_citrea_raw_tx_queue` table.
///
/// Built from the raw tuple form `CitreaRawTxRowDb` through its `From` impl.
pub struct CitreaRawTxRow {
    /// Value of the `id` column; the update helpers in this file address rows by it.
    pub id: i64,
    /// Value of the `insertion_id` column. Chunk rows are inserted with the
    /// `insertion_id` of their aggregate row, so the value groups them together.
    pub insertion_id: i64,
    /// Transaction kind decoded from the stored `transaction_kind` integer.
    pub transaction_kind: TransactionKind,
    /// Raw body bytes; a NULL `body` column is decoded as an empty vector.
    pub body: Vec<u8>,
    /// Value of the `commit_outpoint` column, `None` when the column is NULL.
    pub commit_outpoint: Option<OutPoint>,
    /// Value of the `try_to_send_id` column, `None` when the column is NULL.
    pub try_to_send_id: Option<i32>,
    /// Value of the `aggregate_finalized` column.
    pub aggregate_finalized: bool,
}
29
/// Raw tuple shape that `sqlx::query_as` decodes queue rows into.
///
/// The element order must match the column order of the `SELECT` lists in
/// this file: `id, insertion_id, transaction_kind, body, commit_outpoint,
/// try_to_send_id, aggregate_finalized`.
type CitreaRawTxRowDb = (
    // id
    i64,
    // insertion_id
    i64,
    // transaction_kind
    i16,
    // body (nullable)
    Option<Vec<u8>>,
    // commit_outpoint (nullable)
    Option<OutPointDB>,
    // try_to_send_id (nullable)
    Option<i32>,
    // aggregate_finalized
    bool,
);
40
41impl From<CitreaRawTxRowDb> for CitreaRawTxRow {
42 fn from(row: CitreaRawTxRowDb) -> Self {
43 let (id, insertion_id, kind, body, commit_outpoint, try_to_send_id, aggregate_finalized) =
44 row;
45 Self {
46 id,
47 insertion_id,
48 transaction_kind: TransactionKind::from_u16(kind as u16),
49 body: body.unwrap_or_default(),
50 commit_outpoint: commit_outpoint.map(|op| op.0),
51 try_to_send_id,
52 aggregate_finalized,
53 }
54 }
55}
56
57impl TxSenderDb {
58 pub async fn insert_citrea_raw_tx_single(
62 &self,
63 tx: TxSenderDbTx<'_>,
64 transaction_kind: TransactionKind,
65 body: &[u8],
66 ) -> Result<i64, BridgeError> {
67 let body_hash = calculate_sha256(body);
68 let (insertion_id, _) = self
69 .insert_citrea_raw_tx_with_hash_status(tx, transaction_kind, Some(body), &body_hash)
70 .await?;
71 Ok(insertion_id)
72 }
73
74 pub(crate) async fn insert_citrea_raw_tx_single_with_hash(
80 &self,
81 tx: TxSenderDbTx<'_>,
82 transaction_kind: TransactionKind,
83 body: &[u8],
84 body_hash: &[u8],
85 ) -> Result<i64, BridgeError> {
86 let (insertion_id, _) = self
87 .insert_citrea_raw_tx_with_hash_status(tx, transaction_kind, Some(body), body_hash)
88 .await?;
89 Ok(insertion_id)
90 }
91
92 async fn insert_citrea_raw_tx_with_hash_status(
99 &self,
100 tx: TxSenderDbTx<'_>,
101 transaction_kind: TransactionKind,
102 body: Option<&[u8]>,
103 body_hash: &[u8],
104 ) -> Result<(i64, bool), BridgeError> {
105 let body = body.map(Vec::from);
106 let insert_query = sqlx::query_scalar::<_, i64>(
107 r#"
108 INSERT INTO tx_sender_citrea_raw_tx_queue (transaction_kind, body, body_hash)
109 VALUES ($1, $2, $3)
110 ON CONFLICT (body_hash) DO NOTHING
111 RETURNING insertion_id
112 "#,
113 )
114 .bind(transaction_kind.as_i16())
115 .bind(body)
116 .bind(body_hash);
117
118 if let Some(insertion_id) = insert_query.fetch_optional(&mut **tx).await? {
119 return Ok((insertion_id, true));
120 }
121
122 let insertion_id = sqlx::query_scalar::<_, i64>(
123 "SELECT insertion_id FROM tx_sender_citrea_raw_tx_queue WHERE body_hash = $1",
124 )
125 .bind(body_hash)
126 .fetch_one(&mut **tx)
127 .await?;
128
129 Ok((insertion_id, false))
130 }
131
132 pub async fn set_citrea_commit_outpoint(
134 &self,
135 tx: Option<TxSenderDbTx<'_>>,
136 id: i64,
137 outpoint: OutPoint,
138 ) -> Result<(), BridgeError> {
139 let query = sqlx::query(
140 "UPDATE tx_sender_citrea_raw_tx_queue SET commit_outpoint = $2 WHERE id = $1",
141 )
142 .bind(id)
143 .bind(OutPointDB(outpoint));
144
145 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
146 Ok(())
147 }
148
149 pub async fn insert_citrea_raw_tx_chunks(
157 &self,
158 tx: TxSenderDbTx<'_>,
159 chunks: &[Vec<u8>],
160 full_body_hash: &[u8],
161 ) -> Result<i64, BridgeError> {
162 if chunks.is_empty() {
163 return Err(eyre::eyre!("Chunks vector cannot be empty").into());
164 }
165
166 let (insertion_id, inserted) = self
168 .insert_citrea_raw_tx_with_hash_status(
169 tx,
170 TransactionKind::Aggregate,
171 None,
172 full_body_hash,
173 )
174 .await?;
175 if !inserted {
176 return Ok(insertion_id);
177 }
178
179 for chunk in chunks {
180 let query = sqlx::query(
181 r#"
182 INSERT INTO tx_sender_citrea_raw_tx_queue (insertion_id, transaction_kind, body, body_hash)
183 VALUES ($1, $2, $3, NULL)
184 "#,
185 )
186 .bind(insertion_id)
187 .bind(TransactionKind::Chunks.as_i16())
188 .bind(chunk.as_slice());
189
190 query.execute(&mut **tx).await?;
191 }
192
193 Ok(insertion_id)
194 }
195
196 pub async fn get_citrea_txs_with_null_commit_outpoint(
199 &self,
200 tx: Option<TxSenderDbTx<'_>>,
201 ) -> Result<Vec<CitreaRawTxRow>, BridgeError> {
202 let query = sqlx::query_as::<_, CitreaRawTxRowDb>(
203 "SELECT id, insertion_id, transaction_kind, body, commit_outpoint, try_to_send_id, aggregate_finalized
204 FROM tx_sender_citrea_raw_tx_queue
205 WHERE transaction_kind != $1
206 AND commit_outpoint IS NULL
207 AND body IS NOT NULL
208 ORDER BY created_at ASC",
209 )
210 .bind(TransactionKind::Aggregate.as_i16());
211
212 let results: Vec<CitreaRawTxRowDb> =
213 txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
214
215 Ok(results.into_iter().map(CitreaRawTxRow::from).collect())
216 }
217
218 pub async fn get_citrea_txs_with_commit_outpoint_no_try_to_send(
221 &self,
222 tx: Option<TxSenderDbTx<'_>>,
223 ) -> Result<Vec<CitreaRawTxRow>, BridgeError> {
224 let query = sqlx::query_as::<_, CitreaRawTxRowDb>(
225 "SELECT id, insertion_id, transaction_kind, body, commit_outpoint, try_to_send_id, aggregate_finalized
226 FROM tx_sender_citrea_raw_tx_queue
227 WHERE transaction_kind != $1
228 AND commit_outpoint IS NOT NULL
229 AND try_to_send_id IS NULL
230 AND body IS NOT NULL
231 ORDER BY created_at ASC",
232 )
233 .bind(TransactionKind::Aggregate.as_i16());
234
235 let results: Vec<CitreaRawTxRowDb> =
236 txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
237
238 Ok(results.into_iter().map(CitreaRawTxRow::from).collect())
239 }
240
241 pub async fn get_citrea_txs_with_commit_outpoint(
243 &self,
244 tx: Option<TxSenderDbTx<'_>>,
245 ) -> Result<Vec<CitreaRawTxRow>, BridgeError> {
246 let query = sqlx::query_as::<_, CitreaRawTxRowDb>(
247 "SELECT id, insertion_id, transaction_kind, body, commit_outpoint, try_to_send_id, aggregate_finalized
248 FROM tx_sender_citrea_raw_tx_queue
249 WHERE commit_outpoint IS NOT NULL",
250 );
251
252 let results: Vec<CitreaRawTxRowDb> =
253 txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
254
255 Ok(results.into_iter().map(CitreaRawTxRow::from).collect())
256 }
257
258 pub async fn get_citrea_txs_with_commit_outpoint_unseen_try_to_send(
261 &self,
262 tx: Option<TxSenderDbTx<'_>>,
263 ) -> Result<Vec<CitreaRawTxRow>, BridgeError> {
264 let query = sqlx::query_as::<_, CitreaRawTxRowDb>(
265 "SELECT q.id, q.insertion_id, q.transaction_kind, q.body, q.commit_outpoint, q.try_to_send_id, q.aggregate_finalized
266 FROM tx_sender_citrea_raw_tx_queue q
267 LEFT JOIN tx_sender_try_to_send_txs t
268 ON t.id = q.try_to_send_id
269 WHERE q.commit_outpoint IS NOT NULL
270 AND (q.try_to_send_id IS NULL OR t.seen_at_height IS NULL)
271 ORDER BY q.id ASC",
272 );
273
274 let results: Vec<CitreaRawTxRowDb> =
275 txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
276
277 Ok(results.into_iter().map(CitreaRawTxRow::from).collect())
278 }
279
280 pub async fn clear_citrea_commit_and_try_to_send_by_ids(
282 &self,
283 tx: Option<TxSenderDbTx<'_>>,
284 ids: &[i64],
285 ) -> Result<(), BridgeError> {
286 if ids.is_empty() {
287 return Ok(());
288 }
289
290 let query = sqlx::query(
291 "UPDATE tx_sender_citrea_raw_tx_queue
292 SET commit_outpoint = NULL, try_to_send_id = NULL
293 WHERE id = ANY($1)",
294 )
295 .bind(ids);
296
297 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
298 Ok(())
299 }
300
301 pub async fn set_citrea_try_to_send_id(
303 &self,
304 tx: TxSenderDbTx<'_>,
305 id: i64,
306 try_to_send_id: i32,
307 ) -> Result<(), BridgeError> {
308 let query = sqlx::query(
309 "UPDATE tx_sender_citrea_raw_tx_queue SET try_to_send_id = $2 WHERE id = $1",
310 )
311 .bind(id)
312 .bind(try_to_send_id);
313
314 query.execute(&mut **tx).await?;
315 Ok(())
316 }
317
318 pub async fn get_citrea_aggregate_rows_pending(
320 &self,
321 tx: Option<TxSenderDbTx<'_>>,
322 ) -> Result<Vec<CitreaRawTxRow>, BridgeError> {
323 let query = sqlx::query_as::<_, CitreaRawTxRowDb>(
324 "SELECT id, insertion_id, transaction_kind, body, commit_outpoint, try_to_send_id, aggregate_finalized
325 FROM tx_sender_citrea_raw_tx_queue
326 WHERE transaction_kind = $1
327 AND aggregate_finalized = FALSE
328 ORDER BY created_at ASC",
329 )
330 .bind(TransactionKind::Aggregate.as_i16());
331
332 let results: Vec<CitreaRawTxRowDb> =
333 txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
334
335 Ok(results.into_iter().map(CitreaRawTxRow::from).collect())
336 }
337
338 pub async fn get_citrea_chunk_rows_by_insertion_id(
340 &self,
341 tx: Option<TxSenderDbTx<'_>>,
342 insertion_id: i64,
343 ) -> Result<Vec<CitreaRawTxRow>, BridgeError> {
344 let query = sqlx::query_as::<_, CitreaRawTxRowDb>(
345 "SELECT id, insertion_id, transaction_kind, body, commit_outpoint, try_to_send_id, aggregate_finalized
346 FROM tx_sender_citrea_raw_tx_queue
347 WHERE insertion_id = $1
348 AND transaction_kind = $2
349 ORDER BY id ASC",
350 )
351 .bind(insertion_id)
352 .bind(TransactionKind::Chunks.as_i16());
353
354 let results: Vec<CitreaRawTxRowDb> =
355 txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
356
357 Ok(results.into_iter().map(CitreaRawTxRow::from).collect())
358 }
359
360 pub async fn update_citrea_aggregate_body_and_reset(
366 &self,
367 tx: Option<TxSenderDbTx<'_>>,
368 id: i64,
369 body: &[u8],
370 ) -> Result<(), BridgeError> {
371 let query = sqlx::query(
372 "UPDATE tx_sender_citrea_raw_tx_queue
373 SET body = $2,
374 commit_outpoint = NULL,
375 try_to_send_id = NULL,
376 aggregate_finalized = FALSE
377 WHERE id = $1",
378 )
379 .bind(id)
380 .bind(body);
381
382 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
383 Ok(())
384 }
385
386 pub async fn set_citrea_aggregate_finalized(
388 &self,
389 tx: Option<TxSenderDbTx<'_>>,
390 id: i64,
391 ) -> Result<(), BridgeError> {
392 let query = sqlx::query(
393 "UPDATE tx_sender_citrea_raw_tx_queue
394 SET aggregate_finalized = TRUE
395 WHERE id = $1",
396 )
397 .bind(id);
398
399 txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
400 Ok(())
401 }
402}