clementine_core/database/
bitcoin_syncer.rs

1use super::{
2    wrapper::{BlockHashDB, TxidDB},
3    Database, DatabaseTransaction,
4};
5use crate::{
6    bitcoin_syncer::BitcoinSyncerEvent, config::protocol::ProtocolParamset, execute_query_with_tx,
7};
8use bitcoin::{BlockHash, OutPoint, Txid};
9use clementine_errors::BridgeError;
10use eyre::Context;
11use std::ops::DerefMut;
12
13impl Database {
14    /// # Returns
15    ///
16    /// - [`u32`]: Database entry id, later to be used while referring block
17    pub async fn insert_block_info(
18        &self,
19        tx: Option<DatabaseTransaction<'_>>,
20        block_hash: &BlockHash,
21        prev_block_hash: &BlockHash,
22        block_height: u32,
23    ) -> Result<u32, BridgeError> {
24        let query = sqlx::query_scalar(
25            "INSERT INTO bitcoin_syncer (blockhash, prev_blockhash, height) VALUES ($1, $2, $3) RETURNING id",
26        )
27        .bind(BlockHashDB(*block_hash))
28        .bind(BlockHashDB(*prev_block_hash))
29        .bind(i32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?);
30
31        let id: i32 = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
32
33        u32::try_from(id)
34            .wrap_err(BridgeError::IntConversionError)
35            .map_err(Into::into)
36    }
37
38    /// Sets the block with given block hash as canonical if it exists in the database
39    /// Returns the block id if the block was found and set as canonical, None otherwise
40    pub async fn update_block_as_canonical(
41        &self,
42        tx: Option<DatabaseTransaction<'_>>,
43        block_hash: BlockHash,
44    ) -> Result<Option<u32>, BridgeError> {
45        let query = sqlx::query_scalar(
46            "UPDATE bitcoin_syncer SET is_canonical = true WHERE blockhash = $1 RETURNING id",
47        )
48        .bind(BlockHashDB(block_hash));
49
50        let id: Option<i32> = execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
51
52        id.map(|id| u32::try_from(id).wrap_err(BridgeError::IntConversionError))
53            .transpose()
54            .map_err(Into::into)
55    }
56
57    /// # Returns
58    ///
59    /// [`Some`] if the block exists in the database, [`None`] otherwise:
60    ///
61    /// - [`BlockHash`]: Previous block hash
62    /// - [`u32`]: Height of the block
63    pub async fn get_block_info_from_hash(
64        &self,
65        tx: Option<DatabaseTransaction<'_>>,
66        block_hash: BlockHash,
67    ) -> Result<Option<(BlockHash, u32)>, BridgeError> {
68        let query = sqlx::query_as(
69            "SELECT prev_blockhash, height FROM bitcoin_syncer WHERE blockhash = $1 AND is_canonical = true",
70        )
71        .bind(BlockHashDB(block_hash));
72
73        let ret: Option<(BlockHashDB, i32)> =
74            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
75
76        ret.map(
77            |(prev_hash, height)| -> Result<(BlockHash, u32), BridgeError> {
78                let height = u32::try_from(height).wrap_err(BridgeError::IntConversionError)?;
79                Ok((prev_hash.0, height))
80            },
81        )
82        .transpose()
83    }
84
85    /// Gets block hash and height from block id (internal id used in bitcoin_syncer)
86    pub async fn get_block_info_from_id(
87        &self,
88        tx: Option<DatabaseTransaction<'_>>,
89        block_id: u32,
90    ) -> Result<Option<(BlockHash, u32)>, BridgeError> {
91        let query = sqlx::query_as("SELECT blockhash, height FROM bitcoin_syncer WHERE id = $1")
92            .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?);
93
94        let ret: Option<(BlockHashDB, i32)> =
95            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
96
97        ret.map(
98            |(block_hash, height)| -> Result<(BlockHash, u32), BridgeError> {
99                let height = u32::try_from(height).wrap_err(BridgeError::IntConversionError)?;
100                Ok((block_hash.0, height))
101            },
102        )
103        .transpose()
104    }
105
106    /// Stores the full block in bytes in the database, with its height and hash
107    pub async fn upsert_full_block(
108        &self,
109        tx: Option<DatabaseTransaction<'_>>,
110        block: &bitcoin::Block,
111        block_height: u32,
112    ) -> Result<(), BridgeError> {
113        let block_bytes = bitcoin::consensus::serialize(block);
114        let query = sqlx::query(
115            "INSERT INTO bitcoin_blocks (height, block_data, block_hash) VALUES ($1, $2, $3)
116             ON CONFLICT (height) DO UPDATE SET block_data = $2, block_hash = $3",
117        )
118        .bind(i32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?)
119        .bind(&block_bytes)
120        .bind(BlockHashDB(block.header.block_hash()));
121
122        execute_query_with_tx!(self.connection, tx, query, execute)?;
123        Ok(())
124    }
125
126    /// Gets the full block from the database, given the block height
127    pub async fn get_full_block(
128        &self,
129        tx: Option<DatabaseTransaction<'_>>,
130        block_height: u32,
131    ) -> Result<Option<bitcoin::Block>, BridgeError> {
132        let query = sqlx::query_as("SELECT block_data FROM bitcoin_blocks WHERE height = $1")
133            .bind(i32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?);
134
135        let block_data: Option<(Vec<u8>,)> =
136            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
137
138        match block_data {
139            Some((bytes,)) => {
140                let block = bitcoin::consensus::deserialize(&bytes)
141                    .wrap_err(BridgeError::IntConversionError)?;
142                Ok(Some(block))
143            }
144            None => Ok(None),
145        }
146    }
147
148    /// Gets the full block and its height from the database, given the block hash
149    pub async fn get_full_block_from_hash(
150        &self,
151        tx: Option<DatabaseTransaction<'_>>,
152        block_hash: BlockHash,
153    ) -> Result<Option<(u32, bitcoin::Block)>, BridgeError> {
154        let query =
155            sqlx::query_as("SELECT height, block_data FROM bitcoin_blocks WHERE block_hash = $1")
156                .bind(BlockHashDB(block_hash));
157
158        let block_data: Option<(i32, Vec<u8>)> =
159            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
160
161        match block_data {
162            Some((height_i32, bytes)) => {
163                let height = u32::try_from(height_i32).wrap_err(BridgeError::IntConversionError)?;
164                let block = bitcoin::consensus::deserialize(&bytes)
165                    .wrap_err(BridgeError::IntConversionError)?;
166                Ok(Some((height, block)))
167            }
168            None => Ok(None),
169        }
170    }
171
172    /// Gets the maximum height of the canonical blocks in the bitcoin_syncer database
173    pub async fn get_max_height(
174        &self,
175        tx: Option<DatabaseTransaction<'_>>,
176    ) -> Result<Option<u32>, BridgeError> {
177        let query =
178            sqlx::query_as("SELECT height FROM bitcoin_syncer WHERE is_canonical = true ORDER BY height DESC LIMIT 1");
179        let result: Option<(i32,)> =
180            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
181
182        result
183            .map(|(height,)| u32::try_from(height).wrap_err(BridgeError::IntConversionError))
184            .transpose()
185            .map_err(Into::into)
186    }
187
188    /// Gets the block hashes that have height bigger then the given height and deletes them.
189    /// Marks blocks with height bigger than the given height as non-canonical.
190    ///
191    /// # Parameters
192    ///
193    /// - `tx`: Optional transaction to use for the query.
194    /// - `height`: Height to start marking blocks as such (not inclusive).
195    ///
196    /// # Returns
197    ///
198    /// - [`Vec<u32>`]: List of block ids that were marked as non-canonical in
199    ///   descending order.
200    pub async fn update_non_canonical_block_hashes(
201        &self,
202        tx: Option<DatabaseTransaction<'_>>,
203        height: u32,
204    ) -> Result<Vec<u32>, BridgeError> {
205        let query = sqlx::query_as(
206            "WITH deleted AS (
207                UPDATE bitcoin_syncer
208                SET is_canonical = false
209                WHERE height > $1
210                RETURNING id
211            ) SELECT id FROM deleted ORDER BY id DESC",
212        )
213        .bind(i32::try_from(height).wrap_err(BridgeError::IntConversionError)?);
214
215        let block_ids: Vec<(i32,)> = execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
216        block_ids
217            .into_iter()
218            .map(|(block_id,)| u32::try_from(block_id).wrap_err(BridgeError::IntConversionError))
219            .collect::<Result<Vec<_>, eyre::Report>>()
220            .map_err(Into::into)
221    }
222
223    /// Gets the block id of the canonical block at the given height
224    pub async fn get_canonical_block_id_from_height(
225        &self,
226        tx: Option<DatabaseTransaction<'_>>,
227        height: u32,
228    ) -> Result<Option<u32>, BridgeError> {
229        let query = sqlx::query_as(
230            "SELECT id FROM bitcoin_syncer WHERE height = $1 AND is_canonical = true",
231        )
232        .bind(i32::try_from(height).wrap_err(BridgeError::IntConversionError)?);
233
234        let block_id: Option<(i32,)> =
235            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
236
237        block_id
238            .map(|(block_id,)| u32::try_from(block_id).wrap_err(BridgeError::IntConversionError))
239            .transpose()
240            .map_err(Into::into)
241    }
242
243    /// Saves the txid with the id of the block that contains it to the database
244    pub async fn insert_txid_to_block(
245        &self,
246        tx: DatabaseTransaction<'_>,
247        block_id: u32,
248        txid: &bitcoin::Txid,
249    ) -> Result<(), BridgeError> {
250        let query = sqlx::query("INSERT INTO bitcoin_syncer_txs (block_id, txid) VALUES ($1, $2)")
251            .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?)
252            .bind(super::wrapper::TxidDB(*txid));
253
254        execute_query_with_tx!(self.connection, Some(tx), query, execute)?;
255
256        Ok(())
257    }
258
259    /// Gets all the txids that are contained in the block with the given id
260    #[cfg(test)]
261    pub async fn get_block_txids(
262        &self,
263        tx: Option<DatabaseTransaction<'_>>,
264        block_id: u32,
265    ) -> Result<Vec<Txid>, BridgeError> {
266        let query = sqlx::query_as("SELECT txid FROM bitcoin_syncer_txs WHERE block_id = $1")
267            .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?);
268
269        let txids: Vec<(TxidDB,)> = execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
270
271        Ok(txids.into_iter().map(|(txid,)| txid.0).collect())
272    }
273
274    /// Gets the block height for txids that exist in canonical blocks.
275    /// Returns a mapping of txid -> block_height for those that exist.
276    pub async fn get_canonical_block_heights_for_txids(
277        &self,
278        tx: Option<DatabaseTransaction<'_>>,
279        txids: &[Txid],
280    ) -> Result<Vec<(Txid, u32)>, BridgeError> {
281        if txids.is_empty() {
282            return Ok(Vec::new());
283        }
284
285        // Convert txids to TxidDB for array binding
286        let txid_params: Vec<TxidDB> = txids.iter().map(|t| TxidDB(*t)).collect();
287
288        // Use TxidDB for result decoding to be consistent with the rest of the codebase
289        let query = sqlx::query_as::<_, (TxidDB, i32)>(
290            "SELECT bst.txid, bs.height
291             FROM bitcoin_syncer_txs bst
292             INNER JOIN bitcoin_syncer bs ON bst.block_id = bs.id
293             WHERE bst.txid = ANY($1) AND bs.is_canonical = true",
294        )
295        .bind(&txid_params);
296
297        let results: Vec<(TxidDB, i32)> =
298            execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
299
300        results
301            .into_iter()
302            .map(|(txid, height)| {
303                let height =
304                    u32::try_from(height).wrap_err("Failed to convert block height to u32")?;
305                Ok((txid.0, height))
306            })
307            .collect()
308    }
309
310    /// Checks if a txid exists in a canonical block.
311    /// Returns Some(block_height) if found, None otherwise.
312    pub async fn get_canonical_block_height_for_txid(
313        &self,
314        tx: Option<DatabaseTransaction<'_>>,
315        txid: Txid,
316    ) -> Result<Option<u32>, BridgeError> {
317        let query = sqlx::query_scalar::<_, i32>(
318            "SELECT bs.height
319             FROM bitcoin_syncer_txs bst
320             INNER JOIN bitcoin_syncer bs ON bst.block_id = bs.id
321             WHERE bst.txid = $1 AND bs.is_canonical = true",
322        )
323        .bind(TxidDB(txid));
324
325        let result: Option<i32> =
326            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
327
328        result
329            .map(|height| u32::try_from(height).wrap_err("Failed to convert block height to u32"))
330            .transpose()
331            .map_err(Into::into)
332    }
333
334    /// Inserts a spent utxo into the database, with the block id that contains it, the spending txid and the vout
335    pub async fn insert_spent_utxo(
336        &self,
337        tx: DatabaseTransaction<'_>,
338        block_id: u32,
339        spending_txid: &bitcoin::Txid,
340        txid: &bitcoin::Txid,
341        vout: i64,
342    ) -> Result<(), BridgeError> {
343        sqlx::query(
344            "INSERT INTO bitcoin_syncer_spent_utxos (block_id, spending_txid, txid, vout) VALUES ($1, $2, $3, $4)",
345        )
346        .bind(block_id as i32)
347        .bind(super::wrapper::TxidDB(*spending_txid))
348        .bind(super::wrapper::TxidDB(*txid))
349        .bind(vout)
350        .execute(tx.deref_mut())
351        .await?;
352        Ok(())
353    }
354
355    /// For a given outpoint, gets the block height of the canonical block that spent it.
356    /// Returns None if the outpoint is not spent.
357    pub async fn get_block_height_of_spending_txid(
358        &self,
359        tx: Option<DatabaseTransaction<'_>>,
360        outpoint: OutPoint,
361    ) -> Result<Option<u32>, BridgeError> {
362        let query = sqlx::query_scalar::<_, i32>(
363            "SELECT bs.height FROM bitcoin_syncer_spent_utxos bspu
364                INNER JOIN bitcoin_syncer bs ON bspu.block_id = bs.id
365                WHERE bspu.txid = $1 AND bspu.vout = $2 AND bs.is_canonical = true",
366        )
367        .bind(super::wrapper::TxidDB(outpoint.txid))
368        .bind(outpoint.vout as i64);
369
370        let result: Option<i32> =
371            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
372
373        result
374            .map(|height| u32::try_from(height).wrap_err(BridgeError::IntConversionError))
375            .transpose()
376            .map_err(Into::into)
377    }
378
379    /// Checks if the utxo is spent, if so checks if the spending tx is finalized
380    /// Returns true if the utxo is spent and the spending tx is finalized, false otherwise
381    pub async fn check_if_utxo_spending_tx_is_finalized(
382        &self,
383        tx: Option<DatabaseTransaction<'_>>,
384        outpoint: OutPoint,
385        current_chain_height: u32,
386        protocol_paramset: &'static ProtocolParamset,
387    ) -> Result<bool, BridgeError> {
388        let spending_tx_height = self.get_block_height_of_spending_txid(tx, outpoint).await?;
389        match spending_tx_height {
390            Some(spending_tx_height) => {
391                Ok(protocol_paramset.is_block_finalized(spending_tx_height, current_chain_height))
392            }
393            None => Ok(false),
394        }
395    }
396
397    /// Gets all the spent utxos for a given txid
398    pub async fn get_spent_utxos_for_txid(
399        &self,
400        tx: Option<DatabaseTransaction<'_>>,
401        txid: Txid,
402    ) -> Result<Vec<(i64, OutPoint)>, BridgeError> {
403        let query = sqlx::query_as(
404            "SELECT block_id, txid, vout FROM bitcoin_syncer_spent_utxos WHERE spending_txid = $1",
405        )
406        .bind(TxidDB(txid));
407
408        let spent_utxos: Vec<(i64, TxidDB, i64)> =
409            execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
410
411        spent_utxos
412            .into_iter()
413            .map(
414                |(block_id, txid, vout)| -> Result<(i64, OutPoint), BridgeError> {
415                    let vout = u32::try_from(vout).wrap_err(BridgeError::IntConversionError)?;
416                    Ok((block_id, OutPoint { txid: txid.0, vout }))
417                },
418            )
419            .collect::<Result<Vec<_>, BridgeError>>()
420    }
421
422    /// Adds a bitcoin syncer event to the database. These events can currently be new block or reorged block.
423    pub async fn insert_event(
424        &self,
425        tx: Option<DatabaseTransaction<'_>>,
426        event_type: BitcoinSyncerEvent,
427    ) -> Result<(), BridgeError> {
428        let query = match event_type {
429            BitcoinSyncerEvent::NewBlock(block_id) => sqlx::query(
430                "INSERT INTO bitcoin_syncer_events (block_id, event_type) VALUES ($1, 'new_block'::bitcoin_syncer_event_type)",
431            )
432            .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?),
433            BitcoinSyncerEvent::ReorgedBlock(block_id) => sqlx::query(
434                "INSERT INTO bitcoin_syncer_events (block_id, event_type) VALUES ($1, 'reorged_block'::bitcoin_syncer_event_type)",
435            )
436            .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?),
437        };
438        execute_query_with_tx!(self.connection, tx, query, execute)?;
439        Ok(())
440    }
441
442    /// Returns the last processed Bitcoin Syncer event's block height for given consumer.
443    /// If the last processed event is missing, i.e. there are no processed events for the consumer, returns `None`.
444    pub async fn get_last_processed_event_block_height(
445        &self,
446        tx: Option<DatabaseTransaction<'_>>,
447        consumer_handle: &str,
448    ) -> Result<Option<u32>, BridgeError> {
449        let query = sqlx::query_scalar::<_, i32>(
450            r#"SELECT bs.height
451             FROM bitcoin_syncer_event_handlers bseh
452             INNER JOIN bitcoin_syncer_events bse ON bseh.last_processed_event_id = bse.id
453             INNER JOIN bitcoin_syncer bs ON bse.block_id = bs.id
454             WHERE bseh.consumer_handle = $1"#,
455        )
456        .bind(consumer_handle);
457
458        let result: Option<i32> =
459            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
460
461        result
462            .map(|h| {
463                u32::try_from(h)
464                    .wrap_err(BridgeError::IntConversionError)
465                    .map_err(BridgeError::from)
466            })
467            .transpose()
468    }
469
470    /// Gets the last processed event id for a given consumer
471    pub async fn get_last_processed_event_id(
472        &self,
473        tx: DatabaseTransaction<'_>,
474        consumer_handle: &str,
475    ) -> Result<i32, BridgeError> {
476        // Step 1: Insert the consumer_handle if it doesn't exist
477        sqlx::query(
478            r#"
479            INSERT INTO bitcoin_syncer_event_handlers (consumer_handle, last_processed_event_id)
480            VALUES ($1, 0)
481            ON CONFLICT (consumer_handle) DO NOTHING
482            "#,
483        )
484        .bind(consumer_handle)
485        .execute(tx.deref_mut())
486        .await?;
487
488        // Step 2: Get the last processed event ID for this consumer
489        let last_processed_event_id: i32 = sqlx::query_scalar(
490            r#"
491            SELECT last_processed_event_id
492            FROM bitcoin_syncer_event_handlers
493            WHERE consumer_handle = $1
494            "#,
495        )
496        .bind(consumer_handle)
497        .fetch_one(tx.deref_mut())
498        .await?;
499
500        Ok(last_processed_event_id)
501    }
502
503    /// Returns the maximum block height of the blocks that have been processed by the given consumer.
504    /// If the last processed event is missing, i.e. there are no processed events for the consumer, returns `None`.
505    pub async fn get_max_processed_block_height(
506        &self,
507        tx: Option<DatabaseTransaction<'_>>,
508        consumer_handle: &str,
509    ) -> Result<Option<u32>, BridgeError> {
510        let query = sqlx::query_scalar::<_, Option<i32>>(
511            r#"SELECT MAX(bs.height)
512             FROM bitcoin_syncer_events bse
513             INNER JOIN bitcoin_syncer bs ON bse.block_id = bs.id
514             WHERE bse.id <= (
515                 SELECT last_processed_event_id
516                 FROM bitcoin_syncer_event_handlers
517                 WHERE consumer_handle = $1
518             )"#,
519        )
520        .bind(consumer_handle);
521
522        let result: Option<i32> = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
523
524        result
525            .map(|h| {
526                u32::try_from(h)
527                    .wrap_err(BridgeError::IntConversionError)
528                    .map_err(BridgeError::from)
529            })
530            .transpose()
531    }
532
533    /// Returns the next finalized block height that should be processed by the given consumer.
534    /// If there are no processed events, returns the paramset start height.
535    /// Next height is the max height of the processed block - finality depth + 1.
536    pub async fn get_next_finalized_block_height_for_consumer(
537        &self,
538        tx: Option<DatabaseTransaction<'_>>,
539        consumer_handle: &str,
540        paramset: &'static ProtocolParamset,
541    ) -> Result<u32, BridgeError> {
542        let max_processed_block_height = self
543            .get_max_processed_block_height(tx, consumer_handle)
544            .await?;
545
546        let max_processed_finalized_block_height = match max_processed_block_height {
547            Some(max_processed_block_height) => {
548                max_processed_block_height.checked_sub(paramset.finality_depth - 1)
549            }
550            None => None,
551        };
552
553        let next_height = max_processed_finalized_block_height
554            .map(|h| h + 1)
555            .unwrap_or(paramset.start_height);
556
557        Ok(std::cmp::max(next_height, paramset.start_height))
558    }
559
560    /// Fetches the next bitcoin syncer event for a given consumer
561    /// This function is used to fetch the next event that hasn't been processed yet
562    /// It will return the event which includes the event type and the block id
563    /// The last updated event id is also updated to the id that is returned
564    /// If there are no more events to fetch, None is returned
565    pub async fn fetch_next_bitcoin_syncer_evt(
566        &self,
567        tx: DatabaseTransaction<'_>,
568        consumer_handle: &str,
569    ) -> Result<Option<BitcoinSyncerEvent>, BridgeError> {
570        // Get the last processed event ID for this consumer
571        let last_processed_event_id = self
572            .get_last_processed_event_id(tx, consumer_handle)
573            .await?;
574
575        // Retrieve the next event that hasn't been processed yet
576        let event = sqlx::query_as::<_, (i32, i32, String)>(
577            r#"
578            SELECT id, block_id, event_type::text
579            FROM bitcoin_syncer_events
580            WHERE id > $1
581            ORDER BY id ASC
582            LIMIT 1
583            "#,
584        )
585        .bind(last_processed_event_id)
586        .fetch_optional(tx.deref_mut())
587        .await?;
588
589        if event.is_none() {
590            return Ok(None);
591        }
592
593        let event = event.expect("should exist since we checked is_none()");
594        let event_id = event.0;
595        let event_type: BitcoinSyncerEvent = (event.2, event.1).try_into()?;
596
597        // Update last_processed_event_id for this consumer
598        sqlx::query(
599            r#"
600            UPDATE bitcoin_syncer_event_handlers
601            SET last_processed_event_id = $1
602            WHERE consumer_handle = $2
603            "#,
604        )
605        .bind(event_id)
606        .bind(consumer_handle)
607        .execute(tx.deref_mut())
608        .await?;
609
610        Ok(Some(event_type))
611    }
612}
613
614#[cfg(test)]
615mod tests {
616    use super::*;
617    use crate::database::Database;
618    use crate::test::common::*;
619    use bitcoin::hashes::Hash;
620    use bitcoin::{BlockHash, CompactTarget};
621
622    async fn setup_test_db() -> Database {
623        let config = create_test_config_with_thread_name().await;
624        Database::new(&config).await.unwrap()
625    }
626
627    #[tokio::test]
628    async fn test_event_handling() {
629        let db = setup_test_db().await;
630        let mut dbtx = db.begin_transaction().await.unwrap();
631
632        // Create a test block
633        let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
634        let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
635        let height = 0x45;
636
637        let block_id = db
638            .insert_block_info(Some(&mut dbtx), &block_hash, &prev_block_hash, height)
639            .await
640            .unwrap();
641
642        // Add new block event
643        db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::NewBlock(block_id))
644            .await
645            .unwrap();
646
647        // Test event consumption
648        let consumer_handle = "test_consumer";
649        let event = db
650            .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer_handle)
651            .await
652            .unwrap();
653
654        assert!(matches!(event, Some(BitcoinSyncerEvent::NewBlock(id)) if id == block_id));
655
656        // Test that the same event is not returned twice
657        let event = db
658            .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer_handle)
659            .await
660            .unwrap();
661        assert!(event.is_none());
662
663        // Add reorg event
664        db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::ReorgedBlock(block_id))
665            .await
666            .unwrap();
667
668        // Test that new event is received
669        let event = db
670            .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer_handle)
671            .await
672            .unwrap();
673        assert!(matches!(event, Some(BitcoinSyncerEvent::ReorgedBlock(id)) if id == block_id));
674
675        dbtx.commit().await.unwrap();
676    }
677
678    #[tokio::test]
679    async fn test_store_and_get_block() {
680        let db = setup_test_db().await;
681        let block_height = 123u32;
682
683        // Create a dummy block
684        let dummy_header = bitcoin::block::Header {
685            version: bitcoin::block::Version::TWO,
686            prev_blockhash: BlockHash::from_raw_hash(Hash::from_byte_array([0x42; 32])),
687            merkle_root: bitcoin::TxMerkleNode::all_zeros(),
688            time: 1_000_000,
689            bits: CompactTarget::from_consensus(0),
690            nonce: 12345,
691        };
692
693        let dummy_txs = vec![bitcoin::Transaction {
694            version: bitcoin::blockdata::transaction::Version::TWO,
695            lock_time: bitcoin::absolute::LockTime::ZERO,
696            input: vec![],
697            output: vec![],
698        }];
699
700        let dummy_block = bitcoin::Block {
701            header: dummy_header,
702            txdata: dummy_txs.clone(),
703        };
704
705        let dummy_block_hash = dummy_block.block_hash();
706
707        // Store the block
708        db.upsert_full_block(None, &dummy_block, block_height)
709            .await
710            .unwrap();
711
712        // Retrieve the block
713        let retrieved_block = db
714            .get_full_block(None, block_height)
715            .await
716            .unwrap()
717            .unwrap();
718
719        // Verify block fields match
720        assert_eq!(retrieved_block, dummy_block);
721
722        // Retrieve the block
723        let retrieved_block_from_hash = db
724            .get_full_block_from_hash(None, dummy_block_hash)
725            .await
726            .unwrap()
727            .unwrap()
728            .1;
729
730        // Verify block fields match
731        assert_eq!(retrieved_block_from_hash, dummy_block);
732
733        // Non-existent block should return None
734        assert!(db.get_full_block(None, 999).await.unwrap().is_none());
735
736        // Overwrite the block
737        let updated_dummy_header = bitcoin::block::Header {
738            version: bitcoin::block::Version::ONE, // Changed version
739            ..dummy_header
740        };
741        let updated_dummy_block = bitcoin::Block {
742            header: updated_dummy_header,
743            txdata: dummy_txs.clone(),
744        };
745
746        let updated_dummy_block_hash = updated_dummy_block.block_hash();
747
748        db.upsert_full_block(None, &updated_dummy_block, block_height)
749            .await
750            .unwrap();
751
752        // Verify the update worked
753        let retrieved_updated_block = db
754            .get_full_block(None, block_height)
755            .await
756            .unwrap()
757            .unwrap();
758        assert_eq!(updated_dummy_block, retrieved_updated_block);
759
760        let retrieved_updated_block_from_hash = db
761            .get_full_block_from_hash(None, updated_dummy_block_hash)
762            .await
763            .unwrap()
764            .unwrap()
765            .1;
766        assert_eq!(updated_dummy_block, retrieved_updated_block_from_hash);
767    }
768
769    #[tokio::test]
770    async fn test_multiple_event_consumers() {
771        let db = setup_test_db().await;
772        let mut dbtx = db.begin_transaction().await.unwrap();
773
774        // Create a test block
775        let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
776        let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
777        let height = 0x45;
778
779        let block_id = db
780            .insert_block_info(Some(&mut dbtx), &block_hash, &prev_block_hash, height)
781            .await
782            .unwrap();
783
784        // Add events
785        db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::NewBlock(block_id))
786            .await
787            .unwrap();
788        db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::ReorgedBlock(block_id))
789            .await
790            .unwrap();
791
792        // Test with multiple consumers
793        let consumer1 = "consumer1";
794        let consumer2 = "consumer2";
795
796        // First consumer gets both events in order
797        let event1 = db
798            .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer1)
799            .await
800            .unwrap();
801        assert!(matches!(event1, Some(BitcoinSyncerEvent::NewBlock(id)) if id == block_id));
802
803        let event2 = db
804            .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer1)
805            .await
806            .unwrap();
807        assert!(matches!(event2, Some(BitcoinSyncerEvent::ReorgedBlock(id)) if id == block_id));
808
809        // Second consumer also gets both events independently
810        let event1 = db
811            .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer2)
812            .await
813            .unwrap();
814        assert!(matches!(event1, Some(BitcoinSyncerEvent::NewBlock(id)) if id == block_id));
815
816        let event2 = db
817            .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer2)
818            .await
819            .unwrap();
820        assert!(matches!(event2, Some(BitcoinSyncerEvent::ReorgedBlock(id)) if id == block_id));
821
822        dbtx.commit().await.unwrap();
823    }
824
825    #[tokio::test]
826    async fn test_non_canonical_blocks() {
827        let db = setup_test_db().await;
828        let mut dbtx = db.begin_transaction().await.unwrap();
829
830        // Create a chain of blocks
831        let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
832        let heights = [1, 2, 3, 4, 5];
833        let mut last_hash = prev_block_hash;
834
835        // Save some initial blocks.
836        let mut block_ids = Vec::new();
837        for height in heights {
838            let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([height as u8; 32]));
839            let block_id = db
840                .insert_block_info(Some(&mut dbtx), &block_hash, &last_hash, height)
841                .await
842                .unwrap();
843            block_ids.push(block_id);
844            last_hash = block_hash;
845        }
846
847        // Mark blocks above height 2 as non-canonical.
848        let non_canonical_blocks = db
849            .update_non_canonical_block_hashes(Some(&mut dbtx), 2)
850            .await
851            .unwrap();
852        assert_eq!(non_canonical_blocks.len(), 3);
853        assert_eq!(non_canonical_blocks, vec![5, 4, 3]);
854
855        // Verify blocks above height 2 are not returned
856        for height in heights {
857            let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([height as u8; 32]));
858            let block_info = db
859                .get_block_info_from_hash(Some(&mut dbtx), block_hash)
860                .await
861                .unwrap();
862
863            if height <= 2 {
864                assert!(block_info.is_some());
865            } else {
866                assert!(block_info.is_none());
867            }
868        }
869
870        // Verify max height is now 2
871        let max_height = db.get_max_height(Some(&mut dbtx)).await.unwrap().unwrap();
872        assert_eq!(max_height, 2);
873
874        dbtx.commit().await.unwrap();
875    }
876
877    #[tokio::test]
878    async fn add_get_block_info() {
879        let config = create_test_config_with_thread_name().await;
880        let db = Database::new(&config).await.unwrap();
881
882        let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
883        let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
884        let height = 0x45;
885
886        assert!(db
887            .get_block_info_from_hash(None, block_hash)
888            .await
889            .unwrap()
890            .is_none());
891
892        db.insert_block_info(None, &block_hash, &prev_block_hash, height)
893            .await
894            .unwrap();
895        let block_info = db
896            .get_block_info_from_hash(None, block_hash)
897            .await
898            .unwrap()
899            .unwrap();
900        let max_height = db.get_max_height(None).await.unwrap().unwrap();
901        assert_eq!(block_info.0, prev_block_hash);
902        assert_eq!(block_info.1, height);
903        assert_eq!(max_height, height);
904
905        db.insert_block_info(
906            None,
907            &BlockHash::from_raw_hash(Hash::from_byte_array([0x1; 32])),
908            &prev_block_hash,
909            height - 1,
910        )
911        .await
912        .unwrap();
913        let max_height = db.get_max_height(None).await.unwrap().unwrap();
914        assert_eq!(max_height, height);
915
916        db.insert_block_info(
917            None,
918            &BlockHash::from_raw_hash(Hash::from_byte_array([0x2; 32])),
919            &prev_block_hash,
920            height + 1,
921        )
922        .await
923        .unwrap();
924        let max_height = db.get_max_height(None).await.unwrap().unwrap();
925        assert_ne!(max_height, height);
926        assert_eq!(max_height, height + 1);
927    }
928
929    #[tokio::test]
930    async fn add_and_get_txids_from_block() {
931        let config = create_test_config_with_thread_name().await;
932        let db = Database::new(&config).await.unwrap();
933        let mut dbtx = db.begin_transaction().await.unwrap();
934
935        assert!(db
936            .insert_txid_to_block(&mut dbtx, 0, &Txid::all_zeros())
937            .await
938            .is_err());
939        let mut dbtx = db.begin_transaction().await.unwrap();
940
941        let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
942        let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
943        let height = 0x45;
944        let block_id = db
945            .insert_block_info(Some(&mut dbtx), &block_hash, &prev_block_hash, height)
946            .await
947            .unwrap();
948
949        let txids = vec![
950            Txid::from_raw_hash(Hash::from_byte_array([0x1; 32])),
951            Txid::from_raw_hash(Hash::from_byte_array([0x2; 32])),
952            Txid::from_raw_hash(Hash::from_byte_array([0x3; 32])),
953        ];
954        for txid in &txids {
955            db.insert_txid_to_block(&mut dbtx, block_id, txid)
956                .await
957                .unwrap();
958        }
959
960        let txids_from_db = db.get_block_txids(Some(&mut dbtx), block_id).await.unwrap();
961        assert_eq!(txids_from_db, txids);
962
963        assert!(db
964            .get_block_txids(Some(&mut dbtx), block_id + 1)
965            .await
966            .unwrap()
967            .is_empty());
968
969        dbtx.commit().await.unwrap();
970    }
971
972    #[tokio::test]
973    async fn insert_get_spent_utxos() {
974        let config = create_test_config_with_thread_name().await;
975        let db = Database::new(&config).await.unwrap();
976        let mut dbtx = db.begin_transaction().await.unwrap();
977
978        let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
979        let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
980        let height = 0x45;
981        let block_id = db
982            .insert_block_info(Some(&mut dbtx), &block_hash, &prev_block_hash, height)
983            .await
984            .unwrap();
985
986        let spending_txid = Txid::from_raw_hash(Hash::from_byte_array([0x2; 32]));
987        let txid = Txid::from_raw_hash(Hash::from_byte_array([0x1; 32]));
988        let vout = 0;
989        db.insert_txid_to_block(&mut dbtx, block_id, &spending_txid)
990            .await
991            .unwrap();
992
993        assert_eq!(
994            db.get_spent_utxos_for_txid(Some(&mut dbtx), txid)
995                .await
996                .unwrap()
997                .len(),
998            0
999        );
1000
1001        db.insert_spent_utxo(&mut dbtx, block_id, &spending_txid, &txid, vout)
1002            .await
1003            .unwrap();
1004
1005        let spent_utxos = db
1006            .get_spent_utxos_for_txid(Some(&mut dbtx), spending_txid)
1007            .await
1008            .unwrap();
1009        assert_eq!(spent_utxos.len(), 1);
1010        assert_eq!(spent_utxos[0].0, block_id as i64);
1011        assert_eq!(
1012            spent_utxos[0].1,
1013            bitcoin::OutPoint {
1014                txid,
1015                vout: vout as u32,
1016            }
1017        );
1018
1019        dbtx.commit().await.unwrap();
1020    }
1021}