clementine_core/database/
bitcoin_syncer.rs

1use super::{
2    wrapper::{BlockHashDB, TxidDB},
3    Database, DatabaseTransaction,
4};
5use crate::{
6    bitcoin_syncer::BitcoinSyncerEvent, config::protocol::ProtocolParamset, errors::BridgeError,
7    execute_query_with_tx,
8};
9use bitcoin::{BlockHash, OutPoint, Txid};
10use eyre::Context;
11use std::ops::DerefMut;
12
13impl Database {
14    /// # Returns
15    ///
16    /// - [`u32`]: Database entry id, later to be used while referring block
17    pub async fn insert_block_info(
18        &self,
19        tx: Option<DatabaseTransaction<'_, '_>>,
20        block_hash: &BlockHash,
21        prev_block_hash: &BlockHash,
22        block_height: u32,
23    ) -> Result<u32, BridgeError> {
24        let query = sqlx::query_scalar(
25            "INSERT INTO bitcoin_syncer (blockhash, prev_blockhash, height) VALUES ($1, $2, $3) RETURNING id",
26        )
27        .bind(BlockHashDB(*block_hash))
28        .bind(BlockHashDB(*prev_block_hash))
29        .bind(i32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?);
30
31        let id: i32 = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
32
33        u32::try_from(id)
34            .wrap_err(BridgeError::IntConversionError)
35            .map_err(Into::into)
36    }
37
38    /// Sets the block with given block hash as canonical if it exists in the database
39    /// Returns the block id if the block was found and set as canonical, None otherwise
40    pub async fn update_block_as_canonical(
41        &self,
42        tx: Option<DatabaseTransaction<'_, '_>>,
43        block_hash: BlockHash,
44    ) -> Result<Option<u32>, BridgeError> {
45        let query = sqlx::query_scalar(
46            "UPDATE bitcoin_syncer SET is_canonical = true WHERE blockhash = $1 RETURNING id",
47        )
48        .bind(BlockHashDB(block_hash));
49
50        let id: Option<i32> = execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
51
52        id.map(|id| u32::try_from(id).wrap_err(BridgeError::IntConversionError))
53            .transpose()
54            .map_err(Into::into)
55    }
56
57    /// # Returns
58    ///
59    /// [`Some`] if the block exists in the database, [`None`] otherwise:
60    ///
61    /// - [`BlockHash`]: Previous block hash
62    /// - [`u32`]: Height of the block
63    pub async fn get_block_info_from_hash(
64        &self,
65        tx: Option<DatabaseTransaction<'_, '_>>,
66        block_hash: BlockHash,
67    ) -> Result<Option<(BlockHash, u32)>, BridgeError> {
68        let query = sqlx::query_as(
69            "SELECT prev_blockhash, height FROM bitcoin_syncer WHERE blockhash = $1 AND is_canonical = true",
70        )
71        .bind(BlockHashDB(block_hash));
72
73        let ret: Option<(BlockHashDB, i32)> =
74            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
75
76        ret.map(
77            |(prev_hash, height)| -> Result<(BlockHash, u32), BridgeError> {
78                let height = u32::try_from(height).wrap_err(BridgeError::IntConversionError)?;
79                Ok((prev_hash.0, height))
80            },
81        )
82        .transpose()
83    }
84
85    /// Gets block hash and height from block id (internal id used in bitcoin_syncer)
86    pub async fn get_block_info_from_id(
87        &self,
88        tx: Option<DatabaseTransaction<'_, '_>>,
89        block_id: u32,
90    ) -> Result<Option<(BlockHash, u32)>, BridgeError> {
91        let query = sqlx::query_as("SELECT blockhash, height FROM bitcoin_syncer WHERE id = $1")
92            .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?);
93
94        let ret: Option<(BlockHashDB, i32)> =
95            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
96
97        ret.map(
98            |(block_hash, height)| -> Result<(BlockHash, u32), BridgeError> {
99                let height = u32::try_from(height).wrap_err(BridgeError::IntConversionError)?;
100                Ok((block_hash.0, height))
101            },
102        )
103        .transpose()
104    }
105
106    /// Stores the full block in bytes in the database, with its height and hash
107    pub async fn upsert_full_block(
108        &self,
109        tx: Option<DatabaseTransaction<'_, '_>>,
110        block: &bitcoin::Block,
111        block_height: u32,
112    ) -> Result<(), BridgeError> {
113        let block_bytes = bitcoin::consensus::serialize(block);
114        let query = sqlx::query(
115            "INSERT INTO bitcoin_blocks (height, block_data, block_hash) VALUES ($1, $2, $3)
116             ON CONFLICT (height) DO UPDATE SET block_data = $2, block_hash = $3",
117        )
118        .bind(i32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?)
119        .bind(&block_bytes)
120        .bind(BlockHashDB(block.header.block_hash()));
121
122        execute_query_with_tx!(self.connection, tx, query, execute)?;
123        Ok(())
124    }
125
126    /// Gets the full block from the database, given the block height
127    pub async fn get_full_block(
128        &self,
129        tx: Option<DatabaseTransaction<'_, '_>>,
130        block_height: u32,
131    ) -> Result<Option<bitcoin::Block>, BridgeError> {
132        let query = sqlx::query_as("SELECT block_data FROM bitcoin_blocks WHERE height = $1")
133            .bind(i32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?);
134
135        let block_data: Option<(Vec<u8>,)> =
136            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
137
138        match block_data {
139            Some((bytes,)) => {
140                let block = bitcoin::consensus::deserialize(&bytes)
141                    .wrap_err(BridgeError::IntConversionError)?;
142                Ok(Some(block))
143            }
144            None => Ok(None),
145        }
146    }
147
148    /// Gets the full block and its height from the database, given the block hash
149    pub async fn get_full_block_from_hash(
150        &self,
151        tx: Option<DatabaseTransaction<'_, '_>>,
152        block_hash: BlockHash,
153    ) -> Result<Option<(u32, bitcoin::Block)>, BridgeError> {
154        let query =
155            sqlx::query_as("SELECT height, block_data FROM bitcoin_blocks WHERE block_hash = $1")
156                .bind(BlockHashDB(block_hash));
157
158        let block_data: Option<(i32, Vec<u8>)> =
159            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
160
161        match block_data {
162            Some((height_i32, bytes)) => {
163                let height = u32::try_from(height_i32).wrap_err(BridgeError::IntConversionError)?;
164                let block = bitcoin::consensus::deserialize(&bytes)
165                    .wrap_err(BridgeError::IntConversionError)?;
166                Ok(Some((height, block)))
167            }
168            None => Ok(None),
169        }
170    }
171
172    /// Gets the maximum height of the canonical blocks in the bitcoin_syncer database
173    pub async fn get_max_height(
174        &self,
175        tx: Option<DatabaseTransaction<'_, '_>>,
176    ) -> Result<Option<u32>, BridgeError> {
177        let query =
178            sqlx::query_as("SELECT height FROM bitcoin_syncer WHERE is_canonical = true ORDER BY height DESC LIMIT 1");
179        let result: Option<(i32,)> =
180            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
181
182        result
183            .map(|(height,)| u32::try_from(height).wrap_err(BridgeError::IntConversionError))
184            .transpose()
185            .map_err(Into::into)
186    }
187
188    /// Gets the block hashes that have height bigger then the given height and deletes them.
189    /// Marks blocks with height bigger than the given height as non-canonical.
190    ///
191    /// # Parameters
192    ///
193    /// - `tx`: Optional transaction to use for the query.
194    /// - `height`: Height to start marking blocks as such (not inclusive).
195    ///
196    /// # Returns
197    ///
198    /// - [`Vec<u32>`]: List of block ids that were marked as non-canonical in
199    ///   descending order.
200    pub async fn update_non_canonical_block_hashes(
201        &self,
202        tx: Option<DatabaseTransaction<'_, '_>>,
203        height: u32,
204    ) -> Result<Vec<u32>, BridgeError> {
205        let query = sqlx::query_as(
206            "WITH deleted AS (
207                UPDATE bitcoin_syncer
208                SET is_canonical = false
209                WHERE height > $1
210                RETURNING id
211            ) SELECT id FROM deleted ORDER BY id DESC",
212        )
213        .bind(i32::try_from(height).wrap_err(BridgeError::IntConversionError)?);
214
215        let block_ids: Vec<(i32,)> = execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
216        block_ids
217            .into_iter()
218            .map(|(block_id,)| u32::try_from(block_id).wrap_err(BridgeError::IntConversionError))
219            .collect::<Result<Vec<_>, eyre::Report>>()
220            .map_err(Into::into)
221    }
222
223    /// Gets the block id of the canonical block at the given height
224    pub async fn get_canonical_block_id_from_height(
225        &self,
226        tx: Option<DatabaseTransaction<'_, '_>>,
227        height: u32,
228    ) -> Result<Option<u32>, BridgeError> {
229        let query = sqlx::query_as(
230            "SELECT id FROM bitcoin_syncer WHERE height = $1 AND is_canonical = true",
231        )
232        .bind(i32::try_from(height).wrap_err(BridgeError::IntConversionError)?);
233
234        let block_id: Option<(i32,)> =
235            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
236
237        block_id
238            .map(|(block_id,)| u32::try_from(block_id).wrap_err(BridgeError::IntConversionError))
239            .transpose()
240            .map_err(Into::into)
241    }
242
243    /// Saves the txid with the id of the block that contains it to the database
244    pub async fn insert_txid_to_block(
245        &self,
246        tx: DatabaseTransaction<'_, '_>,
247        block_id: u32,
248        txid: &bitcoin::Txid,
249    ) -> Result<(), BridgeError> {
250        let query = sqlx::query("INSERT INTO bitcoin_syncer_txs (block_id, txid) VALUES ($1, $2)")
251            .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?)
252            .bind(super::wrapper::TxidDB(*txid));
253
254        execute_query_with_tx!(self.connection, Some(tx), query, execute)?;
255
256        Ok(())
257    }
258
259    /// Gets all the txids that are contained in the block with the given id
260    pub async fn get_block_txids(
261        &self,
262        tx: Option<DatabaseTransaction<'_, '_>>,
263        block_id: u32,
264    ) -> Result<Vec<Txid>, BridgeError> {
265        let query = sqlx::query_as("SELECT txid FROM bitcoin_syncer_txs WHERE block_id = $1")
266            .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?);
267
268        let txids: Vec<(TxidDB,)> = execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
269
270        Ok(txids.into_iter().map(|(txid,)| txid.0).collect())
271    }
272
273    /// Inserts a spent utxo into the database, with the block id that contains it, the spending txid and the vout
274    pub async fn insert_spent_utxo(
275        &self,
276        tx: DatabaseTransaction<'_, '_>,
277        block_id: u32,
278        spending_txid: &bitcoin::Txid,
279        txid: &bitcoin::Txid,
280        vout: i64,
281    ) -> Result<(), BridgeError> {
282        sqlx::query(
283            "INSERT INTO bitcoin_syncer_spent_utxos (block_id, spending_txid, txid, vout) VALUES ($1, $2, $3, $4)",
284        )
285        .bind(block_id as i32)
286        .bind(super::wrapper::TxidDB(*spending_txid))
287        .bind(super::wrapper::TxidDB(*txid))
288        .bind(vout)
289        .execute(tx.deref_mut())
290        .await?;
291        Ok(())
292    }
293
294    /// For a given outpoint, gets the block height of the canonical block that spent it.
295    /// Returns None if the outpoint is not spent.
296    pub async fn get_block_height_of_spending_txid(
297        &self,
298        tx: Option<DatabaseTransaction<'_, '_>>,
299        outpoint: OutPoint,
300    ) -> Result<Option<u32>, BridgeError> {
301        let query = sqlx::query_scalar::<_, i32>(
302            "SELECT bs.height FROM bitcoin_syncer_spent_utxos bspu
303                INNER JOIN bitcoin_syncer bs ON bspu.block_id = bs.id
304                WHERE bspu.txid = $1 AND bspu.vout = $2 AND bs.is_canonical = true",
305        )
306        .bind(super::wrapper::TxidDB(outpoint.txid))
307        .bind(outpoint.vout as i64);
308
309        let result: Option<i32> =
310            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
311
312        result
313            .map(|height| u32::try_from(height).wrap_err(BridgeError::IntConversionError))
314            .transpose()
315            .map_err(Into::into)
316    }
317
318    /// Checks if the utxo is spent, if so checks if the spending tx is finalized
319    /// Returns true if the utxo is spent and the spending tx is finalized, false otherwise
320    pub async fn check_if_utxo_spending_tx_is_finalized(
321        &self,
322        tx: Option<DatabaseTransaction<'_, '_>>,
323        outpoint: OutPoint,
324        current_chain_height: u32,
325        protocol_paramset: &'static ProtocolParamset,
326    ) -> Result<bool, BridgeError> {
327        let spending_tx_height = self.get_block_height_of_spending_txid(tx, outpoint).await?;
328        match spending_tx_height {
329            Some(spending_tx_height) => {
330                Ok(protocol_paramset.is_block_finalized(spending_tx_height, current_chain_height))
331            }
332            None => Ok(false),
333        }
334    }
335
336    /// Gets all the spent utxos for a given txid
337    pub async fn get_spent_utxos_for_txid(
338        &self,
339        tx: Option<DatabaseTransaction<'_, '_>>,
340        txid: Txid,
341    ) -> Result<Vec<(i64, OutPoint)>, BridgeError> {
342        let query = sqlx::query_as(
343            "SELECT block_id, txid, vout FROM bitcoin_syncer_spent_utxos WHERE spending_txid = $1",
344        )
345        .bind(TxidDB(txid));
346
347        let spent_utxos: Vec<(i64, TxidDB, i64)> =
348            execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
349
350        spent_utxos
351            .into_iter()
352            .map(
353                |(block_id, txid, vout)| -> Result<(i64, OutPoint), BridgeError> {
354                    let vout = u32::try_from(vout).wrap_err(BridgeError::IntConversionError)?;
355                    Ok((block_id, OutPoint { txid: txid.0, vout }))
356                },
357            )
358            .collect::<Result<Vec<_>, BridgeError>>()
359    }
360
361    /// Adds a bitcoin syncer event to the database. These events can currently be new block or reorged block.
362    pub async fn insert_event(
363        &self,
364        tx: Option<DatabaseTransaction<'_, '_>>,
365        event_type: BitcoinSyncerEvent,
366    ) -> Result<(), BridgeError> {
367        let query = match event_type {
368            BitcoinSyncerEvent::NewBlock(block_id) => sqlx::query(
369                "INSERT INTO bitcoin_syncer_events (block_id, event_type) VALUES ($1, 'new_block'::bitcoin_syncer_event_type)",
370            )
371            .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?),
372            BitcoinSyncerEvent::ReorgedBlock(block_id) => sqlx::query(
373                "INSERT INTO bitcoin_syncer_events (block_id, event_type) VALUES ($1, 'reorged_block'::bitcoin_syncer_event_type)",
374            )
375            .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?),
376        };
377        execute_query_with_tx!(self.connection, tx, query, execute)?;
378        Ok(())
379    }
380
381    /// Returns the last processed Bitcoin Syncer event's block height for given consumer.
382    /// If the last processed event is missing, i.e. there are no processed events for the consumer, returns `None`.
383    pub async fn get_last_processed_event_block_height(
384        &self,
385        tx: Option<DatabaseTransaction<'_, '_>>,
386        consumer_handle: &str,
387    ) -> Result<Option<u32>, BridgeError> {
388        let query = sqlx::query_scalar::<_, i32>(
389            r#"SELECT bs.height
390             FROM bitcoin_syncer_event_handlers bseh
391             INNER JOIN bitcoin_syncer_events bse ON bseh.last_processed_event_id = bse.id
392             INNER JOIN bitcoin_syncer bs ON bse.block_id = bs.id
393             WHERE bseh.consumer_handle = $1"#,
394        )
395        .bind(consumer_handle);
396
397        let result: Option<i32> =
398            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
399
400        result
401            .map(|h| {
402                u32::try_from(h)
403                    .wrap_err(BridgeError::IntConversionError)
404                    .map_err(BridgeError::from)
405            })
406            .transpose()
407    }
408
409    /// Gets the last processed event id for a given consumer
410    pub async fn get_last_processed_event_id(
411        &self,
412        tx: DatabaseTransaction<'_, '_>,
413        consumer_handle: &str,
414    ) -> Result<i32, BridgeError> {
415        // Step 1: Insert the consumer_handle if it doesn't exist
416        sqlx::query(
417            r#"
418            INSERT INTO bitcoin_syncer_event_handlers (consumer_handle, last_processed_event_id)
419            VALUES ($1, 0)
420            ON CONFLICT (consumer_handle) DO NOTHING
421            "#,
422        )
423        .bind(consumer_handle)
424        .execute(tx.deref_mut())
425        .await?;
426
427        // Step 2: Get the last processed event ID for this consumer
428        let last_processed_event_id: i32 = sqlx::query_scalar(
429            r#"
430            SELECT last_processed_event_id
431            FROM bitcoin_syncer_event_handlers
432            WHERE consumer_handle = $1
433            "#,
434        )
435        .bind(consumer_handle)
436        .fetch_one(tx.deref_mut())
437        .await?;
438
439        Ok(last_processed_event_id)
440    }
441
442    /// Returns the maximum block height of the blocks that have been processed by the given consumer.
443    /// If the last processed event is missing, i.e. there are no processed events for the consumer, returns `None`.
444    pub async fn get_max_processed_block_height(
445        &self,
446        tx: Option<DatabaseTransaction<'_, '_>>,
447        consumer_handle: &str,
448    ) -> Result<Option<u32>, BridgeError> {
449        let query = sqlx::query_scalar::<_, Option<i32>>(
450            r#"SELECT MAX(bs.height)
451             FROM bitcoin_syncer_events bse
452             INNER JOIN bitcoin_syncer bs ON bse.block_id = bs.id
453             WHERE bse.id <= (
454                 SELECT last_processed_event_id
455                 FROM bitcoin_syncer_event_handlers
456                 WHERE consumer_handle = $1
457             )"#,
458        )
459        .bind(consumer_handle);
460
461        let result: Option<i32> = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
462
463        result
464            .map(|h| {
465                u32::try_from(h)
466                    .wrap_err(BridgeError::IntConversionError)
467                    .map_err(BridgeError::from)
468            })
469            .transpose()
470    }
471
472    /// Returns the next finalized block height that should be processed by the given consumer.
473    /// If there are no processed events, returns the paramset start height.
474    /// Next height is the max height of the processed block - finality depth + 1.
475    pub async fn get_next_finalized_block_height_for_consumer(
476        &self,
477        tx: Option<DatabaseTransaction<'_, '_>>,
478        consumer_handle: &str,
479        paramset: &'static ProtocolParamset,
480    ) -> Result<u32, BridgeError> {
481        let max_processed_block_height = self
482            .get_max_processed_block_height(tx, consumer_handle)
483            .await?;
484
485        let max_processed_finalized_block_height = match max_processed_block_height {
486            Some(max_processed_block_height) => {
487                max_processed_block_height.checked_sub(paramset.finality_depth - 1)
488            }
489            None => None,
490        };
491
492        let next_height = max_processed_finalized_block_height
493            .map(|h| h + 1)
494            .unwrap_or(paramset.start_height);
495
496        Ok(std::cmp::max(next_height, paramset.start_height))
497    }
498
499    /// Fetches the next bitcoin syncer event for a given consumer
500    /// This function is used to fetch the next event that hasn't been processed yet
501    /// It will return the event which includes the event type and the block id
502    /// The last updated event id is also updated to the id that is returned
503    /// If there are no more events to fetch, None is returned
504    pub async fn fetch_next_bitcoin_syncer_evt(
505        &self,
506        tx: DatabaseTransaction<'_, '_>,
507        consumer_handle: &str,
508    ) -> Result<Option<BitcoinSyncerEvent>, BridgeError> {
509        // Get the last processed event ID for this consumer
510        let last_processed_event_id = self
511            .get_last_processed_event_id(tx, consumer_handle)
512            .await?;
513
514        // Retrieve the next event that hasn't been processed yet
515        let event = sqlx::query_as::<_, (i32, i32, String)>(
516            r#"
517            SELECT id, block_id, event_type::text
518            FROM bitcoin_syncer_events
519            WHERE id > $1
520            ORDER BY id ASC
521            LIMIT 1
522            "#,
523        )
524        .bind(last_processed_event_id)
525        .fetch_optional(tx.deref_mut())
526        .await?;
527
528        if event.is_none() {
529            return Ok(None);
530        }
531
532        let event = event.expect("should exist since we checked is_none()");
533        let event_id = event.0;
534        let event_type: BitcoinSyncerEvent = (event.2, event.1).try_into()?;
535
536        // Update last_processed_event_id for this consumer
537        sqlx::query(
538            r#"
539            UPDATE bitcoin_syncer_event_handlers
540            SET last_processed_event_id = $1
541            WHERE consumer_handle = $2
542            "#,
543        )
544        .bind(event_id)
545        .bind(consumer_handle)
546        .execute(tx.deref_mut())
547        .await?;
548
549        Ok(Some(event_type))
550    }
551}
552
553#[cfg(test)]
554mod tests {
555    use super::*;
556    use crate::database::Database;
557    use crate::test::common::*;
558    use bitcoin::hashes::Hash;
559    use bitcoin::{BlockHash, CompactTarget};
560
561    async fn setup_test_db() -> Database {
562        let config = create_test_config_with_thread_name().await;
563        Database::new(&config).await.unwrap()
564    }
565
566    #[tokio::test]
567    async fn test_event_handling() {
568        let db = setup_test_db().await;
569        let mut dbtx = db.begin_transaction().await.unwrap();
570
571        // Create a test block
572        let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
573        let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
574        let height = 0x45;
575
576        let block_id = db
577            .insert_block_info(Some(&mut dbtx), &block_hash, &prev_block_hash, height)
578            .await
579            .unwrap();
580
581        // Add new block event
582        db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::NewBlock(block_id))
583            .await
584            .unwrap();
585
586        // Test event consumption
587        let consumer_handle = "test_consumer";
588        let event = db
589            .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer_handle)
590            .await
591            .unwrap();
592
593        assert!(matches!(event, Some(BitcoinSyncerEvent::NewBlock(id)) if id == block_id));
594
595        // Test that the same event is not returned twice
596        let event = db
597            .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer_handle)
598            .await
599            .unwrap();
600        assert!(event.is_none());
601
602        // Add reorg event
603        db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::ReorgedBlock(block_id))
604            .await
605            .unwrap();
606
607        // Test that new event is received
608        let event = db
609            .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer_handle)
610            .await
611            .unwrap();
612        assert!(matches!(event, Some(BitcoinSyncerEvent::ReorgedBlock(id)) if id == block_id));
613
614        dbtx.commit().await.unwrap();
615    }
616
617    #[tokio::test]
618    async fn test_store_and_get_block() {
619        let db = setup_test_db().await;
620        let block_height = 123u32;
621
622        // Create a dummy block
623        let dummy_header = bitcoin::block::Header {
624            version: bitcoin::block::Version::TWO,
625            prev_blockhash: BlockHash::from_raw_hash(Hash::from_byte_array([0x42; 32])),
626            merkle_root: bitcoin::TxMerkleNode::all_zeros(),
627            time: 1_000_000,
628            bits: CompactTarget::from_consensus(0),
629            nonce: 12345,
630        };
631
632        let dummy_txs = vec![bitcoin::Transaction {
633            version: bitcoin::blockdata::transaction::Version::TWO,
634            lock_time: bitcoin::absolute::LockTime::ZERO,
635            input: vec![],
636            output: vec![],
637        }];
638
639        let dummy_block = bitcoin::Block {
640            header: dummy_header,
641            txdata: dummy_txs.clone(),
642        };
643
644        let dummy_block_hash = dummy_block.block_hash();
645
646        // Store the block
647        db.upsert_full_block(None, &dummy_block, block_height)
648            .await
649            .unwrap();
650
651        // Retrieve the block
652        let retrieved_block = db
653            .get_full_block(None, block_height)
654            .await
655            .unwrap()
656            .unwrap();
657
658        // Verify block fields match
659        assert_eq!(retrieved_block, dummy_block);
660
661        // Retrieve the block
662        let retrieved_block_from_hash = db
663            .get_full_block_from_hash(None, dummy_block_hash)
664            .await
665            .unwrap()
666            .unwrap()
667            .1;
668
669        // Verify block fields match
670        assert_eq!(retrieved_block_from_hash, dummy_block);
671
672        // Non-existent block should return None
673        assert!(db.get_full_block(None, 999).await.unwrap().is_none());
674
675        // Overwrite the block
676        let updated_dummy_header = bitcoin::block::Header {
677            version: bitcoin::block::Version::ONE, // Changed version
678            ..dummy_header
679        };
680        let updated_dummy_block = bitcoin::Block {
681            header: updated_dummy_header,
682            txdata: dummy_txs.clone(),
683        };
684
685        let updated_dummy_block_hash = updated_dummy_block.block_hash();
686
687        db.upsert_full_block(None, &updated_dummy_block, block_height)
688            .await
689            .unwrap();
690
691        // Verify the update worked
692        let retrieved_updated_block = db
693            .get_full_block(None, block_height)
694            .await
695            .unwrap()
696            .unwrap();
697        assert_eq!(updated_dummy_block, retrieved_updated_block);
698
699        let retrieved_updated_block_from_hash = db
700            .get_full_block_from_hash(None, updated_dummy_block_hash)
701            .await
702            .unwrap()
703            .unwrap()
704            .1;
705        assert_eq!(updated_dummy_block, retrieved_updated_block_from_hash);
706    }
707
708    #[tokio::test]
709    async fn test_multiple_event_consumers() {
710        let db = setup_test_db().await;
711        let mut dbtx = db.begin_transaction().await.unwrap();
712
713        // Create a test block
714        let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
715        let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
716        let height = 0x45;
717
718        let block_id = db
719            .insert_block_info(Some(&mut dbtx), &block_hash, &prev_block_hash, height)
720            .await
721            .unwrap();
722
723        // Add events
724        db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::NewBlock(block_id))
725            .await
726            .unwrap();
727        db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::ReorgedBlock(block_id))
728            .await
729            .unwrap();
730
731        // Test with multiple consumers
732        let consumer1 = "consumer1";
733        let consumer2 = "consumer2";
734
735        // First consumer gets both events in order
736        let event1 = db
737            .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer1)
738            .await
739            .unwrap();
740        assert!(matches!(event1, Some(BitcoinSyncerEvent::NewBlock(id)) if id == block_id));
741
742        let event2 = db
743            .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer1)
744            .await
745            .unwrap();
746        assert!(matches!(event2, Some(BitcoinSyncerEvent::ReorgedBlock(id)) if id == block_id));
747
748        // Second consumer also gets both events independently
749        let event1 = db
750            .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer2)
751            .await
752            .unwrap();
753        assert!(matches!(event1, Some(BitcoinSyncerEvent::NewBlock(id)) if id == block_id));
754
755        let event2 = db
756            .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer2)
757            .await
758            .unwrap();
759        assert!(matches!(event2, Some(BitcoinSyncerEvent::ReorgedBlock(id)) if id == block_id));
760
761        dbtx.commit().await.unwrap();
762    }
763
764    #[tokio::test]
765    async fn test_non_canonical_blocks() {
766        let db = setup_test_db().await;
767        let mut dbtx = db.begin_transaction().await.unwrap();
768
769        // Create a chain of blocks
770        let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
771        let heights = [1, 2, 3, 4, 5];
772        let mut last_hash = prev_block_hash;
773
774        // Save some initial blocks.
775        let mut block_ids = Vec::new();
776        for height in heights {
777            let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([height as u8; 32]));
778            let block_id = db
779                .insert_block_info(Some(&mut dbtx), &block_hash, &last_hash, height)
780                .await
781                .unwrap();
782            block_ids.push(block_id);
783            last_hash = block_hash;
784        }
785
786        // Mark blocks above height 2 as non-canonical.
787        let non_canonical_blocks = db
788            .update_non_canonical_block_hashes(Some(&mut dbtx), 2)
789            .await
790            .unwrap();
791        assert_eq!(non_canonical_blocks.len(), 3);
792        assert_eq!(non_canonical_blocks, vec![5, 4, 3]);
793
794        // Verify blocks above height 2 are not returned
795        for height in heights {
796            let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([height as u8; 32]));
797            let block_info = db
798                .get_block_info_from_hash(Some(&mut dbtx), block_hash)
799                .await
800                .unwrap();
801
802            if height <= 2 {
803                assert!(block_info.is_some());
804            } else {
805                assert!(block_info.is_none());
806            }
807        }
808
809        // Verify max height is now 2
810        let max_height = db.get_max_height(Some(&mut dbtx)).await.unwrap().unwrap();
811        assert_eq!(max_height, 2);
812
813        dbtx.commit().await.unwrap();
814    }
815
816    #[tokio::test]
817    async fn add_get_block_info() {
818        let config = create_test_config_with_thread_name().await;
819        let db = Database::new(&config).await.unwrap();
820
821        let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
822        let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
823        let height = 0x45;
824
825        assert!(db
826            .get_block_info_from_hash(None, block_hash)
827            .await
828            .unwrap()
829            .is_none());
830
831        db.insert_block_info(None, &block_hash, &prev_block_hash, height)
832            .await
833            .unwrap();
834        let block_info = db
835            .get_block_info_from_hash(None, block_hash)
836            .await
837            .unwrap()
838            .unwrap();
839        let max_height = db.get_max_height(None).await.unwrap().unwrap();
840        assert_eq!(block_info.0, prev_block_hash);
841        assert_eq!(block_info.1, height);
842        assert_eq!(max_height, height);
843
844        db.insert_block_info(
845            None,
846            &BlockHash::from_raw_hash(Hash::from_byte_array([0x1; 32])),
847            &prev_block_hash,
848            height - 1,
849        )
850        .await
851        .unwrap();
852        let max_height = db.get_max_height(None).await.unwrap().unwrap();
853        assert_eq!(max_height, height);
854
855        db.insert_block_info(
856            None,
857            &BlockHash::from_raw_hash(Hash::from_byte_array([0x2; 32])),
858            &prev_block_hash,
859            height + 1,
860        )
861        .await
862        .unwrap();
863        let max_height = db.get_max_height(None).await.unwrap().unwrap();
864        assert_ne!(max_height, height);
865        assert_eq!(max_height, height + 1);
866    }
867
868    #[tokio::test]
869    async fn add_and_get_txids_from_block() {
870        let config = create_test_config_with_thread_name().await;
871        let db = Database::new(&config).await.unwrap();
872        let mut dbtx = db.begin_transaction().await.unwrap();
873
874        assert!(db
875            .insert_txid_to_block(&mut dbtx, 0, &Txid::all_zeros())
876            .await
877            .is_err());
878        let mut dbtx = db.begin_transaction().await.unwrap();
879
880        let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
881        let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
882        let height = 0x45;
883        let block_id = db
884            .insert_block_info(Some(&mut dbtx), &block_hash, &prev_block_hash, height)
885            .await
886            .unwrap();
887
888        let txids = vec![
889            Txid::from_raw_hash(Hash::from_byte_array([0x1; 32])),
890            Txid::from_raw_hash(Hash::from_byte_array([0x2; 32])),
891            Txid::from_raw_hash(Hash::from_byte_array([0x3; 32])),
892        ];
893        for txid in &txids {
894            db.insert_txid_to_block(&mut dbtx, block_id, txid)
895                .await
896                .unwrap();
897        }
898
899        let txids_from_db = db.get_block_txids(Some(&mut dbtx), block_id).await.unwrap();
900        assert_eq!(txids_from_db, txids);
901
902        assert!(db
903            .get_block_txids(Some(&mut dbtx), block_id + 1)
904            .await
905            .unwrap()
906            .is_empty());
907
908        dbtx.commit().await.unwrap();
909    }
910
911    #[tokio::test]
912    async fn insert_get_spent_utxos() {
913        let config = create_test_config_with_thread_name().await;
914        let db = Database::new(&config).await.unwrap();
915        let mut dbtx = db.begin_transaction().await.unwrap();
916
917        let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
918        let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
919        let height = 0x45;
920        let block_id = db
921            .insert_block_info(Some(&mut dbtx), &block_hash, &prev_block_hash, height)
922            .await
923            .unwrap();
924
925        let spending_txid = Txid::from_raw_hash(Hash::from_byte_array([0x2; 32]));
926        let txid = Txid::from_raw_hash(Hash::from_byte_array([0x1; 32]));
927        let vout = 0;
928        db.insert_txid_to_block(&mut dbtx, block_id, &spending_txid)
929            .await
930            .unwrap();
931
932        assert_eq!(
933            db.get_spent_utxos_for_txid(Some(&mut dbtx), txid)
934                .await
935                .unwrap()
936                .len(),
937            0
938        );
939
940        db.insert_spent_utxo(&mut dbtx, block_id, &spending_txid, &txid, vout)
941            .await
942            .unwrap();
943
944        let spent_utxos = db
945            .get_spent_utxos_for_txid(Some(&mut dbtx), spending_txid)
946            .await
947            .unwrap();
948        assert_eq!(spent_utxos.len(), 1);
949        assert_eq!(spent_utxos[0].0, block_id as i64);
950        assert_eq!(
951            spent_utxos[0].1,
952            bitcoin::OutPoint {
953                txid,
954                vout: vout as u32,
955            }
956        );
957
958        dbtx.commit().await.unwrap();
959    }
960}