clementine_core/
bitcoin_syncer.rs

1//! # Bitcoin Syncer
2//!
3//! This module provides common utilities to fetch Bitcoin state. Other modules
4//! can use this module to operate over Bitcoin. Every block starting from
5//! `paramset.start_height` is fetched and stored in the database.
6
7use crate::{
8    config::protocol::ProtocolParamset,
9    database::{Database, DatabaseTransaction},
10    errors::BridgeError,
11    extended_bitcoin_rpc::ExtendedBitcoinRpc,
12    task::{IntoTask, RecoverableTask, Task, TaskExt, TaskVariant, WithDelay},
13};
14use bitcoin::{block::Header, BlockHash, OutPoint};
15use bitcoincore_rpc::RpcApi;
16use eyre::Context;
17use std::time::Duration;
18use tonic::async_trait;
19
/// Delay handed to the Bitcoin syncer task when it is turned into a delayed
/// task: 250 milliseconds in test builds, 30 seconds otherwise.
pub const BTC_SYNCER_POLL_DELAY: Duration = Duration::from_millis(if cfg!(test) {
    250
} else {
    30_000
});
25
/// Represents basic information of a Bitcoin block.
///
/// Built either from RPC responses (hash + header for a given height) or from
/// a mix of RPC data and the height stored in the database.
#[derive(Clone, Debug)]
struct BlockInfo {
    /// Hash of the block.
    hash: BlockHash,
    /// Header of the block. Within this file it is only read by the tests,
    /// which is presumably why it carries an underscore prefix.
    _header: Header,
    /// Height of the block.
    height: u32,
}
33
/// Events emitted by the Bitcoin syncer.
/// It emits the block_id of the block in the db that was saved.
#[derive(Clone, Debug)]
pub enum BitcoinSyncerEvent {
    /// A block was saved; carries the database id returned when saving it.
    NewBlock(u32),
    /// A previously saved block was marked non-canonical; carries its
    /// database id.
    ReorgedBlock(u32),
}
41
42impl TryFrom<(String, i32)> for BitcoinSyncerEvent {
43    type Error = eyre::Report;
44    fn try_from(value: (String, i32)) -> Result<Self, Self::Error> {
45        match value.0.as_str() {
46            "new_block" => Ok(BitcoinSyncerEvent::NewBlock(
47                u32::try_from(value.1).wrap_err(BridgeError::IntConversionError)?,
48            )),
49            "reorged_block" => Ok(BitcoinSyncerEvent::ReorgedBlock(
50                u32::try_from(value.1).wrap_err(BridgeError::IntConversionError)?,
51            )),
52            _ => Err(eyre::eyre!("Invalid event type: {}", value.0)),
53        }
54    }
55}
56
/// Trait for handling new blocks as they are finalized
#[async_trait]
pub trait BlockHandler: Send + Sync + 'static {
    /// Handle a new finalized block
    ///
    /// # Parameters
    ///
    /// - `dbtx`: Database transaction the handler should work in. The
    ///   finalized block fetcher in this file commits it only after the
    ///   handler returned successfully.
    /// - `block_id`: Database id of the block being handled.
    /// - `block`: The full block.
    /// - `height`: Height of the block.
    ///
    /// # Errors
    ///
    /// Any [`BridgeError`] returned here is propagated by the caller.
    async fn handle_new_block(
        &mut self,
        dbtx: DatabaseTransaction<'_, '_>,
        block_id: u32,
        block: bitcoin::Block,
        height: u32,
    ) -> Result<(), BridgeError>;
}
69
70/// Fetches the [`BlockInfo`] for a given height from Bitcoin.
71async fn fetch_block_info_from_height(
72    rpc: &ExtendedBitcoinRpc,
73    height: u32,
74) -> Result<BlockInfo, BridgeError> {
75    let hash = rpc
76        .get_block_hash(height as u64)
77        .await
78        .wrap_err("Failed to get block hash")?;
79    let header = rpc
80        .get_block_header(&hash)
81        .await
82        .wrap_err("Failed to get block header")?;
83
84    Ok(BlockInfo {
85        hash,
86        _header: header,
87        height,
88    })
89}
90
91/// Saves a Bitcoin block's metadata and it's transactions into the database.
92pub(crate) async fn save_block(
93    db: &Database,
94    dbtx: DatabaseTransaction<'_, '_>,
95    block: &bitcoin::Block,
96    block_height: u32,
97) -> Result<u32, BridgeError> {
98    let block_hash = block.block_hash();
99    tracing::debug!(
100        "Saving a block with hash of {} and height of {}",
101        block_hash,
102        block_height
103    );
104
105    // update the block_info as canonical if it already exists
106    let block_id = db.update_block_as_canonical(Some(dbtx), block_hash).await?;
107    db.upsert_full_block(Some(dbtx), block, block_height)
108        .await?;
109
110    if let Some(block_id) = block_id {
111        return Ok(block_id);
112    }
113
114    let block_id = db
115        .insert_block_info(
116            Some(dbtx),
117            &block_hash,
118            &block.header.prev_blockhash,
119            block_height,
120        )
121        .await?;
122
123    tracing::debug!(
124        "Saving {} transactions to a block with hash {}",
125        block.txdata.len(),
126        block_hash
127    );
128    for tx in &block.txdata {
129        save_transaction_spent_utxos(db, dbtx, tx, block_id).await?;
130    }
131
132    Ok(block_id)
133}
134async fn _get_block_info_from_hash(
135    db: &Database,
136    dbtx: DatabaseTransaction<'_, '_>,
137    rpc: &ExtendedBitcoinRpc,
138    hash: BlockHash,
139) -> Result<(BlockInfo, Vec<Vec<OutPoint>>), BridgeError> {
140    let block = rpc.get_block(&hash).await.wrap_err("Failed to get block")?;
141    let block_height = db
142        .get_block_info_from_hash(Some(dbtx), hash)
143        .await?
144        .ok_or_else(|| eyre::eyre!("Block not found in get_block_info_from_hash"))?
145        .1;
146
147    let mut block_utxos: Vec<Vec<OutPoint>> = Vec::new();
148    for tx in &block.txdata {
149        let txid = tx.compute_txid();
150        let spent_utxos = _get_transaction_spent_utxos(db, dbtx, txid).await?;
151        block_utxos.push(spent_utxos);
152    }
153
154    let block_info = BlockInfo {
155        hash,
156        _header: block.header,
157        height: block_height,
158    };
159
160    Ok((block_info, block_utxos))
161}
162
163/// Saves a Bitcoin transaction and its spent UTXOs to the database.
164async fn save_transaction_spent_utxos(
165    db: &Database,
166    dbtx: DatabaseTransaction<'_, '_>,
167    tx: &bitcoin::Transaction,
168    block_id: u32,
169) -> Result<(), BridgeError> {
170    let txid = tx.compute_txid();
171    db.insert_txid_to_block(dbtx, block_id, &txid).await?;
172
173    for input in &tx.input {
174        db.insert_spent_utxo(
175            dbtx,
176            block_id,
177            &txid,
178            &input.previous_output.txid,
179            input.previous_output.vout as i64,
180        )
181        .await?;
182    }
183
184    Ok(())
185}
186async fn _get_transaction_spent_utxos(
187    db: &Database,
188    dbtx: DatabaseTransaction<'_, '_>,
189    txid: bitcoin::Txid,
190) -> Result<Vec<OutPoint>, BridgeError> {
191    let utxos = db.get_spent_utxos_for_txid(Some(dbtx), txid).await?;
192    let utxos = utxos.into_iter().map(|utxo| utxo.1).collect::<Vec<_>>();
193
194    Ok(utxos)
195}
196
197/// If no block info exists in the DB, fetches the current block from the RPC and initializes the DB.
198pub async fn set_initial_block_info_if_not_exists(
199    db: &Database,
200    rpc: &ExtendedBitcoinRpc,
201    paramset: &'static ProtocolParamset,
202) -> Result<(), BridgeError> {
203    if db.get_max_height(None).await?.is_some() {
204        return Ok(());
205    }
206
207    let current_height = u32::try_from(
208        rpc.get_block_count()
209            .await
210            .wrap_err("Failed to get block count")?,
211    )
212    .wrap_err(BridgeError::IntConversionError)?;
213
214    if paramset.start_height > current_height {
215        tracing::error!(
216            "Bitcoin syncer could not find enough available blocks in chain (Likely a regtest problem). start_height ({}) > current_height ({})",
217            paramset.start_height,
218            current_height
219        );
220        return Ok(());
221    }
222
223    let height = paramset.start_height;
224    let mut dbtx = db.begin_transaction().await?;
225    // first collect previous needed blocks according to paramset start height
226    let block_info = fetch_block_info_from_height(rpc, height).await?;
227    let block = rpc
228        .get_block(&block_info.hash)
229        .await
230        .wrap_err("Failed to get block")?;
231    let block_id = save_block(db, &mut dbtx, &block, height).await?;
232    db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::NewBlock(block_id))
233        .await?;
234
235    dbtx.commit().await?;
236
237    Ok(())
238}
239
240/// Fetches the next block from Bitcoin, if it exists. Will also fetch previous
241/// blocks if the parent is missing, and give an error if the number of reorged blocks is greater than the finality depth.
242///
243/// # Parameters
244///
245/// - `current_height`: The height of the current tip **in the database**.
246///
247/// # Returns
248///
249/// - [`None`] - If no new block is available.
250/// - [`Vec<BlockInfo>`] - If new blocks are found.
251/// - [`BridgeError`] - If the number of reorged blocks is greater than the finality depth or db/rpc errors.
252async fn fetch_new_blocks(
253    db: &Database,
254    rpc: &ExtendedBitcoinRpc,
255    current_height: u32,
256    finality_depth: u32,
257) -> Result<Option<Vec<BlockInfo>>, BridgeError> {
258    let next_height = current_height + 1;
259
260    // Try to fetch the block hash for the next height.
261    let block_hash = match rpc.get_block_hash(next_height as u64).await {
262        Ok(hash) => hash,
263        Err(_) => return Ok(None),
264    };
265    tracing::debug!(
266        "Fetching block with hash of {:?} and height of {}...",
267        block_hash,
268        next_height
269    );
270
271    // Fetch its header.
272    let mut block_header = rpc
273        .get_block_header(&block_hash)
274        .await
275        .wrap_err("Failed to get block header")?;
276    let mut new_blocks = vec![BlockInfo {
277        hash: block_hash,
278        _header: block_header,
279        height: next_height,
280    }];
281
282    // Walk backwards until the parent is found in the database.
283    while db
284        .get_block_info_from_hash(None, block_header.prev_blockhash)
285        .await?
286        .is_none()
287    {
288        let prev_block_hash = block_header.prev_blockhash;
289        block_header = rpc
290            .get_block_header(&prev_block_hash)
291            .await
292            .wrap_err("Failed to get block header")?;
293        let new_height = new_blocks.last().expect("new_blocks is empty").height - 1;
294        new_blocks.push(BlockInfo {
295            hash: prev_block_hash,
296            _header: block_header,
297            height: new_height,
298        });
299
300        // new blocks includes the new one (block with height our previous tip + 1),
301        // so new_blocks.len() = 5 -> 4 reorged blocks,
302        if new_blocks.len() as u32 > finality_depth {
303            return Err(eyre::eyre!(
304                "Number of reorged blocks {} is greater than finality depth {}, reorged blocks: {:?}. If true, increase finality depth and resync the chain",
305                new_blocks.len() - 1,
306                finality_depth,
307                new_blocks
308            )
309            .into());
310        }
311    }
312
313    // The chain was built from tip to fork; reverse it to be in ascending order.
314    new_blocks.reverse();
315
316    Ok(Some(new_blocks))
317}
318
319/// Marks blocks above the common ancestor as non-canonical and emits reorg events.
320#[tracing::instrument(skip(db, dbtx), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
321async fn handle_reorg_events(
322    db: &Database,
323    dbtx: DatabaseTransaction<'_, '_>,
324    common_ancestor_height: u32,
325) -> Result<(), BridgeError> {
326    let reorg_blocks = db
327        .update_non_canonical_block_hashes(Some(dbtx), common_ancestor_height)
328        .await?;
329    if !reorg_blocks.is_empty() {
330        tracing::debug!("Reorg occurred! Block ids: {:?}", reorg_blocks);
331    }
332
333    for reorg_block_id in reorg_blocks {
334        db.insert_event(Some(dbtx), BitcoinSyncerEvent::ReorgedBlock(reorg_block_id))
335            .await?;
336    }
337
338    Ok(())
339}
340
341/// Processes and inserts new blocks into the database, emitting a new block event for each.
342async fn process_new_blocks(
343    db: &Database,
344    rpc: &ExtendedBitcoinRpc,
345    dbtx: DatabaseTransaction<'_, '_>,
346    new_blocks: &[BlockInfo],
347) -> Result<(), BridgeError> {
348    for block_info in new_blocks {
349        let block = rpc
350            .get_block(&block_info.hash)
351            .await
352            .wrap_err("Failed to get block")?;
353
354        let block_id = save_block(db, dbtx, &block, block_info.height).await?;
355        db.insert_event(Some(dbtx), BitcoinSyncerEvent::NewBlock(block_id))
356            .await?;
357    }
358
359    Ok(())
360}
361
/// A task that syncs Bitcoin blocks from the Bitcoin node to the local database.
///
/// Created from a [`BitcoinSyncer`] through its `IntoTask` implementation.
#[derive(Debug)]
pub struct BitcoinSyncerTask {
    /// The database to store blocks in
    db: Database,
    /// The RPC client to fetch blocks from
    rpc: ExtendedBitcoinRpc,
    /// The current block height that has been synced. Updated only after the
    /// database transaction of a run has been committed.
    current_height: u32,
    /// The finality depth, used as the limit on how many missing blocks a
    /// single run may walk back.
    finality_depth: u32,
}
374
/// Initialized state for the Bitcoin syncer, to be converted into a
/// [`BitcoinSyncerTask`] through its `IntoTask` implementation.
#[derive(Debug)]
pub struct BitcoinSyncer {
    /// The database to store blocks in
    db: Database,
    /// The RPC client to fetch blocks from
    rpc: ExtendedBitcoinRpc,
    /// The current block height that has been synced
    current_height: u32,
    /// The finality depth
    finality_depth: u32,
}
386
387impl BitcoinSyncer {
388    /// Creates a new Bitcoin syncer task.
389    ///
390    /// This function initializes the database with the first block if it's empty.
391    pub async fn new(
392        db: Database,
393        rpc: ExtendedBitcoinRpc,
394        paramset: &'static ProtocolParamset,
395    ) -> Result<Self, BridgeError> {
396        // Initialize the database if needed
397        set_initial_block_info_if_not_exists(&db, &rpc, paramset).await?;
398
399        // Get the current height from the database
400        let current_height = db
401            .get_max_height(None)
402            .await?
403            .ok_or_else(|| eyre::eyre!("Block not found in BitcoinSyncer::new"))?;
404
405        Ok(Self {
406            db,
407            rpc,
408            current_height,
409            finality_depth: paramset.finality_depth,
410        })
411    }
412}
413impl IntoTask for BitcoinSyncer {
414    type Task = WithDelay<BitcoinSyncerTask>;
415
416    fn into_task(self) -> Self::Task {
417        BitcoinSyncerTask {
418            db: self.db,
419            rpc: self.rpc,
420            current_height: self.current_height,
421            finality_depth: self.finality_depth,
422        }
423        .with_delay(BTC_SYNCER_POLL_DELAY)
424    }
425}
426
427#[async_trait]
428impl Task for BitcoinSyncerTask {
429    type Output = bool;
430    const VARIANT: TaskVariant = TaskVariant::BitcoinSyncer;
431
432    #[tracing::instrument(skip(self))]
433    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
434        let new_blocks = match fetch_new_blocks(
435            &self.db,
436            &self.rpc,
437            self.current_height,
438            self.finality_depth,
439        )
440        .await?
441        {
442            Some(blocks) if !blocks.is_empty() => {
443                tracing::debug!(
444                    "{} new blocks found after current height {}",
445                    blocks.len(),
446                    self.current_height
447                );
448
449                blocks
450            }
451            _ => {
452                tracing::debug!(
453                    "No new blocks found after current height: {}",
454                    self.current_height
455                );
456
457                return Ok(false);
458            }
459        };
460
461        // The common ancestor is the block preceding the first new block.
462        // Please note that this won't always be the `self.current_height`.
463        // Because `fetch_next_block` can fetch older blocks, if db is missing
464        // them.
465        let common_ancestor_height = new_blocks[0].height - 1;
466        tracing::debug!("Common ancestor height: {:?}", common_ancestor_height);
467        let mut dbtx = self.db.begin_transaction().await?;
468
469        // Mark reorg blocks (if any) as non-canonical.
470        handle_reorg_events(&self.db, &mut dbtx, common_ancestor_height).await?;
471        tracing::debug!("BitcoinSyncer: Marked reorg blocks as non-canonical");
472
473        // Process and insert the new blocks.
474        tracing::debug!("BitcoinSyncer: Processing new blocks");
475        tracing::debug!("BitcoinSyncer: New blocks: {:?}", new_blocks.len());
476        process_new_blocks(&self.db, &self.rpc, &mut dbtx, &new_blocks).await?;
477
478        dbtx.commit().await?;
479
480        // Update the current height to the tip of the new chain.
481        tracing::debug!("BitcoinSyncer: Updating current height");
482        self.current_height = new_blocks.last().expect("new_blocks is not empty").height;
483        tracing::debug!("BitcoinSyncer: Current height: {:?}", self.current_height);
484
485        // Return true to indicate work was done
486        Ok(true)
487    }
488}
489
/// A task that consumes Bitcoin syncer events and passes every newly
/// finalized block to a [`BlockHandler`].
#[derive(Debug)]
pub struct FinalizedBlockFetcherTask<H: BlockHandler> {
    /// The database to read events and blocks from
    db: Database,
    /// Consumer id passed to the database when fetching the next syncer event
    btc_syncer_consumer_id: String,
    /// Protocol parameters, used to decide whether a block is finalized
    paramset: &'static ProtocolParamset,
    /// Height of the next block expected to become finalized. Updated only
    /// after a successful database commit.
    next_finalized_height: u32,
    /// The handler invoked for each finalized block
    handler: H,
}
498
impl<H: BlockHandler> FinalizedBlockFetcherTask<H> {
    /// Creates a new finalized block fetcher task from its parts.
    ///
    /// # Parameters
    ///
    /// - `db`: Database to read syncer events and blocks from.
    /// - `btc_syncer_consumer_id`: Consumer id used when fetching syncer events.
    /// - `paramset`: Protocol parameters used for the finality check.
    /// - `next_finalized_height`: Height of the first block the task should
    ///   hand to `handler` once it is finalized.
    /// - `handler`: Receives each finalized block.
    pub fn new(
        db: Database,
        btc_syncer_consumer_id: String,
        paramset: &'static ProtocolParamset,
        next_finalized_height: u32,
        handler: H,
    ) -> Self {
        Self {
            db,
            btc_syncer_consumer_id,
            paramset,
            next_finalized_height,
            handler,
        }
    }
}
516
517#[async_trait]
518impl<H: BlockHandler> Task for FinalizedBlockFetcherTask<H> {
519    type Output = bool;
520    const VARIANT: TaskVariant = TaskVariant::FinalizedBlockFetcher;
521
522    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
523        let mut dbtx = self.db.begin_transaction().await?;
524
525        // Poll for the next bitcoin syncer event
526        let Some(event) = self
527            .db
528            .fetch_next_bitcoin_syncer_evt(&mut dbtx, &self.btc_syncer_consumer_id)
529            .await?
530        else {
531            // No event found, we can safely commit the transaction and return
532            dbtx.commit().await?;
533            return Ok(false);
534        };
535        let mut expected_next_finalized = self.next_finalized_height;
536
537        // Process the event
538        let did_find_new_block = match event {
539            BitcoinSyncerEvent::NewBlock(block_id) => {
540                let new_block_height = self
541                    .db
542                    .get_block_info_from_id(Some(&mut dbtx), block_id)
543                    .await?
544                    .ok_or(eyre::eyre!("Block not found in BlockFetcherTask",))?
545                    .1;
546
547                // whether there's a new finalized block, can be false if reorged to the same height
548                let mut new_tip = false;
549                let mut warned = false;
550
551                // Update states to catch up to finalized chain
552                while self
553                    .paramset
554                    .is_block_finalized(expected_next_finalized, new_block_height)
555                {
556                    if new_tip && !warned {
557                        warned = true;
558                        // this event is multiple blocks away, report
559                        tracing::warn!("Received event with multiple finalized blocks, expected 1 for ordered events. Got a new block with height {new_block_height}, expected next finalized block {}", self.next_finalized_height);
560                    }
561                    new_tip = true;
562
563                    let block = self
564                        .db
565                        .get_full_block(Some(&mut dbtx), expected_next_finalized)
566                        .await?
567                        .ok_or(eyre::eyre!(
568                            "Block at height {} not found in BlockFetcherTask, current tip height is {}",
569                            expected_next_finalized, new_block_height
570                        ))?;
571
572                    let new_block_id = self
573                        .db
574                        .get_canonical_block_id_from_height(
575                            Some(&mut dbtx),
576                            expected_next_finalized,
577                        )
578                        .await?;
579
580                    let Some(new_block_id) = new_block_id else {
581                        tracing::error!("Block at height {} not found in BlockFetcherTask, current tip height is {}", expected_next_finalized, new_block_height);
582                        return Err(eyre::eyre!(
583                            "Block at height {} not found in BlockFetcherTask, current tip height is {}",
584                            expected_next_finalized, new_block_height
585                        ).into());
586                    };
587
588                    self.handler
589                        .handle_new_block(&mut dbtx, new_block_id, block, expected_next_finalized)
590                        .await?;
591
592                    expected_next_finalized += 1;
593                }
594
595                new_tip
596            }
597            BitcoinSyncerEvent::ReorgedBlock(_) => false,
598        };
599
600        dbtx.commit().await?;
601        // update next height only after db commit is successful so next_height is consistent with state in DB
602        self.next_finalized_height = expected_next_finalized;
603        // Return whether we found new blocks
604        Ok(did_find_new_block)
605    }
606}
607
#[async_trait]
impl<H: BlockHandler> RecoverableTask for FinalizedBlockFetcherTask<H> {
    /// Recovery hook for a failed run; intentionally a no-op that always
    /// returns `Ok(())`.
    async fn recover_from_error(&mut self, _error: &BridgeError) -> Result<(), BridgeError> {
        // No action needed. Errors will cause a rollback and the task will retry on the next run.
        // In-memory data remains in sync (as it's only updated after db commit is successful).
        Ok(())
    }
}
616
617#[cfg(test)]
618mod tests {
619    use crate::bitcoin_syncer::BitcoinSyncer;
620    use crate::builder::transaction::DEFAULT_SEQUENCE;
621    use crate::task::{IntoTask, TaskExt};
622    use crate::{database::Database, test::common::*};
623    use bitcoin::absolute::Height;
624    use bitcoin::hashes::Hash;
625    use bitcoin::transaction::Version;
626    use bitcoin::{OutPoint, ScriptBuf, Transaction, TxIn, Witness};
627    use bitcoincore_rpc::RpcApi;
628
629    #[tokio::test]
630    async fn get_block_info_from_height() {
631        let mut config = create_test_config_with_thread_name().await;
632        let regtest = create_regtest_rpc(&mut config).await;
633        let rpc = regtest.rpc().clone();
634
635        rpc.mine_blocks(1).await.unwrap();
636        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
637        let hash = rpc.get_block_hash(height as u64).await.unwrap();
638        let header = rpc.get_block_header(&hash).await.unwrap();
639
640        let block_info = super::fetch_block_info_from_height(&rpc, height)
641            .await
642            .unwrap();
643        assert_eq!(block_info._header, header);
644        assert_eq!(block_info.hash, hash);
645        assert_eq!(block_info.height, height);
646
647        rpc.mine_blocks(1).await.unwrap();
648        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
649
650        let block_info = super::fetch_block_info_from_height(&rpc, height)
651            .await
652            .unwrap();
653        assert_ne!(block_info._header, header);
654        assert_ne!(block_info.hash, hash);
655        assert_eq!(block_info.height, height);
656    }
657
658    #[tokio::test]
659    async fn save_get_transaction_spent_utxos() {
660        let mut config = create_test_config_with_thread_name().await;
661        let db = Database::new(&config).await.unwrap();
662        let regtest = create_regtest_rpc(&mut config).await;
663        let rpc = regtest.rpc().clone();
664
665        let mut dbtx = db.begin_transaction().await.unwrap();
666
667        rpc.mine_blocks(1).await.unwrap();
668        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
669        let hash = rpc.get_block_hash(height as u64).await.unwrap();
670        let block = rpc.get_block(&hash).await.unwrap();
671        let block_id = super::save_block(&db, &mut dbtx, &block, height)
672            .await
673            .unwrap();
674
675        let inputs = vec![
676            TxIn {
677                previous_output: OutPoint {
678                    txid: bitcoin::Txid::all_zeros(),
679                    vout: 0,
680                },
681                script_sig: ScriptBuf::default(),
682                sequence: DEFAULT_SEQUENCE,
683                witness: Witness::default(),
684            },
685            TxIn {
686                previous_output: OutPoint {
687                    txid: bitcoin::Txid::all_zeros(),
688                    vout: 1,
689                },
690                script_sig: ScriptBuf::default(),
691                sequence: DEFAULT_SEQUENCE,
692                witness: Witness::default(),
693            },
694        ];
695        let tx = Transaction {
696            version: Version::TWO,
697            lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
698            input: inputs.clone(),
699            output: vec![],
700        };
701        super::save_transaction_spent_utxos(&db, &mut dbtx, &tx, block_id)
702            .await
703            .unwrap();
704
705        let utxos = super::_get_transaction_spent_utxos(&db, &mut dbtx, tx.compute_txid())
706            .await
707            .unwrap();
708
709        for (index, input) in inputs.iter().enumerate() {
710            assert_eq!(input.previous_output, utxos[index]);
711        }
712
713        dbtx.commit().await.unwrap();
714    }
715
716    #[tokio::test]
717    async fn save_get_block() {
718        let mut config = create_test_config_with_thread_name().await;
719        let db = Database::new(&config).await.unwrap();
720        let regtest = create_regtest_rpc(&mut config).await;
721        let rpc = regtest.rpc().clone();
722
723        let mut dbtx = db.begin_transaction().await.unwrap();
724
725        rpc.mine_blocks(1).await.unwrap();
726        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
727        let hash = rpc.get_block_hash(height as u64).await.unwrap();
728        let block = rpc.get_block(&hash).await.unwrap();
729
730        super::save_block(&db, &mut dbtx, &block, height)
731            .await
732            .unwrap();
733
734        let (block_info, utxos) = super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
735            .await
736            .unwrap();
737        assert_eq!(block_info._header, block.header);
738        assert_eq!(block_info.hash, hash);
739        assert_eq!(block_info.height, height);
740        for (tx_index, tx) in block.txdata.iter().enumerate() {
741            for (txin_index, txin) in tx.input.iter().enumerate() {
742                assert_eq!(txin.previous_output, utxos[tx_index][txin_index]);
743            }
744        }
745
746        dbtx.commit().await.unwrap();
747    }
748
749    #[tokio::test]
750    async fn set_initial_block_info_if_not_exists() {
751        let mut config = create_test_config_with_thread_name().await;
752        let db = Database::new(&config).await.unwrap();
753        let regtest = create_regtest_rpc(&mut config).await;
754        let rpc = regtest.rpc().clone();
755
756        let mut dbtx = db.begin_transaction().await.unwrap();
757
758        rpc.mine_blocks(1).await.unwrap();
759        // let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
760        let hash = rpc
761            .get_block_hash(config.protocol_paramset().start_height as u64)
762            .await
763            .unwrap();
764        let block = rpc.get_block(&hash).await.unwrap();
765
766        assert!(super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
767            .await
768            .is_err());
769
770        super::set_initial_block_info_if_not_exists(&db, &rpc, config.protocol_paramset())
771            .await
772            .unwrap();
773
774        let (block_info, utxos) = super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
775            .await
776            .unwrap();
777        assert_eq!(block_info.hash, hash);
778        assert_eq!(block_info.height, config.protocol_paramset().start_height);
779
780        for (tx_index, tx) in block.txdata.iter().enumerate() {
781            for (txin_index, txin) in tx.input.iter().enumerate() {
782                assert_eq!(txin.previous_output, utxos[tx_index][txin_index]);
783            }
784        }
785    }
786
787    #[tokio::test]
788    async fn fetch_new_blocks_forward() {
789        let mut config = create_test_config_with_thread_name().await;
790        let db = Database::new(&config).await.unwrap();
791        let regtest = create_regtest_rpc(&mut config).await;
792        let rpc = regtest.rpc().clone();
793
794        let mut dbtx = db.begin_transaction().await.unwrap();
795
796        rpc.mine_blocks(1).await.unwrap();
797        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
798        let hash = rpc.get_block_hash(height as u64).await.unwrap();
799        let block = rpc.get_block(&hash).await.unwrap();
800        super::save_block(&db, &mut dbtx, &block, height)
801            .await
802            .unwrap();
803        dbtx.commit().await.unwrap();
804
805        let new_blocks =
806            super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
807                .await
808                .unwrap();
809        assert!(new_blocks.is_none());
810
811        let new_block_hashes = rpc.mine_blocks(1).await.unwrap();
812        let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
813        let new_blocks =
814            super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
815                .await
816                .unwrap()
817                .unwrap();
818        assert_eq!(new_blocks.len(), 1);
819        assert_eq!(new_blocks.first().unwrap().height, new_height);
820        assert_eq!(
821            new_blocks.first().unwrap().hash,
822            *new_block_hashes.first().unwrap()
823        );
824    }
825
826    #[tokio::test]
827    async fn fetch_new_blocks_backwards() {
828        let mut config = create_test_config_with_thread_name().await;
829        let db = Database::new(&config).await.unwrap();
830        let regtest = create_regtest_rpc(&mut config).await;
831        let rpc = regtest.rpc().clone();
832
833        // Prepare chain.
834        rpc.mine_blocks(1).await.unwrap();
835        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
836        let hash = rpc.get_block_hash(height as u64).await.unwrap();
837        let block = rpc.get_block(&hash).await.unwrap();
838
839        // Save the tip.
840        let mut dbtx = db.begin_transaction().await.unwrap();
841        super::save_block(&db, &mut dbtx, &block, height)
842            .await
843            .unwrap();
844        dbtx.commit().await.unwrap();
845
846        let new_blocks =
847            super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
848                .await
849                .unwrap();
850        assert!(new_blocks.is_none());
851
852        // Mine new blocks without saving them.
853        let mine_count: u32 = config.protocol_paramset().finality_depth - 1;
854        let new_block_hashes = rpc.mine_blocks(mine_count as u64).await.unwrap();
855        let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
856
857        let new_blocks = super::fetch_new_blocks(
858            &db,
859            &rpc,
860            new_height - 1,
861            config.protocol_paramset().finality_depth,
862        )
863        .await
864        .unwrap()
865        .unwrap();
866        assert_eq!(new_blocks.len(), mine_count as usize);
867        for (index, block) in new_blocks.iter().enumerate() {
868            assert_eq!(block.height, new_height - mine_count + index as u32 + 1);
869            assert_eq!(block.hash, new_block_hashes[index]);
870        }
871
872        // Mine too many blocks.
873        let mine_count: u32 = config.protocol_paramset().finality_depth;
874        rpc.mine_blocks(mine_count as u64).await.unwrap();
875        let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
876
877        assert!(super::fetch_new_blocks(
878            &db,
879            &rpc,
880            new_height - 1,
881            config.protocol_paramset().finality_depth
882        )
883        .await
884        .is_err());
885    }
    /// Checks that `handle_reorg_events` makes a block above the common
    /// ancestor unreachable through `_get_block_info_from_hash`.
    ///
    /// NOTE(review): the reason for `#[ignore]` is not visible here.
    /// `set_initial_block_info_if_not_exists` only saves the block at
    /// `start_height`, so the lookup of `hashes[3]` below presumably fails
    /// unless `start_height` equals the tip — confirm before re-enabling.
    #[ignore]
    #[tokio::test]
    async fn set_non_canonical_block_hashes() {
        let mut config = create_test_config_with_thread_name().await;
        let db = Database::new(&config).await.unwrap();
        let regtest = create_regtest_rpc(&mut config).await;
        let rpc = regtest.rpc().clone();

        let hashes = rpc.mine_blocks(4).await.unwrap();
        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();

        super::set_initial_block_info_if_not_exists(&db, &rpc, config.protocol_paramset())
            .await
            .unwrap();

        // Invalidate the two most recently mined blocks on the node.
        rpc.invalidate_block(hashes.get(3).unwrap()).await.unwrap();
        rpc.invalidate_block(hashes.get(2).unwrap()).await.unwrap();

        let mut dbtx = db.begin_transaction().await.unwrap();

        // Before the reorg handling, the last mined block is expected in the db.
        let last_db_block =
            super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, *hashes.get(3).unwrap())
                .await
                .unwrap();
        assert_eq!(last_db_block.0.height, height);
        assert_eq!(last_db_block.0.hash, *hashes.get(3).unwrap());

        // Use the block two below the old tip as the common ancestor.
        super::handle_reorg_events(&db, &mut dbtx, height - 2)
            .await
            .unwrap();

        // Afterwards the lookup of the old tip is expected to fail.
        assert!(
            super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, *hashes.get(3).unwrap())
                .await
                .is_err()
        );

        dbtx.commit().await.unwrap();
    }
925
926    #[tokio::test]
927    async fn start_bitcoin_syncer_new_block_mined() {
928        let mut config = create_test_config_with_thread_name().await;
929        let db = Database::new(&config).await.unwrap();
930        let regtest = create_regtest_rpc(&mut config).await;
931        let rpc = regtest.rpc().clone();
932
933        rpc.mine_blocks(1).await.unwrap();
934        let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
935        let hash = rpc.get_block_hash(height as u64).await.unwrap();
936
937        let (looping_task, _cancel_tx) =
938            BitcoinSyncer::new(db.clone(), rpc.clone(), config.protocol_paramset())
939                .await
940                .unwrap()
941                .into_task()
942                .cancelable_loop();
943
944        looping_task.into_bg();
945
946        loop {
947            let mut dbtx = db.begin_transaction().await.unwrap();
948
949            let last_db_block =
950                match super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash).await {
951                    Ok(block) => block,
952                    Err(_) => {
953                        dbtx.commit().await.unwrap();
954                        continue;
955                    }
956                };
957
958            assert_eq!(last_db_block.0.height, height);
959            assert_eq!(last_db_block.0.hash, hash);
960
961            dbtx.commit().await.unwrap();
962            break;
963        }
964    }
965}