clementine_tx_sender/db/
citrea.rs

1//! Citrea-specific SQLx queries for tx-sender tables.
2
3use super::wrapper::OutPointDB;
4use super::{TxSenderDb, TxSenderDbTx};
5use crate::txsender_execute_query_with_tx;
6use bitcoin::OutPoint;
7use clementine_errors::BridgeError;
8
9use crate::citrea::{calculate_sha256, TransactionKind};
10
11/// Represents a single Citrea raw transaction queue row.
12#[derive(Debug, Clone)]
13pub struct CitreaRawTxRow {
14    /// Database row ID.
15    pub id: i64,
16    /// Group identifier shared across all rows belonging to the same CitreaTxRequest.
17    pub insertion_id: i64,
18    /// Transaction kind as defined in `citrea::TransactionKind`.
19    pub transaction_kind: TransactionKind,
20    /// Raw body bytes.
21    pub body: Vec<u8>,
22    /// Optional commit outpoint once known.
23    pub commit_outpoint: Option<OutPoint>,
24    /// Optional link to a tx_sender_try_to_send_txs row once it exists.
25    pub try_to_send_id: Option<i32>,
26    /// Whether this aggregate row is finalized and should not be reprocessed.
27    pub aggregate_finalized: bool,
28}
29
30/// Raw row shape returned by SQLx for Citrea queue rows.
31type CitreaRawTxRowDb = (
32    i64,
33    i64,
34    i16,
35    Option<Vec<u8>>,
36    Option<OutPointDB>,
37    Option<i32>,
38    bool,
39);
40
41impl From<CitreaRawTxRowDb> for CitreaRawTxRow {
42    fn from(row: CitreaRawTxRowDb) -> Self {
43        let (id, insertion_id, kind, body, commit_outpoint, try_to_send_id, aggregate_finalized) =
44            row;
45        Self {
46            id,
47            insertion_id,
48            transaction_kind: TransactionKind::from_u16(kind as u16),
49            body: body.unwrap_or_default(),
50            commit_outpoint: commit_outpoint.map(|op| op.0),
51            try_to_send_id,
52            aggregate_finalized,
53        }
54    }
55}
56
57impl TxSenderDb {
58    /// Inserts a single non-chunked Citrea raw tx row.
59    /// Uses the row body itself as the `body_hash` dedupe input.
60    /// Returns the insertion_id assigned to this row.
61    pub async fn insert_citrea_raw_tx_single(
62        &self,
63        tx: TxSenderDbTx<'_>,
64        transaction_kind: TransactionKind,
65        body: &[u8],
66    ) -> Result<i64, BridgeError> {
67        let body_hash = calculate_sha256(body);
68        let (insertion_id, _) = self
69            .insert_citrea_raw_tx_with_hash_status(tx, transaction_kind, Some(body), &body_hash)
70            .await?;
71        Ok(insertion_id)
72    }
73
74    /// Inserts a single Citrea raw tx row using an explicit `body_hash`.
75    ///
76    /// This is used for `BatchProof` so Complete and chunked encodings of the
77    /// same proof share one dedupe key, even though the stored row body differs.
78    /// Returns the insertion_id assigned to this row.
79    pub(crate) async fn insert_citrea_raw_tx_single_with_hash(
80        &self,
81        tx: TxSenderDbTx<'_>,
82        transaction_kind: TransactionKind,
83        body: &[u8],
84        body_hash: &[u8],
85    ) -> Result<i64, BridgeError> {
86        let (insertion_id, _) = self
87            .insert_citrea_raw_tx_with_hash_status(tx, transaction_kind, Some(body), body_hash)
88            .await?;
89        Ok(insertion_id)
90    }
91
92    /// Inserts a Citrea raw tx row and returns whether it was newly inserted.
93    ///
94    /// `body_hash` is the unique dedupe key. It usually hashes this row's `body`,
95    /// but callers may pass the hash of a larger logical payload when several rows
96    /// represent one request.
97    /// If the body hash already exists, returns the existing insertion_id with `inserted = false`.
98    async fn insert_citrea_raw_tx_with_hash_status(
99        &self,
100        tx: TxSenderDbTx<'_>,
101        transaction_kind: TransactionKind,
102        body: Option<&[u8]>,
103        body_hash: &[u8],
104    ) -> Result<(i64, bool), BridgeError> {
105        let body = body.map(Vec::from);
106        let insert_query = sqlx::query_scalar::<_, i64>(
107            r#"
108            INSERT INTO tx_sender_citrea_raw_tx_queue (transaction_kind, body, body_hash)
109            VALUES ($1, $2, $3)
110            ON CONFLICT (body_hash) DO NOTHING
111            RETURNING insertion_id
112            "#,
113        )
114        .bind(transaction_kind.as_i16())
115        .bind(body)
116        .bind(body_hash);
117
118        if let Some(insertion_id) = insert_query.fetch_optional(&mut **tx).await? {
119            return Ok((insertion_id, true));
120        }
121
122        let insertion_id = sqlx::query_scalar::<_, i64>(
123            "SELECT insertion_id FROM tx_sender_citrea_raw_tx_queue WHERE body_hash = $1",
124        )
125        .bind(body_hash)
126        .fetch_one(&mut **tx)
127        .await?;
128
129        Ok((insertion_id, false))
130    }
131
132    /// Sets the commit outpoint for a specific Citrea raw tx row.
133    pub async fn set_citrea_commit_outpoint(
134        &self,
135        tx: Option<TxSenderDbTx<'_>>,
136        id: i64,
137        outpoint: OutPoint,
138    ) -> Result<(), BridgeError> {
139        let query = sqlx::query(
140            "UPDATE tx_sender_citrea_raw_tx_queue SET commit_outpoint = $2 WHERE id = $1",
141        )
142        .bind(id)
143        .bind(OutPointDB(outpoint));
144
145        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
146        Ok(())
147    }
148
149    /// Inserts chunked Citrea raw tx data: N chunk rows + 1 aggregate row.
150    /// All rows share the same insertion_id.
151    ///
152    /// The aggregate placeholder carries `full_body_hash` as the dedupe key for
153    /// the original proof. Chunk rows intentionally keep `body_hash` NULL; we do
154    /// not dedupe individual chunks against unrelated requests.
155    /// Returns the insertion_id assigned to this group.
156    pub async fn insert_citrea_raw_tx_chunks(
157        &self,
158        tx: TxSenderDbTx<'_>,
159        chunks: &[Vec<u8>],
160        full_body_hash: &[u8],
161    ) -> Result<i64, BridgeError> {
162        if chunks.is_empty() {
163            return Err(eyre::eyre!("Chunks vector cannot be empty").into());
164        }
165
166        // The aggregate row anchors deduplication for the whole proof body.
167        let (insertion_id, inserted) = self
168            .insert_citrea_raw_tx_with_hash_status(
169                tx,
170                TransactionKind::Aggregate,
171                None,
172                full_body_hash,
173            )
174            .await?;
175        if !inserted {
176            return Ok(insertion_id);
177        }
178
179        for chunk in chunks {
180            let query = sqlx::query(
181                r#"
182                INSERT INTO tx_sender_citrea_raw_tx_queue (insertion_id, transaction_kind, body, body_hash)
183                VALUES ($1, $2, $3, NULL)
184                "#,
185            )
186            .bind(insertion_id)
187            .bind(TransactionKind::Chunks.as_i16())
188            .bind(chunk.as_slice());
189
190            query.execute(&mut **tx).await?;
191        }
192
193        Ok(insertion_id)
194    }
195
196    /// Returns non-aggregate citrea transactions with null commit_outpoint.
197    /// Excludes aggregate transactions (transaction_kind = 1).
198    pub async fn get_citrea_txs_with_null_commit_outpoint(
199        &self,
200        tx: Option<TxSenderDbTx<'_>>,
201    ) -> Result<Vec<CitreaRawTxRow>, BridgeError> {
202        let query = sqlx::query_as::<_, CitreaRawTxRowDb>(
203                        "SELECT id, insertion_id, transaction_kind, body, commit_outpoint, try_to_send_id, aggregate_finalized
204             FROM tx_sender_citrea_raw_tx_queue
205             WHERE transaction_kind != $1
206               AND commit_outpoint IS NULL
207               AND body IS NOT NULL
208             ORDER BY created_at ASC",
209        )
210        .bind(TransactionKind::Aggregate.as_i16());
211
212        let results: Vec<CitreaRawTxRowDb> =
213            txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
214
215        Ok(results.into_iter().map(CitreaRawTxRow::from).collect())
216    }
217
218    /// Returns non-aggregate citrea transactions with commit_outpoint but no try_to_send_id.
219    /// Excludes aggregate transactions (transaction_kind = 1).
220    pub async fn get_citrea_txs_with_commit_outpoint_no_try_to_send(
221        &self,
222        tx: Option<TxSenderDbTx<'_>>,
223    ) -> Result<Vec<CitreaRawTxRow>, BridgeError> {
224        let query = sqlx::query_as::<_, CitreaRawTxRowDb>(
225                        "SELECT id, insertion_id, transaction_kind, body, commit_outpoint, try_to_send_id, aggregate_finalized
226             FROM tx_sender_citrea_raw_tx_queue
227             WHERE transaction_kind != $1
228               AND commit_outpoint IS NOT NULL
229               AND try_to_send_id IS NULL
230               AND body IS NOT NULL
231             ORDER BY created_at ASC",
232        )
233        .bind(TransactionKind::Aggregate.as_i16());
234
235        let results: Vec<CitreaRawTxRowDb> =
236            txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
237
238        Ok(results.into_iter().map(CitreaRawTxRow::from).collect())
239    }
240
241    /// Returns citrea transactions with commit_outpoint (regardless of try_to_send_id).
242    pub async fn get_citrea_txs_with_commit_outpoint(
243        &self,
244        tx: Option<TxSenderDbTx<'_>>,
245    ) -> Result<Vec<CitreaRawTxRow>, BridgeError> {
246        let query = sqlx::query_as::<_, CitreaRawTxRowDb>(
247            "SELECT id, insertion_id, transaction_kind, body, commit_outpoint, try_to_send_id, aggregate_finalized
248             FROM tx_sender_citrea_raw_tx_queue
249             WHERE commit_outpoint IS NOT NULL",
250        );
251
252        let results: Vec<CitreaRawTxRowDb> =
253            txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
254
255        Ok(results.into_iter().map(CitreaRawTxRow::from).collect())
256    }
257
258    /// Returns Citrea transactions with commit_outpoint where try_to_send_id is
259    /// NULL or the try-to-send tx has not been seen yet.
260    pub async fn get_citrea_txs_with_commit_outpoint_unseen_try_to_send(
261        &self,
262        tx: Option<TxSenderDbTx<'_>>,
263    ) -> Result<Vec<CitreaRawTxRow>, BridgeError> {
264        let query = sqlx::query_as::<_, CitreaRawTxRowDb>(
265                        "SELECT q.id, q.insertion_id, q.transaction_kind, q.body, q.commit_outpoint, q.try_to_send_id, q.aggregate_finalized
266             FROM tx_sender_citrea_raw_tx_queue q
267             LEFT JOIN tx_sender_try_to_send_txs t
268               ON t.id = q.try_to_send_id
269             WHERE q.commit_outpoint IS NOT NULL
270               AND (q.try_to_send_id IS NULL OR t.seen_at_height IS NULL)
271             ORDER BY q.id ASC",
272        );
273
274        let results: Vec<CitreaRawTxRowDb> =
275            txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
276
277        Ok(results.into_iter().map(CitreaRawTxRow::from).collect())
278    }
279
280    /// Clears commit_outpoint and try_to_send_id for specific Citrea rows.
281    pub async fn clear_citrea_commit_and_try_to_send_by_ids(
282        &self,
283        tx: Option<TxSenderDbTx<'_>>,
284        ids: &[i64],
285    ) -> Result<(), BridgeError> {
286        if ids.is_empty() {
287            return Ok(());
288        }
289
290        let query = sqlx::query(
291            "UPDATE tx_sender_citrea_raw_tx_queue
292             SET commit_outpoint = NULL, try_to_send_id = NULL
293             WHERE id = ANY($1)",
294        )
295        .bind(ids);
296
297        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
298        Ok(())
299    }
300
301    /// Sets the try_to_send_id for a specific Citrea raw tx row.
302    pub async fn set_citrea_try_to_send_id(
303        &self,
304        tx: TxSenderDbTx<'_>,
305        id: i64,
306        try_to_send_id: i32,
307    ) -> Result<(), BridgeError> {
308        let query = sqlx::query(
309            "UPDATE tx_sender_citrea_raw_tx_queue SET try_to_send_id = $2 WHERE id = $1",
310        )
311        .bind(id)
312        .bind(try_to_send_id);
313
314        query.execute(&mut **tx).await?;
315        Ok(())
316    }
317
318    /// Returns aggregate placeholder rows that are not finalized.
319    pub async fn get_citrea_aggregate_rows_pending(
320        &self,
321        tx: Option<TxSenderDbTx<'_>>,
322    ) -> Result<Vec<CitreaRawTxRow>, BridgeError> {
323        let query = sqlx::query_as::<_, CitreaRawTxRowDb>(
324            "SELECT id, insertion_id, transaction_kind, body, commit_outpoint, try_to_send_id, aggregate_finalized
325             FROM tx_sender_citrea_raw_tx_queue
326             WHERE transaction_kind = $1
327               AND aggregate_finalized = FALSE
328             ORDER BY created_at ASC",
329        )
330        .bind(TransactionKind::Aggregate.as_i16());
331
332        let results: Vec<CitreaRawTxRowDb> =
333            txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
334
335        Ok(results.into_iter().map(CitreaRawTxRow::from).collect())
336    }
337
338    /// Returns chunk rows for a given insertion_id, ordered by row id.
339    pub async fn get_citrea_chunk_rows_by_insertion_id(
340        &self,
341        tx: Option<TxSenderDbTx<'_>>,
342        insertion_id: i64,
343    ) -> Result<Vec<CitreaRawTxRow>, BridgeError> {
344        let query = sqlx::query_as::<_, CitreaRawTxRowDb>(
345            "SELECT id, insertion_id, transaction_kind, body, commit_outpoint, try_to_send_id, aggregate_finalized
346             FROM tx_sender_citrea_raw_tx_queue
347             WHERE insertion_id = $1
348               AND transaction_kind = $2
349             ORDER BY id ASC",
350        )
351        .bind(insertion_id)
352        .bind(TransactionKind::Chunks.as_i16());
353
354        let results: Vec<CitreaRawTxRowDb> =
355            txsender_execute_query_with_tx!(&self.pool, tx, query, fetch_all)?;
356
357        Ok(results.into_iter().map(CitreaRawTxRow::from).collect())
358    }
359
360    /// Updates the body for an aggregate Citrea row, resetting commit/try_to_send state.
361    ///
362    /// The `body_hash` remains the original full proof hash rather than being
363    /// recomputed from the aggregate body, so repeated BatchProof submissions
364    /// still dedupe to this insertion group.
365    pub async fn update_citrea_aggregate_body_and_reset(
366        &self,
367        tx: Option<TxSenderDbTx<'_>>,
368        id: i64,
369        body: &[u8],
370    ) -> Result<(), BridgeError> {
371        let query = sqlx::query(
372            "UPDATE tx_sender_citrea_raw_tx_queue
373             SET body = $2,
374                 commit_outpoint = NULL,
375                 try_to_send_id = NULL,
376                 aggregate_finalized = FALSE
377             WHERE id = $1",
378        )
379        .bind(id)
380        .bind(body);
381
382        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
383        Ok(())
384    }
385
386    /// Marks an aggregate row as finalized.
387    pub async fn set_citrea_aggregate_finalized(
388        &self,
389        tx: Option<TxSenderDbTx<'_>>,
390        id: i64,
391    ) -> Result<(), BridgeError> {
392        let query = sqlx::query(
393            "UPDATE tx_sender_citrea_raw_tx_queue
394             SET aggregate_finalized = TRUE
395             WHERE id = $1",
396        )
397        .bind(id);
398
399        txsender_execute_query_with_tx!(&self.pool, tx, query, execute)?;
400        Ok(())
401    }
402}