1use crate::{
8 config::protocol::ProtocolParamset,
9 database::{Database, DatabaseTransaction},
10 errors::BridgeError,
11 extended_bitcoin_rpc::ExtendedBitcoinRpc,
12 task::{IntoTask, RecoverableTask, Task, TaskExt, TaskVariant, WithDelay},
13};
14use bitcoin::{block::Header, BlockHash, OutPoint};
15use bitcoincore_rpc::RpcApi;
16use eyre::Context;
17use std::time::Duration;
18use tonic::async_trait;
19
20pub const BTC_SYNCER_POLL_DELAY: Duration = if cfg!(test) {
21 Duration::from_millis(250)
22} else {
23 Duration::from_secs(30)
24};
25
26#[derive(Clone, Debug)]
28struct BlockInfo {
29 hash: BlockHash,
30 _header: Header,
31 height: u32,
32}
33
34#[derive(Clone, Debug)]
37pub enum BitcoinSyncerEvent {
38 NewBlock(u32),
39 ReorgedBlock(u32),
40}
41
42impl TryFrom<(String, i32)> for BitcoinSyncerEvent {
43 type Error = eyre::Report;
44 fn try_from(value: (String, i32)) -> Result<Self, Self::Error> {
45 match value.0.as_str() {
46 "new_block" => Ok(BitcoinSyncerEvent::NewBlock(
47 u32::try_from(value.1).wrap_err(BridgeError::IntConversionError)?,
48 )),
49 "reorged_block" => Ok(BitcoinSyncerEvent::ReorgedBlock(
50 u32::try_from(value.1).wrap_err(BridgeError::IntConversionError)?,
51 )),
52 _ => Err(eyre::eyre!("Invalid event type: {}", value.0)),
53 }
54 }
55}
56
57#[async_trait]
59pub trait BlockHandler: Send + Sync + 'static {
60 async fn handle_new_block(
62 &mut self,
63 dbtx: DatabaseTransaction<'_, '_>,
64 block_id: u32,
65 block: bitcoin::Block,
66 height: u32,
67 ) -> Result<(), BridgeError>;
68}
69
70async fn fetch_block_info_from_height(
72 rpc: &ExtendedBitcoinRpc,
73 height: u32,
74) -> Result<BlockInfo, BridgeError> {
75 let hash = rpc
76 .get_block_hash(height as u64)
77 .await
78 .wrap_err("Failed to get block hash")?;
79 let header = rpc
80 .get_block_header(&hash)
81 .await
82 .wrap_err("Failed to get block header")?;
83
84 Ok(BlockInfo {
85 hash,
86 _header: header,
87 height,
88 })
89}
90
91pub(crate) async fn save_block(
93 db: &Database,
94 dbtx: DatabaseTransaction<'_, '_>,
95 block: &bitcoin::Block,
96 block_height: u32,
97) -> Result<u32, BridgeError> {
98 let block_hash = block.block_hash();
99 tracing::debug!(
100 "Saving a block with hash of {} and height of {}",
101 block_hash,
102 block_height
103 );
104
105 let block_id = db.update_block_as_canonical(Some(dbtx), block_hash).await?;
107 db.upsert_full_block(Some(dbtx), block, block_height)
108 .await?;
109
110 if let Some(block_id) = block_id {
111 return Ok(block_id);
112 }
113
114 let block_id = db
115 .insert_block_info(
116 Some(dbtx),
117 &block_hash,
118 &block.header.prev_blockhash,
119 block_height,
120 )
121 .await?;
122
123 tracing::debug!(
124 "Saving {} transactions to a block with hash {}",
125 block.txdata.len(),
126 block_hash
127 );
128 for tx in &block.txdata {
129 save_transaction_spent_utxos(db, dbtx, tx, block_id).await?;
130 }
131
132 Ok(block_id)
133}
134async fn _get_block_info_from_hash(
135 db: &Database,
136 dbtx: DatabaseTransaction<'_, '_>,
137 rpc: &ExtendedBitcoinRpc,
138 hash: BlockHash,
139) -> Result<(BlockInfo, Vec<Vec<OutPoint>>), BridgeError> {
140 let block = rpc.get_block(&hash).await.wrap_err("Failed to get block")?;
141 let block_height = db
142 .get_block_info_from_hash(Some(dbtx), hash)
143 .await?
144 .ok_or_else(|| eyre::eyre!("Block not found in get_block_info_from_hash"))?
145 .1;
146
147 let mut block_utxos: Vec<Vec<OutPoint>> = Vec::new();
148 for tx in &block.txdata {
149 let txid = tx.compute_txid();
150 let spent_utxos = _get_transaction_spent_utxos(db, dbtx, txid).await?;
151 block_utxos.push(spent_utxos);
152 }
153
154 let block_info = BlockInfo {
155 hash,
156 _header: block.header,
157 height: block_height,
158 };
159
160 Ok((block_info, block_utxos))
161}
162
163async fn save_transaction_spent_utxos(
165 db: &Database,
166 dbtx: DatabaseTransaction<'_, '_>,
167 tx: &bitcoin::Transaction,
168 block_id: u32,
169) -> Result<(), BridgeError> {
170 let txid = tx.compute_txid();
171 db.insert_txid_to_block(dbtx, block_id, &txid).await?;
172
173 for input in &tx.input {
174 db.insert_spent_utxo(
175 dbtx,
176 block_id,
177 &txid,
178 &input.previous_output.txid,
179 input.previous_output.vout as i64,
180 )
181 .await?;
182 }
183
184 Ok(())
185}
186async fn _get_transaction_spent_utxos(
187 db: &Database,
188 dbtx: DatabaseTransaction<'_, '_>,
189 txid: bitcoin::Txid,
190) -> Result<Vec<OutPoint>, BridgeError> {
191 let utxos = db.get_spent_utxos_for_txid(Some(dbtx), txid).await?;
192 let utxos = utxos.into_iter().map(|utxo| utxo.1).collect::<Vec<_>>();
193
194 Ok(utxos)
195}
196
197pub async fn set_initial_block_info_if_not_exists(
199 db: &Database,
200 rpc: &ExtendedBitcoinRpc,
201 paramset: &'static ProtocolParamset,
202) -> Result<(), BridgeError> {
203 if db.get_max_height(None).await?.is_some() {
204 return Ok(());
205 }
206
207 let current_height = u32::try_from(
208 rpc.get_block_count()
209 .await
210 .wrap_err("Failed to get block count")?,
211 )
212 .wrap_err(BridgeError::IntConversionError)?;
213
214 if paramset.start_height > current_height {
215 tracing::error!(
216 "Bitcoin syncer could not find enough available blocks in chain (Likely a regtest problem). start_height ({}) > current_height ({})",
217 paramset.start_height,
218 current_height
219 );
220 return Ok(());
221 }
222
223 let height = paramset.start_height;
224 let mut dbtx = db.begin_transaction().await?;
225 let block_info = fetch_block_info_from_height(rpc, height).await?;
227 let block = rpc
228 .get_block(&block_info.hash)
229 .await
230 .wrap_err("Failed to get block")?;
231 let block_id = save_block(db, &mut dbtx, &block, height).await?;
232 db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::NewBlock(block_id))
233 .await?;
234
235 dbtx.commit().await?;
236
237 Ok(())
238}
239
240async fn fetch_new_blocks(
253 db: &Database,
254 rpc: &ExtendedBitcoinRpc,
255 current_height: u32,
256 finality_depth: u32,
257) -> Result<Option<Vec<BlockInfo>>, BridgeError> {
258 let next_height = current_height + 1;
259
260 let block_hash = match rpc.get_block_hash(next_height as u64).await {
262 Ok(hash) => hash,
263 Err(_) => return Ok(None),
264 };
265 tracing::debug!(
266 "Fetching block with hash of {:?} and height of {}...",
267 block_hash,
268 next_height
269 );
270
271 let mut block_header = rpc
273 .get_block_header(&block_hash)
274 .await
275 .wrap_err("Failed to get block header")?;
276 let mut new_blocks = vec![BlockInfo {
277 hash: block_hash,
278 _header: block_header,
279 height: next_height,
280 }];
281
282 while db
284 .get_block_info_from_hash(None, block_header.prev_blockhash)
285 .await?
286 .is_none()
287 {
288 let prev_block_hash = block_header.prev_blockhash;
289 block_header = rpc
290 .get_block_header(&prev_block_hash)
291 .await
292 .wrap_err("Failed to get block header")?;
293 let new_height = new_blocks.last().expect("new_blocks is empty").height - 1;
294 new_blocks.push(BlockInfo {
295 hash: prev_block_hash,
296 _header: block_header,
297 height: new_height,
298 });
299
300 if new_blocks.len() as u32 > finality_depth {
303 return Err(eyre::eyre!(
304 "Number of reorged blocks {} is greater than finality depth {}, reorged blocks: {:?}. If true, increase finality depth and resync the chain",
305 new_blocks.len() - 1,
306 finality_depth,
307 new_blocks
308 )
309 .into());
310 }
311 }
312
313 new_blocks.reverse();
315
316 Ok(Some(new_blocks))
317}
318
319#[tracing::instrument(skip(db, dbtx), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
321async fn handle_reorg_events(
322 db: &Database,
323 dbtx: DatabaseTransaction<'_, '_>,
324 common_ancestor_height: u32,
325) -> Result<(), BridgeError> {
326 let reorg_blocks = db
327 .update_non_canonical_block_hashes(Some(dbtx), common_ancestor_height)
328 .await?;
329 if !reorg_blocks.is_empty() {
330 tracing::debug!("Reorg occurred! Block ids: {:?}", reorg_blocks);
331 }
332
333 for reorg_block_id in reorg_blocks {
334 db.insert_event(Some(dbtx), BitcoinSyncerEvent::ReorgedBlock(reorg_block_id))
335 .await?;
336 }
337
338 Ok(())
339}
340
341async fn process_new_blocks(
343 db: &Database,
344 rpc: &ExtendedBitcoinRpc,
345 dbtx: DatabaseTransaction<'_, '_>,
346 new_blocks: &[BlockInfo],
347) -> Result<(), BridgeError> {
348 for block_info in new_blocks {
349 let block = rpc
350 .get_block(&block_info.hash)
351 .await
352 .wrap_err("Failed to get block")?;
353
354 let block_id = save_block(db, dbtx, &block, block_info.height).await?;
355 db.insert_event(Some(dbtx), BitcoinSyncerEvent::NewBlock(block_id))
356 .await?;
357 }
358
359 Ok(())
360}
361
362#[derive(Debug)]
364pub struct BitcoinSyncerTask {
365 db: Database,
367 rpc: ExtendedBitcoinRpc,
369 current_height: u32,
371 finality_depth: u32,
373}
374
375#[derive(Debug)]
376pub struct BitcoinSyncer {
377 db: Database,
379 rpc: ExtendedBitcoinRpc,
381 current_height: u32,
383 finality_depth: u32,
385}
386
387impl BitcoinSyncer {
388 pub async fn new(
392 db: Database,
393 rpc: ExtendedBitcoinRpc,
394 paramset: &'static ProtocolParamset,
395 ) -> Result<Self, BridgeError> {
396 set_initial_block_info_if_not_exists(&db, &rpc, paramset).await?;
398
399 let current_height = db
401 .get_max_height(None)
402 .await?
403 .ok_or_else(|| eyre::eyre!("Block not found in BitcoinSyncer::new"))?;
404
405 Ok(Self {
406 db,
407 rpc,
408 current_height,
409 finality_depth: paramset.finality_depth,
410 })
411 }
412}
413impl IntoTask for BitcoinSyncer {
414 type Task = WithDelay<BitcoinSyncerTask>;
415
416 fn into_task(self) -> Self::Task {
417 BitcoinSyncerTask {
418 db: self.db,
419 rpc: self.rpc,
420 current_height: self.current_height,
421 finality_depth: self.finality_depth,
422 }
423 .with_delay(BTC_SYNCER_POLL_DELAY)
424 }
425}
426
427#[async_trait]
428impl Task for BitcoinSyncerTask {
429 type Output = bool;
430 const VARIANT: TaskVariant = TaskVariant::BitcoinSyncer;
431
432 #[tracing::instrument(skip(self))]
433 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
434 let new_blocks = match fetch_new_blocks(
435 &self.db,
436 &self.rpc,
437 self.current_height,
438 self.finality_depth,
439 )
440 .await?
441 {
442 Some(blocks) if !blocks.is_empty() => {
443 tracing::debug!(
444 "{} new blocks found after current height {}",
445 blocks.len(),
446 self.current_height
447 );
448
449 blocks
450 }
451 _ => {
452 tracing::debug!(
453 "No new blocks found after current height: {}",
454 self.current_height
455 );
456
457 return Ok(false);
458 }
459 };
460
461 let common_ancestor_height = new_blocks[0].height - 1;
466 tracing::debug!("Common ancestor height: {:?}", common_ancestor_height);
467 let mut dbtx = self.db.begin_transaction().await?;
468
469 handle_reorg_events(&self.db, &mut dbtx, common_ancestor_height).await?;
471 tracing::debug!("BitcoinSyncer: Marked reorg blocks as non-canonical");
472
473 tracing::debug!("BitcoinSyncer: Processing new blocks");
475 tracing::debug!("BitcoinSyncer: New blocks: {:?}", new_blocks.len());
476 process_new_blocks(&self.db, &self.rpc, &mut dbtx, &new_blocks).await?;
477
478 dbtx.commit().await?;
479
480 tracing::debug!("BitcoinSyncer: Updating current height");
482 self.current_height = new_blocks.last().expect("new_blocks is not empty").height;
483 tracing::debug!("BitcoinSyncer: Current height: {:?}", self.current_height);
484
485 Ok(true)
487 }
488}
489
490#[derive(Debug)]
491pub struct FinalizedBlockFetcherTask<H: BlockHandler> {
492 db: Database,
493 btc_syncer_consumer_id: String,
494 paramset: &'static ProtocolParamset,
495 next_finalized_height: u32,
496 handler: H,
497}
498
499impl<H: BlockHandler> FinalizedBlockFetcherTask<H> {
500 pub fn new(
501 db: Database,
502 btc_syncer_consumer_id: String,
503 paramset: &'static ProtocolParamset,
504 next_finalized_height: u32,
505 handler: H,
506 ) -> Self {
507 Self {
508 db,
509 btc_syncer_consumer_id,
510 paramset,
511 next_finalized_height,
512 handler,
513 }
514 }
515}
516
517#[async_trait]
518impl<H: BlockHandler> Task for FinalizedBlockFetcherTask<H> {
519 type Output = bool;
520 const VARIANT: TaskVariant = TaskVariant::FinalizedBlockFetcher;
521
522 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
523 let mut dbtx = self.db.begin_transaction().await?;
524
525 let Some(event) = self
527 .db
528 .fetch_next_bitcoin_syncer_evt(&mut dbtx, &self.btc_syncer_consumer_id)
529 .await?
530 else {
531 dbtx.commit().await?;
533 return Ok(false);
534 };
535 let mut expected_next_finalized = self.next_finalized_height;
536
537 let did_find_new_block = match event {
539 BitcoinSyncerEvent::NewBlock(block_id) => {
540 let new_block_height = self
541 .db
542 .get_block_info_from_id(Some(&mut dbtx), block_id)
543 .await?
544 .ok_or(eyre::eyre!("Block not found in BlockFetcherTask",))?
545 .1;
546
547 let mut new_tip = false;
549 let mut warned = false;
550
551 while self
553 .paramset
554 .is_block_finalized(expected_next_finalized, new_block_height)
555 {
556 if new_tip && !warned {
557 warned = true;
558 tracing::warn!("Received event with multiple finalized blocks, expected 1 for ordered events. Got a new block with height {new_block_height}, expected next finalized block {}", self.next_finalized_height);
560 }
561 new_tip = true;
562
563 let block = self
564 .db
565 .get_full_block(Some(&mut dbtx), expected_next_finalized)
566 .await?
567 .ok_or(eyre::eyre!(
568 "Block at height {} not found in BlockFetcherTask, current tip height is {}",
569 expected_next_finalized, new_block_height
570 ))?;
571
572 let new_block_id = self
573 .db
574 .get_canonical_block_id_from_height(
575 Some(&mut dbtx),
576 expected_next_finalized,
577 )
578 .await?;
579
580 let Some(new_block_id) = new_block_id else {
581 tracing::error!("Block at height {} not found in BlockFetcherTask, current tip height is {}", expected_next_finalized, new_block_height);
582 return Err(eyre::eyre!(
583 "Block at height {} not found in BlockFetcherTask, current tip height is {}",
584 expected_next_finalized, new_block_height
585 ).into());
586 };
587
588 self.handler
589 .handle_new_block(&mut dbtx, new_block_id, block, expected_next_finalized)
590 .await?;
591
592 expected_next_finalized += 1;
593 }
594
595 new_tip
596 }
597 BitcoinSyncerEvent::ReorgedBlock(_) => false,
598 };
599
600 dbtx.commit().await?;
601 self.next_finalized_height = expected_next_finalized;
603 Ok(did_find_new_block)
605 }
606}
607
608#[async_trait]
609impl<H: BlockHandler> RecoverableTask for FinalizedBlockFetcherTask<H> {
610 async fn recover_from_error(&mut self, _error: &BridgeError) -> Result<(), BridgeError> {
611 Ok(())
614 }
615}
616
617#[cfg(test)]
618mod tests {
619 use crate::bitcoin_syncer::BitcoinSyncer;
620 use crate::builder::transaction::DEFAULT_SEQUENCE;
621 use crate::task::{IntoTask, TaskExt};
622 use crate::{database::Database, test::common::*};
623 use bitcoin::absolute::Height;
624 use bitcoin::hashes::Hash;
625 use bitcoin::transaction::Version;
626 use bitcoin::{OutPoint, ScriptBuf, Transaction, TxIn, Witness};
627 use bitcoincore_rpc::RpcApi;
628
629 #[tokio::test]
630 async fn get_block_info_from_height() {
631 let mut config = create_test_config_with_thread_name().await;
632 let regtest = create_regtest_rpc(&mut config).await;
633 let rpc = regtest.rpc().clone();
634
635 rpc.mine_blocks(1).await.unwrap();
636 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
637 let hash = rpc.get_block_hash(height as u64).await.unwrap();
638 let header = rpc.get_block_header(&hash).await.unwrap();
639
640 let block_info = super::fetch_block_info_from_height(&rpc, height)
641 .await
642 .unwrap();
643 assert_eq!(block_info._header, header);
644 assert_eq!(block_info.hash, hash);
645 assert_eq!(block_info.height, height);
646
647 rpc.mine_blocks(1).await.unwrap();
648 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
649
650 let block_info = super::fetch_block_info_from_height(&rpc, height)
651 .await
652 .unwrap();
653 assert_ne!(block_info._header, header);
654 assert_ne!(block_info.hash, hash);
655 assert_eq!(block_info.height, height);
656 }
657
658 #[tokio::test]
659 async fn save_get_transaction_spent_utxos() {
660 let mut config = create_test_config_with_thread_name().await;
661 let db = Database::new(&config).await.unwrap();
662 let regtest = create_regtest_rpc(&mut config).await;
663 let rpc = regtest.rpc().clone();
664
665 let mut dbtx = db.begin_transaction().await.unwrap();
666
667 rpc.mine_blocks(1).await.unwrap();
668 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
669 let hash = rpc.get_block_hash(height as u64).await.unwrap();
670 let block = rpc.get_block(&hash).await.unwrap();
671 let block_id = super::save_block(&db, &mut dbtx, &block, height)
672 .await
673 .unwrap();
674
675 let inputs = vec![
676 TxIn {
677 previous_output: OutPoint {
678 txid: bitcoin::Txid::all_zeros(),
679 vout: 0,
680 },
681 script_sig: ScriptBuf::default(),
682 sequence: DEFAULT_SEQUENCE,
683 witness: Witness::default(),
684 },
685 TxIn {
686 previous_output: OutPoint {
687 txid: bitcoin::Txid::all_zeros(),
688 vout: 1,
689 },
690 script_sig: ScriptBuf::default(),
691 sequence: DEFAULT_SEQUENCE,
692 witness: Witness::default(),
693 },
694 ];
695 let tx = Transaction {
696 version: Version::TWO,
697 lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
698 input: inputs.clone(),
699 output: vec![],
700 };
701 super::save_transaction_spent_utxos(&db, &mut dbtx, &tx, block_id)
702 .await
703 .unwrap();
704
705 let utxos = super::_get_transaction_spent_utxos(&db, &mut dbtx, tx.compute_txid())
706 .await
707 .unwrap();
708
709 for (index, input) in inputs.iter().enumerate() {
710 assert_eq!(input.previous_output, utxos[index]);
711 }
712
713 dbtx.commit().await.unwrap();
714 }
715
716 #[tokio::test]
717 async fn save_get_block() {
718 let mut config = create_test_config_with_thread_name().await;
719 let db = Database::new(&config).await.unwrap();
720 let regtest = create_regtest_rpc(&mut config).await;
721 let rpc = regtest.rpc().clone();
722
723 let mut dbtx = db.begin_transaction().await.unwrap();
724
725 rpc.mine_blocks(1).await.unwrap();
726 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
727 let hash = rpc.get_block_hash(height as u64).await.unwrap();
728 let block = rpc.get_block(&hash).await.unwrap();
729
730 super::save_block(&db, &mut dbtx, &block, height)
731 .await
732 .unwrap();
733
734 let (block_info, utxos) = super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
735 .await
736 .unwrap();
737 assert_eq!(block_info._header, block.header);
738 assert_eq!(block_info.hash, hash);
739 assert_eq!(block_info.height, height);
740 for (tx_index, tx) in block.txdata.iter().enumerate() {
741 for (txin_index, txin) in tx.input.iter().enumerate() {
742 assert_eq!(txin.previous_output, utxos[tx_index][txin_index]);
743 }
744 }
745
746 dbtx.commit().await.unwrap();
747 }
748
749 #[tokio::test]
750 async fn set_initial_block_info_if_not_exists() {
751 let mut config = create_test_config_with_thread_name().await;
752 let db = Database::new(&config).await.unwrap();
753 let regtest = create_regtest_rpc(&mut config).await;
754 let rpc = regtest.rpc().clone();
755
756 let mut dbtx = db.begin_transaction().await.unwrap();
757
758 rpc.mine_blocks(1).await.unwrap();
759 let hash = rpc
761 .get_block_hash(config.protocol_paramset().start_height as u64)
762 .await
763 .unwrap();
764 let block = rpc.get_block(&hash).await.unwrap();
765
766 assert!(super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
767 .await
768 .is_err());
769
770 super::set_initial_block_info_if_not_exists(&db, &rpc, config.protocol_paramset())
771 .await
772 .unwrap();
773
774 let (block_info, utxos) = super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
775 .await
776 .unwrap();
777 assert_eq!(block_info.hash, hash);
778 assert_eq!(block_info.height, config.protocol_paramset().start_height);
779
780 for (tx_index, tx) in block.txdata.iter().enumerate() {
781 for (txin_index, txin) in tx.input.iter().enumerate() {
782 assert_eq!(txin.previous_output, utxos[tx_index][txin_index]);
783 }
784 }
785 }
786
787 #[tokio::test]
788 async fn fetch_new_blocks_forward() {
789 let mut config = create_test_config_with_thread_name().await;
790 let db = Database::new(&config).await.unwrap();
791 let regtest = create_regtest_rpc(&mut config).await;
792 let rpc = regtest.rpc().clone();
793
794 let mut dbtx = db.begin_transaction().await.unwrap();
795
796 rpc.mine_blocks(1).await.unwrap();
797 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
798 let hash = rpc.get_block_hash(height as u64).await.unwrap();
799 let block = rpc.get_block(&hash).await.unwrap();
800 super::save_block(&db, &mut dbtx, &block, height)
801 .await
802 .unwrap();
803 dbtx.commit().await.unwrap();
804
805 let new_blocks =
806 super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
807 .await
808 .unwrap();
809 assert!(new_blocks.is_none());
810
811 let new_block_hashes = rpc.mine_blocks(1).await.unwrap();
812 let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
813 let new_blocks =
814 super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
815 .await
816 .unwrap()
817 .unwrap();
818 assert_eq!(new_blocks.len(), 1);
819 assert_eq!(new_blocks.first().unwrap().height, new_height);
820 assert_eq!(
821 new_blocks.first().unwrap().hash,
822 *new_block_hashes.first().unwrap()
823 );
824 }
825
826 #[tokio::test]
827 async fn fetch_new_blocks_backwards() {
828 let mut config = create_test_config_with_thread_name().await;
829 let db = Database::new(&config).await.unwrap();
830 let regtest = create_regtest_rpc(&mut config).await;
831 let rpc = regtest.rpc().clone();
832
833 rpc.mine_blocks(1).await.unwrap();
835 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
836 let hash = rpc.get_block_hash(height as u64).await.unwrap();
837 let block = rpc.get_block(&hash).await.unwrap();
838
839 let mut dbtx = db.begin_transaction().await.unwrap();
841 super::save_block(&db, &mut dbtx, &block, height)
842 .await
843 .unwrap();
844 dbtx.commit().await.unwrap();
845
846 let new_blocks =
847 super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
848 .await
849 .unwrap();
850 assert!(new_blocks.is_none());
851
852 let mine_count: u32 = config.protocol_paramset().finality_depth - 1;
854 let new_block_hashes = rpc.mine_blocks(mine_count as u64).await.unwrap();
855 let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
856
857 let new_blocks = super::fetch_new_blocks(
858 &db,
859 &rpc,
860 new_height - 1,
861 config.protocol_paramset().finality_depth,
862 )
863 .await
864 .unwrap()
865 .unwrap();
866 assert_eq!(new_blocks.len(), mine_count as usize);
867 for (index, block) in new_blocks.iter().enumerate() {
868 assert_eq!(block.height, new_height - mine_count + index as u32 + 1);
869 assert_eq!(block.hash, new_block_hashes[index]);
870 }
871
872 let mine_count: u32 = config.protocol_paramset().finality_depth;
874 rpc.mine_blocks(mine_count as u64).await.unwrap();
875 let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
876
877 assert!(super::fetch_new_blocks(
878 &db,
879 &rpc,
880 new_height - 1,
881 config.protocol_paramset().finality_depth
882 )
883 .await
884 .is_err());
885 }
886 #[ignore]
887 #[tokio::test]
888 async fn set_non_canonical_block_hashes() {
889 let mut config = create_test_config_with_thread_name().await;
890 let db = Database::new(&config).await.unwrap();
891 let regtest = create_regtest_rpc(&mut config).await;
892 let rpc = regtest.rpc().clone();
893
894 let hashes = rpc.mine_blocks(4).await.unwrap();
895 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
896
897 super::set_initial_block_info_if_not_exists(&db, &rpc, config.protocol_paramset())
898 .await
899 .unwrap();
900
901 rpc.invalidate_block(hashes.get(3).unwrap()).await.unwrap();
902 rpc.invalidate_block(hashes.get(2).unwrap()).await.unwrap();
903
904 let mut dbtx = db.begin_transaction().await.unwrap();
905
906 let last_db_block =
907 super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, *hashes.get(3).unwrap())
908 .await
909 .unwrap();
910 assert_eq!(last_db_block.0.height, height);
911 assert_eq!(last_db_block.0.hash, *hashes.get(3).unwrap());
912
913 super::handle_reorg_events(&db, &mut dbtx, height - 2)
914 .await
915 .unwrap();
916
917 assert!(
918 super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, *hashes.get(3).unwrap())
919 .await
920 .is_err()
921 );
922
923 dbtx.commit().await.unwrap();
924 }
925
926 #[tokio::test]
927 async fn start_bitcoin_syncer_new_block_mined() {
928 let mut config = create_test_config_with_thread_name().await;
929 let db = Database::new(&config).await.unwrap();
930 let regtest = create_regtest_rpc(&mut config).await;
931 let rpc = regtest.rpc().clone();
932
933 rpc.mine_blocks(1).await.unwrap();
934 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
935 let hash = rpc.get_block_hash(height as u64).await.unwrap();
936
937 let (looping_task, _cancel_tx) =
938 BitcoinSyncer::new(db.clone(), rpc.clone(), config.protocol_paramset())
939 .await
940 .unwrap()
941 .into_task()
942 .cancelable_loop();
943
944 looping_task.into_bg();
945
946 loop {
947 let mut dbtx = db.begin_transaction().await.unwrap();
948
949 let last_db_block =
950 match super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash).await {
951 Ok(block) => block,
952 Err(_) => {
953 dbtx.commit().await.unwrap();
954 continue;
955 }
956 };
957
958 assert_eq!(last_db_block.0.height, height);
959 assert_eq!(last_db_block.0.hash, hash);
960
961 dbtx.commit().await.unwrap();
962 break;
963 }
964 }
965}