clementine_core/
bitcoin_syncer.rs

1//! # Bitcoin Syncer
2//!
3//! This module provides common utilities to fetch Bitcoin state. Other modules
4//! can use this module to operate over Bitcoin. Every block starting from
5//! `paramset.start_height` is fetched and stored in the database.
6
7use crate::{
8    config::protocol::ProtocolParamset,
9    database::{Database, DatabaseTransaction},
10    extended_bitcoin_rpc::ExtendedBitcoinRpc,
11    task::{IntoTask, RecoverableTask, Task, TaskExt, TaskVariant, WithDelay},
12};
13use bitcoin::{block::Header, BlockHash, OutPoint};
14use bitcoincore_rpc::RpcApi;
15use clementine_errors::BridgeError;
16use eyre::Context;
17use std::time::Duration;
18use tonic::async_trait;
19
20pub const BTC_SYNCER_POLL_DELAY: Duration = if cfg!(test) {
21    Duration::from_millis(250)
22} else {
23    Duration::from_secs(30)
24};
25
26/// Represents basic information of a Bitcoin block.
27#[derive(Clone, Debug)]
28struct BlockInfo {
29    hash: BlockHash,
30    _header: Header,
31    height: u32,
32}
33
34pub use clementine_primitives::BitcoinSyncerEvent;
35
36/// Trait for handling new blocks as they are finalized
37#[async_trait]
38pub trait BlockHandler: Send + Sync + 'static {
39    /// Handle a new finalized block
40    async fn handle_new_block(
41        &mut self,
42        dbtx: DatabaseTransaction<'_>,
43        block_id: u32,
44        block: bitcoin::Block,
45        height: u32,
46    ) -> Result<(), BridgeError>;
47}
48
49/// Fetches the [`BlockInfo`] for a given height from Bitcoin.
50async fn fetch_block_info_from_height(
51    rpc: &ExtendedBitcoinRpc,
52    height: u32,
53) -> Result<BlockInfo, BridgeError> {
54    let hash = rpc
55        .get_block_hash(height as u64)
56        .await
57        .wrap_err("Failed to get block hash")?;
58    let header = rpc
59        .get_block_header(&hash)
60        .await
61        .wrap_err("Failed to get block header")?;
62
63    Ok(BlockInfo {
64        hash,
65        _header: header,
66        height,
67    })
68}
69
70/// Saves a Bitcoin block's metadata and it's transactions into the database.
71pub(crate) async fn save_block(
72    db: &Database,
73    dbtx: DatabaseTransaction<'_>,
74    block: &bitcoin::Block,
75    block_height: u32,
76) -> Result<u32, BridgeError> {
77    let block_hash = block.block_hash();
78    tracing::debug!(
79        "Saving a block with hash of {} and height of {}",
80        block_hash,
81        block_height
82    );
83
84    // update the block_info as canonical if it already exists
85    let block_id = db.update_block_as_canonical(Some(dbtx), block_hash).await?;
86    db.upsert_full_block(Some(dbtx), block, block_height)
87        .await?;
88
89    if let Some(block_id) = block_id {
90        return Ok(block_id);
91    }
92
93    let block_id = db
94        .insert_block_info(
95            Some(dbtx),
96            &block_hash,
97            &block.header.prev_blockhash,
98            block_height,
99        )
100        .await?;
101
102    tracing::debug!(
103        "Saving {} transactions to a block with hash {}",
104        block.txdata.len(),
105        block_hash
106    );
107    for tx in &block.txdata {
108        save_transaction_spent_utxos(db, dbtx, tx, block_id).await?;
109    }
110
111    Ok(block_id)
112}
113async fn _get_block_info_from_hash(
114    db: &Database,
115    dbtx: DatabaseTransaction<'_>,
116    rpc: &ExtendedBitcoinRpc,
117    hash: BlockHash,
118) -> Result<(BlockInfo, Vec<Vec<OutPoint>>), BridgeError> {
119    let block = rpc.get_block(&hash).await.wrap_err("Failed to get block")?;
120    let block_height = db
121        .get_block_info_from_hash(Some(dbtx), hash)
122        .await?
123        .ok_or_else(|| eyre::eyre!("Block not found in get_block_info_from_hash"))?
124        .1;
125
126    let mut block_utxos: Vec<Vec<OutPoint>> = Vec::new();
127    for tx in &block.txdata {
128        let txid = tx.compute_txid();
129        let spent_utxos = _get_transaction_spent_utxos(db, dbtx, txid).await?;
130        block_utxos.push(spent_utxos);
131    }
132
133    let block_info = BlockInfo {
134        hash,
135        _header: block.header,
136        height: block_height,
137    };
138
139    Ok((block_info, block_utxos))
140}
141
142/// Saves a Bitcoin transaction and its spent UTXOs to the database.
143async fn save_transaction_spent_utxos(
144    db: &Database,
145    dbtx: DatabaseTransaction<'_>,
146    tx: &bitcoin::Transaction,
147    block_id: u32,
148) -> Result<(), BridgeError> {
149    let txid = tx.compute_txid();
150    db.insert_txid_to_block(dbtx, block_id, &txid).await?;
151
152    for input in &tx.input {
153        db.insert_spent_utxo(
154            dbtx,
155            block_id,
156            &txid,
157            &input.previous_output.txid,
158            input.previous_output.vout as i64,
159        )
160        .await?;
161    }
162
163    Ok(())
164}
165async fn _get_transaction_spent_utxos(
166    db: &Database,
167    dbtx: DatabaseTransaction<'_>,
168    txid: bitcoin::Txid,
169) -> Result<Vec<OutPoint>, BridgeError> {
170    let utxos = db.get_spent_utxos_for_txid(Some(dbtx), txid).await?;
171    let utxos = utxos.into_iter().map(|utxo| utxo.1).collect::<Vec<_>>();
172
173    Ok(utxos)
174}
175
176/// If no block info exists in the DB, fetches the current block from the RPC and initializes the DB.
177pub async fn set_initial_block_info_if_not_exists(
178    db: &Database,
179    rpc: &ExtendedBitcoinRpc,
180    paramset: &'static ProtocolParamset,
181) -> Result<(), BridgeError> {
182    if db.get_max_height(None).await?.is_some() {
183        return Ok(());
184    }
185
186    let current_height = u32::try_from(
187        rpc.get_block_count()
188            .await
189            .wrap_err("Failed to get block count")?,
190    )
191    .wrap_err(BridgeError::IntConversionError)?;
192
193    if paramset.start_height > current_height {
194        tracing::error!(
195            "Bitcoin syncer could not find enough available blocks in chain (Likely a regtest problem). start_height ({}) > current_height ({})",
196            paramset.start_height,
197            current_height
198        );
199        return Ok(());
200    }
201
202    let height = paramset.start_height;
203    let mut dbtx = db.begin_transaction().await?;
204    // first collect previous needed blocks according to paramset start height
205    let block_info = fetch_block_info_from_height(rpc, height).await?;
206    let block = rpc
207        .get_block(&block_info.hash)
208        .await
209        .wrap_err("Failed to get block")?;
210    let block_id = save_block(db, &mut dbtx, &block, height).await?;
211    db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::NewBlock(block_id))
212        .await?;
213
214    dbtx.commit().await?;
215
216    Ok(())
217}
218
219/// Fetches the next block from Bitcoin, if it exists. Will also fetch previous
220/// blocks if the parent is missing, and give an error if the number of reorged blocks is greater than the finality depth.
221///
222/// # Parameters
223///
224/// - `current_height`: The height of the current tip **in the database**.
225///
226/// # Returns
227///
228/// - [`None`] - If no new block is available.
229/// - [`Vec<BlockInfo>`] - If new blocks are found.
230/// - [`BridgeError`] - If the number of reorged blocks is greater than the finality depth or db/rpc errors.
231async fn fetch_new_blocks(
232    db: &Database,
233    rpc: &ExtendedBitcoinRpc,
234    current_height: u32,
235    finality_depth: u32,
236) -> Result<Option<Vec<BlockInfo>>, BridgeError> {
237    let next_height = current_height + 1;
238
239    // Try to fetch the block hash for the next height.
240    let block_hash = match rpc.get_block_hash(next_height as u64).await {
241        Ok(hash) => hash,
242        Err(_) => return Ok(None),
243    };
244    tracing::debug!(
245        "Fetching block with hash of {:?} and height of {}...",
246        block_hash,
247        next_height
248    );
249
250    // Fetch its header.
251    let mut block_header = rpc
252        .get_block_header(&block_hash)
253        .await
254        .wrap_err("Failed to get block header")?;
255    let mut new_blocks = vec![BlockInfo {
256        hash: block_hash,
257        _header: block_header,
258        height: next_height,
259    }];
260
261    // Walk backwards until the parent is found in the database.
262    while db
263        .get_block_info_from_hash(None, block_header.prev_blockhash)
264        .await?
265        .is_none()
266    {
267        let prev_block_hash = block_header.prev_blockhash;
268        block_header = rpc
269            .get_block_header(&prev_block_hash)
270            .await
271            .wrap_err("Failed to get block header")?;
272        let new_height = new_blocks.last().expect("new_blocks is empty").height - 1;
273        new_blocks.push(BlockInfo {
274            hash: prev_block_hash,
275            _header: block_header,
276            height: new_height,
277        });
278
279        // new blocks includes the new one (block with height our previous tip + 1),
280        // so new_blocks.len() = 5 -> 4 reorged blocks,
281        if new_blocks.len() as u32 > finality_depth {
282            return Err(eyre::eyre!(
283                "Number of reorged blocks {} is greater than finality depth {}, reorged blocks: {:?}. If true, increase finality depth and resync the chain",
284                new_blocks.len() - 1,
285                finality_depth,
286                new_blocks
287            )
288            .into());
289        }
290    }
291
292    // The chain was built from tip to fork; reverse it to be in ascending order.
293    new_blocks.reverse();
294
295    Ok(Some(new_blocks))
296}
297
298/// Marks blocks above the common ancestor as non-canonical and emits reorg events.
299#[tracing::instrument(skip(db, dbtx), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
300async fn handle_reorg_events(
301    db: &Database,
302    dbtx: DatabaseTransaction<'_>,
303    common_ancestor_height: u32,
304) -> Result<(), BridgeError> {
305    let reorg_blocks = db
306        .update_non_canonical_block_hashes(Some(dbtx), common_ancestor_height)
307        .await?;
308    if !reorg_blocks.is_empty() {
309        tracing::debug!("Reorg occurred! Block ids: {:?}", reorg_blocks);
310    }
311
312    for reorg_block_id in reorg_blocks {
313        db.insert_event(Some(dbtx), BitcoinSyncerEvent::ReorgedBlock(reorg_block_id))
314            .await?;
315    }
316
317    Ok(())
318}
319
320/// Processes and inserts new blocks into the database, emitting a new block event for each.
321async fn process_new_blocks(
322    db: &Database,
323    rpc: &ExtendedBitcoinRpc,
324    dbtx: DatabaseTransaction<'_>,
325    new_blocks: &[BlockInfo],
326) -> Result<(), BridgeError> {
327    for block_info in new_blocks {
328        let block = rpc
329            .get_block(&block_info.hash)
330            .await
331            .wrap_err("Failed to get block")?;
332
333        let block_id = save_block(db, dbtx, &block, block_info.height).await?;
334        db.insert_event(Some(dbtx), BitcoinSyncerEvent::NewBlock(block_id))
335            .await?;
336    }
337
338    Ok(())
339}
340
341/// A task that syncs Bitcoin blocks from the Bitcoin node to the local database.
342#[derive(Debug)]
343pub struct BitcoinSyncerTask {
344    /// The database to store blocks in
345    db: Database,
346    /// The RPC client to fetch blocks from
347    rpc: ExtendedBitcoinRpc,
348    /// The current block height that has been synced
349    current_height: u32,
350    /// The finality depth
351    finality_depth: u32,
352}
353
354#[derive(Debug)]
355pub struct BitcoinSyncer {
356    /// The database to store blocks in
357    db: Database,
358    /// The RPC client to fetch blocks from
359    rpc: ExtendedBitcoinRpc,
360    /// The current block height that has been synced
361    current_height: u32,
362    /// The finality depth
363    finality_depth: u32,
364}
365
366impl BitcoinSyncer {
367    /// Creates a new Bitcoin syncer task.
368    ///
369    /// This function initializes the database with the first block if it's empty.
370    pub async fn new(
371        db: Database,
372        rpc: ExtendedBitcoinRpc,
373        paramset: &'static ProtocolParamset,
374    ) -> Result<Self, BridgeError> {
375        // Initialize the database if needed
376        set_initial_block_info_if_not_exists(&db, &rpc, paramset).await?;
377
378        // Get the current height from the database
379        let current_height = db
380            .get_max_height(None)
381            .await?
382            .ok_or_else(|| eyre::eyre!("Block not found in BitcoinSyncer::new"))?;
383
384        Ok(Self {
385            db,
386            rpc,
387            current_height,
388            finality_depth: paramset.finality_depth,
389        })
390    }
391}
392impl IntoTask for BitcoinSyncer {
393    type Task = WithDelay<BitcoinSyncerTask>;
394
395    fn into_task(self) -> Self::Task {
396        BitcoinSyncerTask {
397            db: self.db,
398            rpc: self.rpc,
399            current_height: self.current_height,
400            finality_depth: self.finality_depth,
401        }
402        .with_delay(BTC_SYNCER_POLL_DELAY)
403    }
404}
405
406#[async_trait]
407impl Task for BitcoinSyncerTask {
408    type Output = bool;
409    const VARIANT: TaskVariant = TaskVariant::BitcoinSyncer;
410
411    #[tracing::instrument(skip(self))]
412    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
413        let new_blocks = match fetch_new_blocks(
414            &self.db,
415            &self.rpc,
416            self.current_height,
417            self.finality_depth,
418        )
419        .await?
420        {
421            Some(blocks) if !blocks.is_empty() => {
422                tracing::debug!(
423                    "{} new blocks found after current height {}",
424                    blocks.len(),
425                    self.current_height
426                );
427
428                blocks
429            }
430            _ => {
431                tracing::debug!(
432                    "No new blocks found after current height: {}",
433                    self.current_height
434                );
435
436                return Ok(false);
437            }
438        };
439
440        // The common ancestor is the block preceding the first new block.
441        // Please note that this won't always be the `self.current_height`.
442        // Because `fetch_next_block` can fetch older blocks, if db is missing
443        // them.
444        let common_ancestor_height = new_blocks[0].height - 1;
445        tracing::debug!("Common ancestor height: {:?}", common_ancestor_height);
446        let mut dbtx = self.db.begin_transaction().await?;
447
448        // Mark reorg blocks (if any) as non-canonical.
449        handle_reorg_events(&self.db, &mut dbtx, common_ancestor_height).await?;
450        tracing::debug!("BitcoinSyncer: Marked reorg blocks as non-canonical");
451
452        // Process and insert the new blocks.
453        tracing::debug!("BitcoinSyncer: Processing new blocks");
454        tracing::debug!("BitcoinSyncer: New blocks: {:?}", new_blocks.len());
455        process_new_blocks(&self.db, &self.rpc, &mut dbtx, &new_blocks).await?;
456
457        dbtx.commit().await?;
458
459        // Update the current height to the tip of the new chain.
460        tracing::debug!("BitcoinSyncer: Updating current height");
461        self.current_height = new_blocks.last().expect("new_blocks is not empty").height;
462        tracing::debug!("BitcoinSyncer: Current height: {:?}", self.current_height);
463
464        // Return true to indicate work was done
465        Ok(true)
466    }
467}
468
469#[derive(Debug)]
470pub struct FinalizedBlockFetcherTask<H: BlockHandler> {
471    db: Database,
472    btc_syncer_consumer_id: String,
473    paramset: &'static ProtocolParamset,
474    next_finalized_height: u32,
475    handler: H,
476}
477
478impl<H: BlockHandler> FinalizedBlockFetcherTask<H> {
479    pub fn new(
480        db: Database,
481        btc_syncer_consumer_id: String,
482        paramset: &'static ProtocolParamset,
483        next_finalized_height: u32,
484        handler: H,
485    ) -> Self {
486        Self {
487            db,
488            btc_syncer_consumer_id,
489            paramset,
490            next_finalized_height,
491            handler,
492        }
493    }
494}
495
496#[async_trait]
497impl<H: BlockHandler> Task for FinalizedBlockFetcherTask<H> {
498    type Output = bool;
499    const VARIANT: TaskVariant = TaskVariant::FinalizedBlockFetcher;
500
501    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
502        let mut dbtx = self.db.begin_transaction().await?;
503
504        // Poll for the next bitcoin syncer event
505        let Some(event) = self
506            .db
507            .fetch_next_bitcoin_syncer_evt(&mut dbtx, &self.btc_syncer_consumer_id)
508            .await?
509        else {
510            // No event found, we can safely commit the transaction and return
511            dbtx.commit().await?;
512            return Ok(false);
513        };
514        let mut expected_next_finalized = self.next_finalized_height;
515
516        // Process the event
517        let did_find_new_block = match event {
518            BitcoinSyncerEvent::NewBlock(block_id) => {
519                let new_block_height = self
520                    .db
521                    .get_block_info_from_id(Some(&mut dbtx), block_id)
522                    .await?
523                    .ok_or(eyre::eyre!("Block not found in BlockFetcherTask",))?
524                    .1;
525
526                // whether there's a new finalized block, can be false if reorged to the same height
527                let mut new_tip = false;
528                let mut warned = false;
529
530                // Update states to catch up to finalized chain
531                while self
532                    .paramset
533                    .is_block_finalized(expected_next_finalized, new_block_height)
534                {
535                    if new_tip && !warned {
536                        warned = true;
537                        // this event is multiple blocks away, report
538                        tracing::warn!("Received event with multiple finalized blocks, expected 1 for ordered events. Got a new block with height {new_block_height}, expected next finalized block {}", self.next_finalized_height);
539                    }
540                    new_tip = true;
541
542                    let block = self
543                        .db
544                        .get_full_block(Some(&mut dbtx), expected_next_finalized)
545                        .await?
546                        .ok_or(eyre::eyre!(
547                            "Block at height {} not found in BlockFetcherTask, current tip height is {}",
548                            expected_next_finalized, new_block_height
549                        ))?;
550
551                    let new_block_id = self
552                        .db
553                        .get_canonical_block_id_from_height(
554                            Some(&mut dbtx),
555                            expected_next_finalized,
556                        )
557                        .await?;
558
559                    let Some(new_block_id) = new_block_id else {
560                        tracing::error!("Block at height {} not found in BlockFetcherTask, current tip height is {}", expected_next_finalized, new_block_height);
561                        return Err(eyre::eyre!(
562                            "Block at height {} not found in BlockFetcherTask, current tip height is {}",
563                            expected_next_finalized, new_block_height
564                        ).into());
565                    };
566
567                    self.handler
568                        .handle_new_block(&mut dbtx, new_block_id, block, expected_next_finalized)
569                        .await?;
570
571                    expected_next_finalized += 1;
572                }
573
574                new_tip
575            }
576            BitcoinSyncerEvent::ReorgedBlock(_) => false,
577        };
578
579        dbtx.commit().await?;
580        // update next height only after db commit is successful so next_height is consistent with state in DB
581        self.next_finalized_height = expected_next_finalized;
582        // Return whether we found new blocks
583        Ok(did_find_new_block)
584    }
585}
586
587#[async_trait]
588impl<H: BlockHandler> RecoverableTask for FinalizedBlockFetcherTask<H> {
589    async fn recover_from_error(&mut self, _error: &BridgeError) -> Result<(), BridgeError> {
590        // No action needed. Errors will cause a rollback and the task will retry on the next run.
591        // In-memory data remains in sync (as it's only updated after db commit is successful).
592        Ok(())
593    }
594}
595
596#[cfg(test)]
597mod tests {
598    use crate::bitcoin_syncer::BitcoinSyncer;
599    use crate::builder::transaction::DEFAULT_SEQUENCE;
600    use crate::task::{IntoTask, TaskExt};
601    use crate::{database::Database, test::common::*};
602    use bitcoin::absolute::Height;
603    use bitcoin::hashes::Hash;
604    use bitcoin::transaction::Version;
605    use bitcoin::{OutPoint, ScriptBuf, Transaction, TxIn, Witness};
606    use bitcoincore_rpc::RpcApi;
607
608    #[tokio::test]
609    async fn get_block_info_from_height() {
610        let mut config = create_test_config_with_thread_name().await;
611        let regtest = create_regtest_rpc(&mut config).await;
612        let rpc = regtest.rpc().clone();
613
614        rpc.mine_blocks(1).await.unwrap();
615        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
616        let hash = rpc.get_block_hash(height as u64).await.unwrap();
617        let header = rpc.get_block_header(&hash).await.unwrap();
618
619        let block_info = super::fetch_block_info_from_height(&rpc, height)
620            .await
621            .unwrap();
622        assert_eq!(block_info._header, header);
623        assert_eq!(block_info.hash, hash);
624        assert_eq!(block_info.height, height);
625
626        rpc.mine_blocks(1).await.unwrap();
627        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
628
629        let block_info = super::fetch_block_info_from_height(&rpc, height)
630            .await
631            .unwrap();
632        assert_ne!(block_info._header, header);
633        assert_ne!(block_info.hash, hash);
634        assert_eq!(block_info.height, height);
635    }
636
637    #[tokio::test]
638    async fn save_get_transaction_spent_utxos() {
639        let mut config = create_test_config_with_thread_name().await;
640        let db = Database::new(&config).await.unwrap();
641        let regtest = create_regtest_rpc(&mut config).await;
642        let rpc = regtest.rpc().clone();
643
644        let mut dbtx = db.begin_transaction().await.unwrap();
645
646        rpc.mine_blocks(1).await.unwrap();
647        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
648        let hash = rpc.get_block_hash(height as u64).await.unwrap();
649        let block = rpc.get_block(&hash).await.unwrap();
650        let block_id = super::save_block(&db, &mut dbtx, &block, height)
651            .await
652            .unwrap();
653
654        let inputs = vec![
655            TxIn {
656                previous_output: OutPoint {
657                    txid: bitcoin::Txid::all_zeros(),
658                    vout: 0,
659                },
660                script_sig: ScriptBuf::default(),
661                sequence: DEFAULT_SEQUENCE,
662                witness: Witness::default(),
663            },
664            TxIn {
665                previous_output: OutPoint {
666                    txid: bitcoin::Txid::all_zeros(),
667                    vout: 1,
668                },
669                script_sig: ScriptBuf::default(),
670                sequence: DEFAULT_SEQUENCE,
671                witness: Witness::default(),
672            },
673        ];
674        let tx = Transaction {
675            version: Version::TWO,
676            lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
677            input: inputs.clone(),
678            output: vec![],
679        };
680        super::save_transaction_spent_utxos(&db, &mut dbtx, &tx, block_id)
681            .await
682            .unwrap();
683
684        let utxos = super::_get_transaction_spent_utxos(&db, &mut dbtx, tx.compute_txid())
685            .await
686            .unwrap();
687
688        for (index, input) in inputs.iter().enumerate() {
689            assert_eq!(input.previous_output, utxos[index]);
690        }
691
692        dbtx.commit().await.unwrap();
693    }
694
695    #[tokio::test]
696    async fn save_get_block() {
697        let mut config = create_test_config_with_thread_name().await;
698        let db = Database::new(&config).await.unwrap();
699        let regtest = create_regtest_rpc(&mut config).await;
700        let rpc = regtest.rpc().clone();
701
702        let mut dbtx = db.begin_transaction().await.unwrap();
703
704        rpc.mine_blocks(1).await.unwrap();
705        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
706        let hash = rpc.get_block_hash(height as u64).await.unwrap();
707        let block = rpc.get_block(&hash).await.unwrap();
708
709        super::save_block(&db, &mut dbtx, &block, height)
710            .await
711            .unwrap();
712
713        let (block_info, utxos) = super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
714            .await
715            .unwrap();
716        assert_eq!(block_info._header, block.header);
717        assert_eq!(block_info.hash, hash);
718        assert_eq!(block_info.height, height);
719        for (tx_index, tx) in block.txdata.iter().enumerate() {
720            for (txin_index, txin) in tx.input.iter().enumerate() {
721                assert_eq!(txin.previous_output, utxos[tx_index][txin_index]);
722            }
723        }
724
725        dbtx.commit().await.unwrap();
726    }
727
728    #[tokio::test]
729    async fn set_initial_block_info_if_not_exists() {
730        let mut config = create_test_config_with_thread_name().await;
731        let db = Database::new(&config).await.unwrap();
732        let regtest = create_regtest_rpc(&mut config).await;
733        let rpc = regtest.rpc().clone();
734
735        let mut dbtx = db.begin_transaction().await.unwrap();
736
737        rpc.mine_blocks(1).await.unwrap();
738        // let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
739        let hash = rpc
740            .get_block_hash(config.protocol_paramset().start_height as u64)
741            .await
742            .unwrap();
743        let block = rpc.get_block(&hash).await.unwrap();
744
745        assert!(super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
746            .await
747            .is_err());
748
749        super::set_initial_block_info_if_not_exists(&db, &rpc, config.protocol_paramset())
750            .await
751            .unwrap();
752
753        let (block_info, utxos) = super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
754            .await
755            .unwrap();
756        assert_eq!(block_info.hash, hash);
757        assert_eq!(block_info.height, config.protocol_paramset().start_height);
758
759        for (tx_index, tx) in block.txdata.iter().enumerate() {
760            for (txin_index, txin) in tx.input.iter().enumerate() {
761                assert_eq!(txin.previous_output, utxos[tx_index][txin_index]);
762            }
763        }
764    }
765
766    #[tokio::test]
767    async fn fetch_new_blocks_forward() {
768        let mut config = create_test_config_with_thread_name().await;
769        let db = Database::new(&config).await.unwrap();
770        let regtest = create_regtest_rpc(&mut config).await;
771        let rpc = regtest.rpc().clone();
772
773        let mut dbtx = db.begin_transaction().await.unwrap();
774
775        rpc.mine_blocks(1).await.unwrap();
776        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
777        let hash = rpc.get_block_hash(height as u64).await.unwrap();
778        let block = rpc.get_block(&hash).await.unwrap();
779        super::save_block(&db, &mut dbtx, &block, height)
780            .await
781            .unwrap();
782        dbtx.commit().await.unwrap();
783
784        let new_blocks =
785            super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
786                .await
787                .unwrap();
788        assert!(new_blocks.is_none());
789
790        let new_block_hashes = rpc.mine_blocks(1).await.unwrap();
791        let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
792        let new_blocks =
793            super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
794                .await
795                .unwrap()
796                .unwrap();
797        assert_eq!(new_blocks.len(), 1);
798        assert_eq!(new_blocks.first().unwrap().height, new_height);
799        assert_eq!(
800            new_blocks.first().unwrap().hash,
801            *new_block_hashes.first().unwrap()
802        );
803    }
804
805    #[tokio::test]
806    async fn fetch_new_blocks_backwards() {
807        let mut config = create_test_config_with_thread_name().await;
808        let db = Database::new(&config).await.unwrap();
809        let regtest = create_regtest_rpc(&mut config).await;
810        let rpc = regtest.rpc().clone();
811
812        // Prepare chain.
813        rpc.mine_blocks(1).await.unwrap();
814        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
815        let hash = rpc.get_block_hash(height as u64).await.unwrap();
816        let block = rpc.get_block(&hash).await.unwrap();
817
818        // Save the tip.
819        let mut dbtx = db.begin_transaction().await.unwrap();
820        super::save_block(&db, &mut dbtx, &block, height)
821            .await
822            .unwrap();
823        dbtx.commit().await.unwrap();
824
825        let new_blocks =
826            super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
827                .await
828                .unwrap();
829        assert!(new_blocks.is_none());
830
831        // Mine new blocks without saving them.
832        let mine_count: u32 = config.protocol_paramset().finality_depth - 1;
833        let new_block_hashes = rpc.mine_blocks(mine_count as u64).await.unwrap();
834        let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
835
836        let new_blocks = super::fetch_new_blocks(
837            &db,
838            &rpc,
839            new_height - 1,
840            config.protocol_paramset().finality_depth,
841        )
842        .await
843        .unwrap()
844        .unwrap();
845        assert_eq!(new_blocks.len(), mine_count as usize);
846        for (index, block) in new_blocks.iter().enumerate() {
847            assert_eq!(block.height, new_height - mine_count + index as u32 + 1);
848            assert_eq!(block.hash, new_block_hashes[index]);
849        }
850
851        // Mine too many blocks.
852        let mine_count: u32 = config.protocol_paramset().finality_depth;
853        rpc.mine_blocks(mine_count as u64).await.unwrap();
854        let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
855
856        assert!(super::fetch_new_blocks(
857            &db,
858            &rpc,
859            new_height - 1,
860            config.protocol_paramset().finality_depth
861        )
862        .await
863        .is_err());
864    }
865    #[ignore]
866    #[tokio::test]
867    async fn set_non_canonical_block_hashes() {
868        let mut config = create_test_config_with_thread_name().await;
869        let db = Database::new(&config).await.unwrap();
870        let regtest = create_regtest_rpc(&mut config).await;
871        let rpc = regtest.rpc().clone();
872
873        let hashes = rpc.mine_blocks(4).await.unwrap();
874        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
875
876        super::set_initial_block_info_if_not_exists(&db, &rpc, config.protocol_paramset())
877            .await
878            .unwrap();
879
880        rpc.invalidate_block(hashes.get(3).unwrap()).await.unwrap();
881        rpc.invalidate_block(hashes.get(2).unwrap()).await.unwrap();
882
883        let mut dbtx = db.begin_transaction().await.unwrap();
884
885        let last_db_block =
886            super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, *hashes.get(3).unwrap())
887                .await
888                .unwrap();
889        assert_eq!(last_db_block.0.height, height);
890        assert_eq!(last_db_block.0.hash, *hashes.get(3).unwrap());
891
892        super::handle_reorg_events(&db, &mut dbtx, height - 2)
893            .await
894            .unwrap();
895
896        assert!(
897            super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, *hashes.get(3).unwrap())
898                .await
899                .is_err()
900        );
901
902        dbtx.commit().await.unwrap();
903    }
904
905    #[tokio::test]
906    async fn start_bitcoin_syncer_new_block_mined() {
907        let mut config = create_test_config_with_thread_name().await;
908        let db = Database::new(&config).await.unwrap();
909        let regtest = create_regtest_rpc(&mut config).await;
910        let rpc = regtest.rpc().clone();
911
912        rpc.mine_blocks(1).await.unwrap();
913        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
914        let hash = rpc.get_block_hash(height as u64).await.unwrap();
915
916        let (looping_task, _cancel_tx) =
917            BitcoinSyncer::new(db.clone(), rpc.clone(), config.protocol_paramset())
918                .await
919                .unwrap()
920                .into_task()
921                .cancelable_loop();
922
923        looping_task.into_bg();
924
925        loop {
926            let mut dbtx = db.begin_transaction().await.unwrap();
927
928            let last_db_block =
929                match super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash).await {
930                    Ok(block) => block,
931                    Err(_) => {
932                        dbtx.commit().await.unwrap();
933                        continue;
934                    }
935                };
936
937            assert_eq!(last_db_block.0.height, height);
938            assert_eq!(last_db_block.0.hash, hash);
939
940            dbtx.commit().await.unwrap();
941            break;
942        }
943    }
944}