clementine_core/
bitcoin_syncer.rs

1//! # Bitcoin Syncer
2//!
3//! This module provides common utilities to fetch Bitcoin state. Other modules
4//! can use this module to operate over Bitcoin. Every block starting from
5//! `paramset.start_height` is fetched and stored in the database.
6
7use crate::{
8    config::protocol::ProtocolParamset,
9    database::{Database, DatabaseTransaction},
10    extended_bitcoin_rpc::ExtendedBitcoinRpc,
11    task::{IntoTask, RecoverableTask, Task, TaskExt, TaskVariant, WithDelay},
12};
13use bitcoin::{block::Header, BlockHash, OutPoint};
14use bitcoincore_rpc::RpcApi;
15use clementine_errors::BridgeError;
16use eyre::Context;
17use std::time::Duration;
18use tonic::async_trait;
19
20pub const BTC_SYNCER_POLL_DELAY: Duration = if cfg!(test) {
21    Duration::from_millis(250)
22} else {
23    Duration::from_secs(30)
24};
25
26/// Represents basic information of a Bitcoin block.
27#[derive(Clone, Debug)]
28struct BlockInfo {
29    hash: BlockHash,
30    _header: Header,
31    height: u32,
32}
33
34pub use clementine_primitives::BitcoinSyncerEvent;
35
36/// Trait for handling new blocks as they are finalized
37#[async_trait]
38pub trait BlockHandler: Send + Sync + 'static {
39    /// Handle a new finalized block
40    async fn handle_new_block(
41        &mut self,
42        dbtx: DatabaseTransaction<'_>,
43        block_id: u32,
44        block: bitcoin::Block,
45        height: u32,
46    ) -> Result<(), BridgeError>;
47}
48
49/// Fetches the [`BlockInfo`] for a given height from Bitcoin.
50async fn fetch_block_info_from_height(
51    rpc: &ExtendedBitcoinRpc,
52    height: u32,
53) -> Result<BlockInfo, BridgeError> {
54    let hash = rpc
55        .get_block_hash(height as u64)
56        .await
57        .wrap_err("Failed to get block hash")?;
58    let header = rpc
59        .get_block_header(&hash)
60        .await
61        .wrap_err("Failed to get block header")?;
62
63    Ok(BlockInfo {
64        hash,
65        _header: header,
66        height,
67    })
68}
69
70/// Saves a Bitcoin block's metadata and it's transactions into the database.
71pub(crate) async fn save_block(
72    db: &Database,
73    dbtx: DatabaseTransaction<'_>,
74    block: &bitcoin::Block,
75    block_height: u32,
76) -> Result<u32, BridgeError> {
77    let block_hash = block.block_hash();
78    tracing::debug!(
79        "Saving a block with hash of {} and height of {}",
80        block_hash,
81        block_height
82    );
83
84    // update the block_info as canonical if it already exists
85    let block_id = db.update_block_as_canonical(Some(dbtx), block_hash).await?;
86    db.upsert_full_block(Some(dbtx), block, block_height)
87        .await?;
88
89    if let Some(block_id) = block_id {
90        return Ok(block_id);
91    }
92
93    let block_id = db
94        .insert_block_info(
95            Some(dbtx),
96            &block_hash,
97            &block.header.prev_blockhash,
98            block_height,
99        )
100        .await?;
101
102    tracing::debug!(
103        "Saving {} transactions to a block with hash {}",
104        block.txdata.len(),
105        block_hash
106    );
107    for tx in &block.txdata {
108        save_transaction_spent_utxos(db, dbtx, tx, block_id).await?;
109    }
110
111    Ok(block_id)
112}
113async fn _get_block_info_from_hash(
114    db: &Database,
115    dbtx: DatabaseTransaction<'_>,
116    rpc: &ExtendedBitcoinRpc,
117    hash: BlockHash,
118) -> Result<(BlockInfo, Vec<Vec<OutPoint>>), BridgeError> {
119    let block = rpc.get_block(&hash).await.wrap_err("Failed to get block")?;
120    let block_height = db
121        .get_block_info_from_hash(Some(dbtx), hash)
122        .await?
123        .ok_or_else(|| eyre::eyre!("Block not found in get_block_info_from_hash"))?
124        .1;
125
126    let mut block_utxos: Vec<Vec<OutPoint>> = Vec::new();
127    for tx in &block.txdata {
128        let txid = tx.compute_txid();
129        let spent_utxos = _get_transaction_spent_utxos(db, dbtx, txid).await?;
130        block_utxos.push(spent_utxos);
131    }
132
133    let block_info = BlockInfo {
134        hash,
135        _header: block.header,
136        height: block_height,
137    };
138
139    Ok((block_info, block_utxos))
140}
141
142/// Saves a Bitcoin transaction and its spent UTXOs to the database.
143async fn save_transaction_spent_utxos(
144    db: &Database,
145    dbtx: DatabaseTransaction<'_>,
146    tx: &bitcoin::Transaction,
147    block_id: u32,
148) -> Result<(), BridgeError> {
149    let txid = tx.compute_txid();
150    db.insert_txid_to_block(dbtx, block_id, &txid).await?;
151
152    for input in &tx.input {
153        db.insert_spent_utxo(
154            dbtx,
155            block_id,
156            &txid,
157            &input.previous_output.txid,
158            input.previous_output.vout as i64,
159        )
160        .await?;
161    }
162
163    Ok(())
164}
165async fn _get_transaction_spent_utxos(
166    db: &Database,
167    dbtx: DatabaseTransaction<'_>,
168    txid: bitcoin::Txid,
169) -> Result<Vec<OutPoint>, BridgeError> {
170    let utxos = db.get_spent_utxos_for_txid(Some(dbtx), txid).await?;
171    let utxos = utxos.into_iter().map(|utxo| utxo.1).collect::<Vec<_>>();
172
173    Ok(utxos)
174}
175
176/// If no block info exists in the DB, fetches the current block from the RPC and initializes the DB.
177pub async fn set_initial_block_info_if_not_exists(
178    db: &Database,
179    rpc: &ExtendedBitcoinRpc,
180    paramset: &'static ProtocolParamset,
181) -> Result<(), BridgeError> {
182    if db.get_max_height(None).await?.is_some() {
183        return Ok(());
184    }
185
186    let current_height = u32::try_from(
187        rpc.get_block_count()
188            .await
189            .wrap_err("Failed to get block count")?,
190    )
191    .wrap_err(BridgeError::IntConversionError)?;
192
193    if paramset.start_height > current_height {
194        tracing::error!(
195            "Bitcoin syncer could not find enough available blocks in chain (Likely a regtest problem). start_height ({}) > current_height ({})",
196            paramset.start_height,
197            current_height
198        );
199        return Ok(());
200    }
201
202    let height = paramset.start_height;
203    let mut dbtx = db.begin_transaction().await?;
204    // first collect previous needed blocks according to paramset start height
205    let block_info = fetch_block_info_from_height(rpc, height).await?;
206    let block = rpc
207        .get_block(&block_info.hash)
208        .await
209        .wrap_err("Failed to get block")?;
210    let block_id = save_block(db, &mut dbtx, &block, height).await?;
211    db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::NewBlock(block_id))
212        .await?;
213
214    dbtx.commit().await?;
215
216    Ok(())
217}
218
219/// Fetches the next block from Bitcoin, if it exists. Will also fetch previous
220/// blocks if the parent is missing, and give an error if the number of reorged blocks is greater than the finality depth.
221///
222/// # Parameters
223///
224/// - `current_height`: The height of the current tip **in the database**.
225///
226/// # Returns
227///
228/// - [`None`] - If no new block is available.
229/// - [`Vec<BlockInfo>`] - If new blocks are found.
230/// - [`BridgeError`] - If the number of reorged blocks is greater than the finality depth or db/rpc errors.
231async fn fetch_new_blocks(
232    db: &Database,
233    rpc: &ExtendedBitcoinRpc,
234    current_height: u32,
235    finality_depth: u32,
236) -> Result<Option<Vec<BlockInfo>>, BridgeError> {
237    let next_height = current_height + 1;
238
239    // Try to fetch the block hash for the next height.
240    let block_hash = match rpc.get_block_hash(next_height as u64).await {
241        Ok(hash) => hash,
242        Err(_) => return Ok(None),
243    };
244    tracing::debug!(
245        "Fetching block with hash of {:?} and height of {}...",
246        block_hash,
247        next_height
248    );
249
250    // Fetch its header.
251    let mut block_header = rpc
252        .get_block_header(&block_hash)
253        .await
254        .wrap_err("Failed to get block header")?;
255    let mut new_blocks = vec![BlockInfo {
256        hash: block_hash,
257        _header: block_header,
258        height: next_height,
259    }];
260
261    // Walk backwards until the parent is found in the database.
262    while db
263        .get_block_info_from_hash(None, block_header.prev_blockhash)
264        .await?
265        .is_none()
266    {
267        let prev_block_hash = block_header.prev_blockhash;
268        block_header = rpc
269            .get_block_header(&prev_block_hash)
270            .await
271            .wrap_err("Failed to get block header")?;
272        let new_height = new_blocks.last().expect("new_blocks is empty").height - 1;
273        new_blocks.push(BlockInfo {
274            hash: prev_block_hash,
275            _header: block_header,
276            height: new_height,
277        });
278
279        // new blocks includes the new one (block with height our previous tip + 1),
280        // so new_blocks.len() = 5 -> 4 reorged blocks,
281        if new_blocks.len() as u32 > finality_depth {
282            return Err(eyre::eyre!(
283                "Number of reorged blocks {} is greater than finality depth {}, reorged blocks: {:?}. If true, increase finality depth and resync the chain",
284                new_blocks.len() - 1,
285                finality_depth,
286                new_blocks
287            )
288            .into());
289        }
290    }
291
292    // The chain was built from tip to fork; reverse it to be in ascending order.
293    new_blocks.reverse();
294
295    Ok(Some(new_blocks))
296}
297
298/// Marks blocks above the common ancestor as non-canonical and emits reorg events.
299#[tracing::instrument(skip(db, dbtx), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
300async fn handle_reorg_events(
301    db: &Database,
302    dbtx: DatabaseTransaction<'_>,
303    common_ancestor_height: u32,
304) -> Result<(), BridgeError> {
305    let reorg_blocks = db
306        .update_non_canonical_block_hashes(Some(dbtx), common_ancestor_height)
307        .await?;
308    if !reorg_blocks.is_empty() {
309        tracing::debug!("Reorg occurred! Block ids: {:?}", reorg_blocks);
310    }
311
312    for reorg_block_id in reorg_blocks {
313        db.insert_event(Some(dbtx), BitcoinSyncerEvent::ReorgedBlock(reorg_block_id))
314            .await?;
315    }
316
317    Ok(())
318}
319
320/// Processes and inserts new blocks into the database, emitting a new block event for each.
321async fn process_new_blocks(
322    db: &Database,
323    rpc: &ExtendedBitcoinRpc,
324    dbtx: DatabaseTransaction<'_>,
325    new_blocks: &[BlockInfo],
326) -> Result<(), BridgeError> {
327    for block_info in new_blocks {
328        let block = rpc
329            .get_block(&block_info.hash)
330            .await
331            .wrap_err("Failed to get block")?;
332
333        let block_id = save_block(db, dbtx, &block, block_info.height).await?;
334        db.insert_event(Some(dbtx), BitcoinSyncerEvent::NewBlock(block_id))
335            .await?;
336    }
337
338    Ok(())
339}
340
341/// A task that syncs Bitcoin blocks from the Bitcoin node to the local database.
342#[derive(Debug)]
343pub struct BitcoinSyncerTask {
344    /// The database to store blocks in
345    db: Database,
346    /// The RPC client to fetch blocks from
347    rpc: ExtendedBitcoinRpc,
348    /// The current block height that has been synced
349    current_height: u32,
350    /// The finality depth
351    finality_depth: u32,
352}
353
354#[derive(Debug)]
355pub struct BitcoinSyncer {
356    /// The database to store blocks in
357    db: Database,
358    /// The RPC client to fetch blocks from
359    rpc: ExtendedBitcoinRpc,
360    /// The current block height that has been synced
361    current_height: u32,
362    /// The finality depth
363    finality_depth: u32,
364}
365
366impl BitcoinSyncer {
367    /// Creates a new Bitcoin syncer task.
368    ///
369    /// This function initializes the database with the first block if it's empty.
370    pub async fn new(
371        db: Database,
372        rpc: ExtendedBitcoinRpc,
373        paramset: &'static ProtocolParamset,
374    ) -> Result<Self, BridgeError> {
375        // Initialize the database if needed
376        set_initial_block_info_if_not_exists(&db, &rpc, paramset).await?;
377
378        // Get the current height from the database
379        let current_height = db
380            .get_max_height(None)
381            .await?
382            .ok_or_else(|| eyre::eyre!("Block not found in BitcoinSyncer::new"))?;
383
384        Ok(Self {
385            db,
386            rpc,
387            current_height,
388            finality_depth: paramset.finality_depth,
389        })
390    }
391}
392impl IntoTask for BitcoinSyncer {
393    type Task = WithDelay<BitcoinSyncerTask>;
394
395    fn into_task(self) -> Self::Task {
396        BitcoinSyncerTask {
397            db: self.db,
398            rpc: self.rpc,
399            current_height: self.current_height,
400            finality_depth: self.finality_depth,
401        }
402        .with_delay(BTC_SYNCER_POLL_DELAY)
403    }
404}
405
406#[async_trait]
407impl Task for BitcoinSyncerTask {
408    type Output = bool;
409    const VARIANT: TaskVariant = TaskVariant::BitcoinSyncer;
410
411    #[tracing::instrument(skip(self))]
412    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
413        let new_blocks = match fetch_new_blocks(
414            &self.db,
415            &self.rpc,
416            self.current_height,
417            self.finality_depth,
418        )
419        .await?
420        {
421            Some(blocks) if !blocks.is_empty() => {
422                tracing::debug!(
423                    "{} new blocks found after current height {}",
424                    blocks.len(),
425                    self.current_height
426                );
427
428                blocks
429            }
430            _ => {
431                tracing::debug!(
432                    "No new blocks found after current height: {}",
433                    self.current_height
434                );
435
436                return Ok(false);
437            }
438        };
439
440        // The common ancestor is the block preceding the first new block.
441        // Please note that this won't always be the `self.current_height`.
442        // Because `fetch_next_block` can fetch older blocks, if db is missing
443        // them.
444        let common_ancestor_height = new_blocks[0].height - 1;
445        tracing::debug!("Common ancestor height: {:?}", common_ancestor_height);
446        let mut dbtx = self.db.begin_transaction().await?;
447
448        // Mark reorg blocks (if any) as non-canonical.
449        handle_reorg_events(&self.db, &mut dbtx, common_ancestor_height).await?;
450        tracing::debug!("BitcoinSyncer: Marked reorg blocks as non-canonical");
451
452        // Process and insert the new blocks.
453        tracing::debug!("BitcoinSyncer: Processing new blocks");
454        tracing::debug!("BitcoinSyncer: New blocks: {:?}", new_blocks.len());
455        process_new_blocks(&self.db, &self.rpc, &mut dbtx, &new_blocks).await?;
456
457        dbtx.commit().await?;
458
459        // Update the current height to the tip of the new chain.
460        tracing::debug!("BitcoinSyncer: Updating current height");
461        self.current_height = new_blocks.last().expect("new_blocks is not empty").height;
462        tracing::debug!("BitcoinSyncer: Current height: {:?}", self.current_height);
463
464        // Return true to indicate work was done
465        Ok(true)
466    }
467}
468
469#[derive(Debug)]
470pub struct FinalizedBlockFetcherTask<H: BlockHandler> {
471    db: Database,
472    btc_syncer_consumer_id: String,
473    paramset: &'static ProtocolParamset,
474    next_finalized_height: u32,
475    handler: H,
476}
477
478impl<H: BlockHandler> FinalizedBlockFetcherTask<H> {
479    pub fn new(
480        db: Database,
481        btc_syncer_consumer_id: String,
482        paramset: &'static ProtocolParamset,
483        next_finalized_height: u32,
484        handler: H,
485    ) -> Self {
486        tracing::debug!(
487            "Creating finalized block fetcher with consumer id: {}",
488            btc_syncer_consumer_id
489        );
490        Self {
491            db,
492            btc_syncer_consumer_id,
493            paramset,
494            next_finalized_height,
495            handler,
496        }
497    }
498}
499
500#[async_trait]
501impl<H: BlockHandler> Task for FinalizedBlockFetcherTask<H> {
502    type Output = bool;
503    const VARIANT: TaskVariant = TaskVariant::FinalizedBlockFetcher;
504
505    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
506        let mut dbtx = self.db.begin_transaction().await?;
507
508        // Poll for the next bitcoin syncer event
509        let Some(event) = self
510            .db
511            .fetch_next_bitcoin_syncer_evt(&mut dbtx, &self.btc_syncer_consumer_id)
512            .await?
513        else {
514            // No event found, we can safely commit the transaction and return
515            dbtx.commit().await?;
516            return Ok(false);
517        };
518        let mut expected_next_finalized = self.next_finalized_height;
519
520        // Process the event
521        match event {
522            BitcoinSyncerEvent::NewBlock(block_id) => {
523                let new_block_height = self
524                    .db
525                    .get_block_info_from_id(Some(&mut dbtx), block_id)
526                    .await?
527                    .ok_or(eyre::eyre!("Block not found in BlockFetcherTask",))?
528                    .1;
529
530                // whether there's a new finalized block, can be false if reorged to the same height
531                let mut new_tip = false;
532                let mut warned = false;
533
534                // Update states to catch up to finalized chain
535                while self
536                    .paramset
537                    .is_block_finalized(expected_next_finalized, new_block_height)
538                {
539                    if new_tip && !warned {
540                        warned = true;
541                        // this event is multiple blocks away, report
542                        tracing::warn!("Received event with multiple finalized blocks, expected 1 for ordered events. Got a new block with height {new_block_height}, expected next finalized block {}", self.next_finalized_height);
543                    }
544                    new_tip = true;
545
546                    let block = self
547                        .db
548                        .get_full_block(Some(&mut dbtx), expected_next_finalized)
549                        .await?
550                        .ok_or(eyre::eyre!(
551                            "Block at height {} not found in BlockFetcherTask, current tip height is {}",
552                            expected_next_finalized, new_block_height
553                        ))?;
554
555                    let new_block_id = self
556                        .db
557                        .get_canonical_block_id_from_height(
558                            Some(&mut dbtx),
559                            expected_next_finalized,
560                        )
561                        .await?;
562
563                    let Some(new_block_id) = new_block_id else {
564                        tracing::error!("Block at height {} not found in BlockFetcherTask, current tip height is {}", expected_next_finalized, new_block_height);
565                        return Err(eyre::eyre!(
566                            "Block at height {} not found in BlockFetcherTask, current tip height is {}",
567                            expected_next_finalized, new_block_height
568                        ).into());
569                    };
570
571                    self.handler
572                        .handle_new_block(&mut dbtx, new_block_id, block, expected_next_finalized)
573                        .await?;
574
575                    expected_next_finalized += 1;
576                }
577            }
578            BitcoinSyncerEvent::ReorgedBlock(_) => {}
579        };
580
581        dbtx.commit().await?;
582        // update next height only after db commit is successful so next_height is consistent with state in DB
583        self.next_finalized_height = expected_next_finalized;
584        // Return whether the btc chain state has changed (either new blocks or reorgs)
585        Ok(true)
586    }
587}
588
589#[async_trait]
590impl<H: BlockHandler> RecoverableTask for FinalizedBlockFetcherTask<H> {
591    async fn recover_from_error(&mut self, _error: &BridgeError) -> Result<(), BridgeError> {
592        // No action needed. Errors will cause a rollback and the task will retry on the next run.
593        // In-memory data remains in sync (as it's only updated after db commit is successful).
594        Ok(())
595    }
596}
597
598#[cfg(test)]
599mod tests {
600    use crate::bitcoin_syncer::BitcoinSyncer;
601    use crate::builder::transaction::DEFAULT_SEQUENCE;
602    use crate::task::{IntoTask, TaskExt};
603    use crate::{database::Database, test::common::*};
604    use bitcoin::absolute::Height;
605    use bitcoin::hashes::Hash;
606    use bitcoin::transaction::Version;
607    use bitcoin::{OutPoint, ScriptBuf, Transaction, TxIn, Witness};
608    use bitcoincore_rpc::RpcApi;
609
610    #[tokio::test]
611    async fn get_block_info_from_height() {
612        let mut config = create_test_config_with_thread_name().await;
613        let regtest = create_regtest_rpc(&mut config).await;
614        let rpc = regtest.rpc().clone();
615
616        rpc.mine_blocks(1).await.unwrap();
617        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
618        let hash = rpc.get_block_hash(height as u64).await.unwrap();
619        let header = rpc.get_block_header(&hash).await.unwrap();
620
621        let block_info = super::fetch_block_info_from_height(&rpc, height)
622            .await
623            .unwrap();
624        assert_eq!(block_info._header, header);
625        assert_eq!(block_info.hash, hash);
626        assert_eq!(block_info.height, height);
627
628        rpc.mine_blocks(1).await.unwrap();
629        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
630
631        let block_info = super::fetch_block_info_from_height(&rpc, height)
632            .await
633            .unwrap();
634        assert_ne!(block_info._header, header);
635        assert_ne!(block_info.hash, hash);
636        assert_eq!(block_info.height, height);
637    }
638
639    #[tokio::test]
640    async fn save_get_transaction_spent_utxos() {
641        let mut config = create_test_config_with_thread_name().await;
642        let db = Database::new(&config).await.unwrap();
643        let regtest = create_regtest_rpc(&mut config).await;
644        let rpc = regtest.rpc().clone();
645
646        let mut dbtx = db.begin_transaction().await.unwrap();
647
648        rpc.mine_blocks(1).await.unwrap();
649        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
650        let hash = rpc.get_block_hash(height as u64).await.unwrap();
651        let block = rpc.get_block(&hash).await.unwrap();
652        let block_id = super::save_block(&db, &mut dbtx, &block, height)
653            .await
654            .unwrap();
655
656        let inputs = vec![
657            TxIn {
658                previous_output: OutPoint {
659                    txid: bitcoin::Txid::all_zeros(),
660                    vout: 0,
661                },
662                script_sig: ScriptBuf::default(),
663                sequence: DEFAULT_SEQUENCE,
664                witness: Witness::default(),
665            },
666            TxIn {
667                previous_output: OutPoint {
668                    txid: bitcoin::Txid::all_zeros(),
669                    vout: 1,
670                },
671                script_sig: ScriptBuf::default(),
672                sequence: DEFAULT_SEQUENCE,
673                witness: Witness::default(),
674            },
675        ];
676        let tx = Transaction {
677            version: Version::TWO,
678            lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
679            input: inputs.clone(),
680            output: vec![],
681        };
682        super::save_transaction_spent_utxos(&db, &mut dbtx, &tx, block_id)
683            .await
684            .unwrap();
685
686        let utxos = super::_get_transaction_spent_utxos(&db, &mut dbtx, tx.compute_txid())
687            .await
688            .unwrap();
689
690        for (index, input) in inputs.iter().enumerate() {
691            assert_eq!(input.previous_output, utxos[index]);
692        }
693
694        dbtx.commit().await.unwrap();
695    }
696
697    #[tokio::test]
698    async fn save_get_block() {
699        let mut config = create_test_config_with_thread_name().await;
700        let db = Database::new(&config).await.unwrap();
701        let regtest = create_regtest_rpc(&mut config).await;
702        let rpc = regtest.rpc().clone();
703
704        let mut dbtx = db.begin_transaction().await.unwrap();
705
706        rpc.mine_blocks(1).await.unwrap();
707        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
708        let hash = rpc.get_block_hash(height as u64).await.unwrap();
709        let block = rpc.get_block(&hash).await.unwrap();
710
711        super::save_block(&db, &mut dbtx, &block, height)
712            .await
713            .unwrap();
714
715        let (block_info, utxos) = super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
716            .await
717            .unwrap();
718        assert_eq!(block_info._header, block.header);
719        assert_eq!(block_info.hash, hash);
720        assert_eq!(block_info.height, height);
721        for (tx_index, tx) in block.txdata.iter().enumerate() {
722            for (txin_index, txin) in tx.input.iter().enumerate() {
723                assert_eq!(txin.previous_output, utxos[tx_index][txin_index]);
724            }
725        }
726
727        dbtx.commit().await.unwrap();
728    }
729
730    #[tokio::test]
731    async fn set_initial_block_info_if_not_exists() {
732        let mut config = create_test_config_with_thread_name().await;
733        let db = Database::new(&config).await.unwrap();
734        let regtest = create_regtest_rpc(&mut config).await;
735        let rpc = regtest.rpc().clone();
736
737        let mut dbtx = db.begin_transaction().await.unwrap();
738
739        rpc.mine_blocks(1).await.unwrap();
740        // let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
741        let hash = rpc
742            .get_block_hash(config.protocol_paramset().start_height as u64)
743            .await
744            .unwrap();
745        let block = rpc.get_block(&hash).await.unwrap();
746
747        assert!(super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
748            .await
749            .is_err());
750
751        super::set_initial_block_info_if_not_exists(&db, &rpc, config.protocol_paramset())
752            .await
753            .unwrap();
754
755        let (block_info, utxos) = super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
756            .await
757            .unwrap();
758        assert_eq!(block_info.hash, hash);
759        assert_eq!(block_info.height, config.protocol_paramset().start_height);
760
761        for (tx_index, tx) in block.txdata.iter().enumerate() {
762            for (txin_index, txin) in tx.input.iter().enumerate() {
763                assert_eq!(txin.previous_output, utxos[tx_index][txin_index]);
764            }
765        }
766    }
767
768    #[tokio::test]
769    async fn fetch_new_blocks_forward() {
770        let mut config = create_test_config_with_thread_name().await;
771        let db = Database::new(&config).await.unwrap();
772        let regtest = create_regtest_rpc(&mut config).await;
773        let rpc = regtest.rpc().clone();
774
775        let mut dbtx = db.begin_transaction().await.unwrap();
776
777        rpc.mine_blocks(1).await.unwrap();
778        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
779        let hash = rpc.get_block_hash(height as u64).await.unwrap();
780        let block = rpc.get_block(&hash).await.unwrap();
781        super::save_block(&db, &mut dbtx, &block, height)
782            .await
783            .unwrap();
784        dbtx.commit().await.unwrap();
785
786        let new_blocks =
787            super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
788                .await
789                .unwrap();
790        assert!(new_blocks.is_none());
791
792        let new_block_hashes = rpc.mine_blocks(1).await.unwrap();
793        let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
794        let new_blocks =
795            super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
796                .await
797                .unwrap()
798                .unwrap();
799        assert_eq!(new_blocks.len(), 1);
800        assert_eq!(new_blocks.first().unwrap().height, new_height);
801        assert_eq!(
802            new_blocks.first().unwrap().hash,
803            *new_block_hashes.first().unwrap()
804        );
805    }
806
807    #[tokio::test]
808    async fn fetch_new_blocks_backwards() {
809        let mut config = create_test_config_with_thread_name().await;
810        let db = Database::new(&config).await.unwrap();
811        let regtest = create_regtest_rpc(&mut config).await;
812        let rpc = regtest.rpc().clone();
813
814        // Prepare chain.
815        rpc.mine_blocks(1).await.unwrap();
816        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
817        let hash = rpc.get_block_hash(height as u64).await.unwrap();
818        let block = rpc.get_block(&hash).await.unwrap();
819
820        // Save the tip.
821        let mut dbtx = db.begin_transaction().await.unwrap();
822        super::save_block(&db, &mut dbtx, &block, height)
823            .await
824            .unwrap();
825        dbtx.commit().await.unwrap();
826
827        let new_blocks =
828            super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
829                .await
830                .unwrap();
831        assert!(new_blocks.is_none());
832
833        // Mine new blocks without saving them.
834        let mine_count: u32 = config.protocol_paramset().finality_depth - 1;
835        let new_block_hashes = rpc.mine_blocks(mine_count as u64).await.unwrap();
836        let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
837
838        let new_blocks = super::fetch_new_blocks(
839            &db,
840            &rpc,
841            new_height - 1,
842            config.protocol_paramset().finality_depth,
843        )
844        .await
845        .unwrap()
846        .unwrap();
847        assert_eq!(new_blocks.len(), mine_count as usize);
848        for (index, block) in new_blocks.iter().enumerate() {
849            assert_eq!(block.height, new_height - mine_count + index as u32 + 1);
850            assert_eq!(block.hash, new_block_hashes[index]);
851        }
852
853        // Mine too many blocks.
854        let mine_count: u32 = config.protocol_paramset().finality_depth;
855        rpc.mine_blocks(mine_count as u64).await.unwrap();
856        let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
857
858        assert!(super::fetch_new_blocks(
859            &db,
860            &rpc,
861            new_height - 1,
862            config.protocol_paramset().finality_depth
863        )
864        .await
865        .is_err());
866    }
867    #[ignore]
868    #[tokio::test]
869    async fn set_non_canonical_block_hashes() {
870        let mut config = create_test_config_with_thread_name().await;
871        let db = Database::new(&config).await.unwrap();
872        let regtest = create_regtest_rpc(&mut config).await;
873        let rpc = regtest.rpc().clone();
874
875        let hashes = rpc.mine_blocks(4).await.unwrap();
876        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
877
878        super::set_initial_block_info_if_not_exists(&db, &rpc, config.protocol_paramset())
879            .await
880            .unwrap();
881
882        rpc.invalidate_block(hashes.get(3).unwrap()).await.unwrap();
883        rpc.invalidate_block(hashes.get(2).unwrap()).await.unwrap();
884
885        let mut dbtx = db.begin_transaction().await.unwrap();
886
887        let last_db_block =
888            super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, *hashes.get(3).unwrap())
889                .await
890                .unwrap();
891        assert_eq!(last_db_block.0.height, height);
892        assert_eq!(last_db_block.0.hash, *hashes.get(3).unwrap());
893
894        super::handle_reorg_events(&db, &mut dbtx, height - 2)
895            .await
896            .unwrap();
897
898        assert!(
899            super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, *hashes.get(3).unwrap())
900                .await
901                .is_err()
902        );
903
904        dbtx.commit().await.unwrap();
905    }
906
907    #[tokio::test]
908    async fn start_bitcoin_syncer_new_block_mined() {
909        let mut config = create_test_config_with_thread_name().await;
910        let db = Database::new(&config).await.unwrap();
911        let regtest = create_regtest_rpc(&mut config).await;
912        let rpc = regtest.rpc().clone();
913
914        rpc.mine_blocks(1).await.unwrap();
915        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
916        let hash = rpc.get_block_hash(height as u64).await.unwrap();
917
918        let (looping_task, _cancel_tx) =
919            BitcoinSyncer::new(db.clone(), rpc.clone(), config.protocol_paramset())
920                .await
921                .unwrap()
922                .into_task()
923                .cancelable_loop();
924
925        looping_task.into_bg();
926
927        loop {
928            let mut dbtx = db.begin_transaction().await.unwrap();
929
930            let last_db_block =
931                match super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash).await {
932                    Ok(block) => block,
933                    Err(_) => {
934                        dbtx.commit().await.unwrap();
935                        continue;
936                    }
937                };
938
939            assert_eq!(last_db_block.0.height, height);
940            assert_eq!(last_db_block.0.hash, hash);
941
942            dbtx.commit().await.unwrap();
943            break;
944        }
945    }
946}