1use crate::{
8 config::protocol::ProtocolParamset,
9 database::{Database, DatabaseTransaction},
10 extended_bitcoin_rpc::ExtendedBitcoinRpc,
11 task::{IntoTask, RecoverableTask, Task, TaskExt, TaskVariant, WithDelay},
12};
13use bitcoin::{block::Header, BlockHash, OutPoint};
14use bitcoincore_rpc::RpcApi;
15use clementine_errors::BridgeError;
16use eyre::Context;
17use std::time::Duration;
18use tonic::async_trait;
19
/// Delay handed to `with_delay` when a `BitcoinSyncer` is turned into a task:
/// 250 ms in test builds, 30 s otherwise.
pub const BTC_SYNCER_POLL_DELAY: Duration = match cfg!(test) {
    true => Duration::from_millis(250),
    false => Duration::from_secs(30),
};
25
26#[derive(Clone, Debug)]
28struct BlockInfo {
29 hash: BlockHash,
30 _header: Header,
31 height: u32,
32}
33
34pub use clementine_primitives::BitcoinSyncerEvent;
35
36#[async_trait]
38pub trait BlockHandler: Send + Sync + 'static {
39 async fn handle_new_block(
41 &mut self,
42 dbtx: DatabaseTransaction<'_>,
43 block_id: u32,
44 block: bitcoin::Block,
45 height: u32,
46 ) -> Result<(), BridgeError>;
47}
48
49async fn fetch_block_info_from_height(
51 rpc: &ExtendedBitcoinRpc,
52 height: u32,
53) -> Result<BlockInfo, BridgeError> {
54 let hash = rpc
55 .get_block_hash(height as u64)
56 .await
57 .wrap_err("Failed to get block hash")?;
58 let header = rpc
59 .get_block_header(&hash)
60 .await
61 .wrap_err("Failed to get block header")?;
62
63 Ok(BlockInfo {
64 hash,
65 _header: header,
66 height,
67 })
68}
69
70pub(crate) async fn save_block(
72 db: &Database,
73 dbtx: DatabaseTransaction<'_>,
74 block: &bitcoin::Block,
75 block_height: u32,
76) -> Result<u32, BridgeError> {
77 let block_hash = block.block_hash();
78 tracing::debug!(
79 "Saving a block with hash of {} and height of {}",
80 block_hash,
81 block_height
82 );
83
84 let block_id = db.update_block_as_canonical(Some(dbtx), block_hash).await?;
86 db.upsert_full_block(Some(dbtx), block, block_height)
87 .await?;
88
89 if let Some(block_id) = block_id {
90 return Ok(block_id);
91 }
92
93 let block_id = db
94 .insert_block_info(
95 Some(dbtx),
96 &block_hash,
97 &block.header.prev_blockhash,
98 block_height,
99 )
100 .await?;
101
102 tracing::debug!(
103 "Saving {} transactions to a block with hash {}",
104 block.txdata.len(),
105 block_hash
106 );
107 for tx in &block.txdata {
108 save_transaction_spent_utxos(db, dbtx, tx, block_id).await?;
109 }
110
111 Ok(block_id)
112}
113async fn _get_block_info_from_hash(
114 db: &Database,
115 dbtx: DatabaseTransaction<'_>,
116 rpc: &ExtendedBitcoinRpc,
117 hash: BlockHash,
118) -> Result<(BlockInfo, Vec<Vec<OutPoint>>), BridgeError> {
119 let block = rpc.get_block(&hash).await.wrap_err("Failed to get block")?;
120 let block_height = db
121 .get_block_info_from_hash(Some(dbtx), hash)
122 .await?
123 .ok_or_else(|| eyre::eyre!("Block not found in get_block_info_from_hash"))?
124 .1;
125
126 let mut block_utxos: Vec<Vec<OutPoint>> = Vec::new();
127 for tx in &block.txdata {
128 let txid = tx.compute_txid();
129 let spent_utxos = _get_transaction_spent_utxos(db, dbtx, txid).await?;
130 block_utxos.push(spent_utxos);
131 }
132
133 let block_info = BlockInfo {
134 hash,
135 _header: block.header,
136 height: block_height,
137 };
138
139 Ok((block_info, block_utxos))
140}
141
142async fn save_transaction_spent_utxos(
144 db: &Database,
145 dbtx: DatabaseTransaction<'_>,
146 tx: &bitcoin::Transaction,
147 block_id: u32,
148) -> Result<(), BridgeError> {
149 let txid = tx.compute_txid();
150 db.insert_txid_to_block(dbtx, block_id, &txid).await?;
151
152 for input in &tx.input {
153 db.insert_spent_utxo(
154 dbtx,
155 block_id,
156 &txid,
157 &input.previous_output.txid,
158 input.previous_output.vout as i64,
159 )
160 .await?;
161 }
162
163 Ok(())
164}
165async fn _get_transaction_spent_utxos(
166 db: &Database,
167 dbtx: DatabaseTransaction<'_>,
168 txid: bitcoin::Txid,
169) -> Result<Vec<OutPoint>, BridgeError> {
170 let utxos = db.get_spent_utxos_for_txid(Some(dbtx), txid).await?;
171 let utxos = utxos.into_iter().map(|utxo| utxo.1).collect::<Vec<_>>();
172
173 Ok(utxos)
174}
175
176pub async fn set_initial_block_info_if_not_exists(
178 db: &Database,
179 rpc: &ExtendedBitcoinRpc,
180 paramset: &'static ProtocolParamset,
181) -> Result<(), BridgeError> {
182 if db.get_max_height(None).await?.is_some() {
183 return Ok(());
184 }
185
186 let current_height = u32::try_from(
187 rpc.get_block_count()
188 .await
189 .wrap_err("Failed to get block count")?,
190 )
191 .wrap_err(BridgeError::IntConversionError)?;
192
193 if paramset.start_height > current_height {
194 tracing::error!(
195 "Bitcoin syncer could not find enough available blocks in chain (Likely a regtest problem). start_height ({}) > current_height ({})",
196 paramset.start_height,
197 current_height
198 );
199 return Ok(());
200 }
201
202 let height = paramset.start_height;
203 let mut dbtx = db.begin_transaction().await?;
204 let block_info = fetch_block_info_from_height(rpc, height).await?;
206 let block = rpc
207 .get_block(&block_info.hash)
208 .await
209 .wrap_err("Failed to get block")?;
210 let block_id = save_block(db, &mut dbtx, &block, height).await?;
211 db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::NewBlock(block_id))
212 .await?;
213
214 dbtx.commit().await?;
215
216 Ok(())
217}
218
219async fn fetch_new_blocks(
232 db: &Database,
233 rpc: &ExtendedBitcoinRpc,
234 current_height: u32,
235 finality_depth: u32,
236) -> Result<Option<Vec<BlockInfo>>, BridgeError> {
237 let next_height = current_height + 1;
238
239 let block_hash = match rpc.get_block_hash(next_height as u64).await {
241 Ok(hash) => hash,
242 Err(_) => return Ok(None),
243 };
244 tracing::debug!(
245 "Fetching block with hash of {:?} and height of {}...",
246 block_hash,
247 next_height
248 );
249
250 let mut block_header = rpc
252 .get_block_header(&block_hash)
253 .await
254 .wrap_err("Failed to get block header")?;
255 let mut new_blocks = vec![BlockInfo {
256 hash: block_hash,
257 _header: block_header,
258 height: next_height,
259 }];
260
261 while db
263 .get_block_info_from_hash(None, block_header.prev_blockhash)
264 .await?
265 .is_none()
266 {
267 let prev_block_hash = block_header.prev_blockhash;
268 block_header = rpc
269 .get_block_header(&prev_block_hash)
270 .await
271 .wrap_err("Failed to get block header")?;
272 let new_height = new_blocks.last().expect("new_blocks is empty").height - 1;
273 new_blocks.push(BlockInfo {
274 hash: prev_block_hash,
275 _header: block_header,
276 height: new_height,
277 });
278
279 if new_blocks.len() as u32 > finality_depth {
282 return Err(eyre::eyre!(
283 "Number of reorged blocks {} is greater than finality depth {}, reorged blocks: {:?}. If true, increase finality depth and resync the chain",
284 new_blocks.len() - 1,
285 finality_depth,
286 new_blocks
287 )
288 .into());
289 }
290 }
291
292 new_blocks.reverse();
294
295 Ok(Some(new_blocks))
296}
297
298#[tracing::instrument(skip(db, dbtx), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
300async fn handle_reorg_events(
301 db: &Database,
302 dbtx: DatabaseTransaction<'_>,
303 common_ancestor_height: u32,
304) -> Result<(), BridgeError> {
305 let reorg_blocks = db
306 .update_non_canonical_block_hashes(Some(dbtx), common_ancestor_height)
307 .await?;
308 if !reorg_blocks.is_empty() {
309 tracing::debug!("Reorg occurred! Block ids: {:?}", reorg_blocks);
310 }
311
312 for reorg_block_id in reorg_blocks {
313 db.insert_event(Some(dbtx), BitcoinSyncerEvent::ReorgedBlock(reorg_block_id))
314 .await?;
315 }
316
317 Ok(())
318}
319
320async fn process_new_blocks(
322 db: &Database,
323 rpc: &ExtendedBitcoinRpc,
324 dbtx: DatabaseTransaction<'_>,
325 new_blocks: &[BlockInfo],
326) -> Result<(), BridgeError> {
327 for block_info in new_blocks {
328 let block = rpc
329 .get_block(&block_info.hash)
330 .await
331 .wrap_err("Failed to get block")?;
332
333 let block_id = save_block(db, dbtx, &block, block_info.height).await?;
334 db.insert_event(Some(dbtx), BitcoinSyncerEvent::NewBlock(block_id))
335 .await?;
336 }
337
338 Ok(())
339}
340
341#[derive(Debug)]
343pub struct BitcoinSyncerTask {
344 db: Database,
346 rpc: ExtendedBitcoinRpc,
348 current_height: u32,
350 finality_depth: u32,
352}
353
354#[derive(Debug)]
355pub struct BitcoinSyncer {
356 db: Database,
358 rpc: ExtendedBitcoinRpc,
360 current_height: u32,
362 finality_depth: u32,
364}
365
366impl BitcoinSyncer {
367 pub async fn new(
371 db: Database,
372 rpc: ExtendedBitcoinRpc,
373 paramset: &'static ProtocolParamset,
374 ) -> Result<Self, BridgeError> {
375 set_initial_block_info_if_not_exists(&db, &rpc, paramset).await?;
377
378 let current_height = db
380 .get_max_height(None)
381 .await?
382 .ok_or_else(|| eyre::eyre!("Block not found in BitcoinSyncer::new"))?;
383
384 Ok(Self {
385 db,
386 rpc,
387 current_height,
388 finality_depth: paramset.finality_depth,
389 })
390 }
391}
392impl IntoTask for BitcoinSyncer {
393 type Task = WithDelay<BitcoinSyncerTask>;
394
395 fn into_task(self) -> Self::Task {
396 BitcoinSyncerTask {
397 db: self.db,
398 rpc: self.rpc,
399 current_height: self.current_height,
400 finality_depth: self.finality_depth,
401 }
402 .with_delay(BTC_SYNCER_POLL_DELAY)
403 }
404}
405
406#[async_trait]
407impl Task for BitcoinSyncerTask {
408 type Output = bool;
409 const VARIANT: TaskVariant = TaskVariant::BitcoinSyncer;
410
411 #[tracing::instrument(skip(self))]
412 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
413 let new_blocks = match fetch_new_blocks(
414 &self.db,
415 &self.rpc,
416 self.current_height,
417 self.finality_depth,
418 )
419 .await?
420 {
421 Some(blocks) if !blocks.is_empty() => {
422 tracing::debug!(
423 "{} new blocks found after current height {}",
424 blocks.len(),
425 self.current_height
426 );
427
428 blocks
429 }
430 _ => {
431 tracing::debug!(
432 "No new blocks found after current height: {}",
433 self.current_height
434 );
435
436 return Ok(false);
437 }
438 };
439
440 let common_ancestor_height = new_blocks[0].height - 1;
445 tracing::debug!("Common ancestor height: {:?}", common_ancestor_height);
446 let mut dbtx = self.db.begin_transaction().await?;
447
448 handle_reorg_events(&self.db, &mut dbtx, common_ancestor_height).await?;
450 tracing::debug!("BitcoinSyncer: Marked reorg blocks as non-canonical");
451
452 tracing::debug!("BitcoinSyncer: Processing new blocks");
454 tracing::debug!("BitcoinSyncer: New blocks: {:?}", new_blocks.len());
455 process_new_blocks(&self.db, &self.rpc, &mut dbtx, &new_blocks).await?;
456
457 dbtx.commit().await?;
458
459 tracing::debug!("BitcoinSyncer: Updating current height");
461 self.current_height = new_blocks.last().expect("new_blocks is not empty").height;
462 tracing::debug!("BitcoinSyncer: Current height: {:?}", self.current_height);
463
464 Ok(true)
466 }
467}
468
469#[derive(Debug)]
470pub struct FinalizedBlockFetcherTask<H: BlockHandler> {
471 db: Database,
472 btc_syncer_consumer_id: String,
473 paramset: &'static ProtocolParamset,
474 next_finalized_height: u32,
475 handler: H,
476}
477
478impl<H: BlockHandler> FinalizedBlockFetcherTask<H> {
479 pub fn new(
480 db: Database,
481 btc_syncer_consumer_id: String,
482 paramset: &'static ProtocolParamset,
483 next_finalized_height: u32,
484 handler: H,
485 ) -> Self {
486 tracing::debug!(
487 "Creating finalized block fetcher with consumer id: {}",
488 btc_syncer_consumer_id
489 );
490 Self {
491 db,
492 btc_syncer_consumer_id,
493 paramset,
494 next_finalized_height,
495 handler,
496 }
497 }
498}
499
500#[async_trait]
501impl<H: BlockHandler> Task for FinalizedBlockFetcherTask<H> {
502 type Output = bool;
503 const VARIANT: TaskVariant = TaskVariant::FinalizedBlockFetcher;
504
505 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
506 let mut dbtx = self.db.begin_transaction().await?;
507
508 let Some(event) = self
510 .db
511 .fetch_next_bitcoin_syncer_evt(&mut dbtx, &self.btc_syncer_consumer_id)
512 .await?
513 else {
514 dbtx.commit().await?;
516 return Ok(false);
517 };
518 let mut expected_next_finalized = self.next_finalized_height;
519
520 match event {
522 BitcoinSyncerEvent::NewBlock(block_id) => {
523 let new_block_height = self
524 .db
525 .get_block_info_from_id(Some(&mut dbtx), block_id)
526 .await?
527 .ok_or(eyre::eyre!("Block not found in BlockFetcherTask",))?
528 .1;
529
530 let mut new_tip = false;
532 let mut warned = false;
533
534 while self
536 .paramset
537 .is_block_finalized(expected_next_finalized, new_block_height)
538 {
539 if new_tip && !warned {
540 warned = true;
541 tracing::warn!("Received event with multiple finalized blocks, expected 1 for ordered events. Got a new block with height {new_block_height}, expected next finalized block {}", self.next_finalized_height);
543 }
544 new_tip = true;
545
546 let block = self
547 .db
548 .get_full_block(Some(&mut dbtx), expected_next_finalized)
549 .await?
550 .ok_or(eyre::eyre!(
551 "Block at height {} not found in BlockFetcherTask, current tip height is {}",
552 expected_next_finalized, new_block_height
553 ))?;
554
555 let new_block_id = self
556 .db
557 .get_canonical_block_id_from_height(
558 Some(&mut dbtx),
559 expected_next_finalized,
560 )
561 .await?;
562
563 let Some(new_block_id) = new_block_id else {
564 tracing::error!("Block at height {} not found in BlockFetcherTask, current tip height is {}", expected_next_finalized, new_block_height);
565 return Err(eyre::eyre!(
566 "Block at height {} not found in BlockFetcherTask, current tip height is {}",
567 expected_next_finalized, new_block_height
568 ).into());
569 };
570
571 self.handler
572 .handle_new_block(&mut dbtx, new_block_id, block, expected_next_finalized)
573 .await?;
574
575 expected_next_finalized += 1;
576 }
577 }
578 BitcoinSyncerEvent::ReorgedBlock(_) => {}
579 };
580
581 dbtx.commit().await?;
582 self.next_finalized_height = expected_next_finalized;
584 Ok(true)
586 }
587}
588
589#[async_trait]
590impl<H: BlockHandler> RecoverableTask for FinalizedBlockFetcherTask<H> {
591 async fn recover_from_error(&mut self, _error: &BridgeError) -> Result<(), BridgeError> {
592 Ok(())
595 }
596}
597
598#[cfg(test)]
599mod tests {
600 use crate::bitcoin_syncer::BitcoinSyncer;
601 use crate::builder::transaction::DEFAULT_SEQUENCE;
602 use crate::task::{IntoTask, TaskExt};
603 use crate::{database::Database, test::common::*};
604 use bitcoin::absolute::Height;
605 use bitcoin::hashes::Hash;
606 use bitcoin::transaction::Version;
607 use bitcoin::{OutPoint, ScriptBuf, Transaction, TxIn, Witness};
608 use bitcoincore_rpc::RpcApi;
609
610 #[tokio::test]
611 async fn get_block_info_from_height() {
612 let mut config = create_test_config_with_thread_name().await;
613 let regtest = create_regtest_rpc(&mut config).await;
614 let rpc = regtest.rpc().clone();
615
616 rpc.mine_blocks(1).await.unwrap();
617 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
618 let hash = rpc.get_block_hash(height as u64).await.unwrap();
619 let header = rpc.get_block_header(&hash).await.unwrap();
620
621 let block_info = super::fetch_block_info_from_height(&rpc, height)
622 .await
623 .unwrap();
624 assert_eq!(block_info._header, header);
625 assert_eq!(block_info.hash, hash);
626 assert_eq!(block_info.height, height);
627
628 rpc.mine_blocks(1).await.unwrap();
629 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
630
631 let block_info = super::fetch_block_info_from_height(&rpc, height)
632 .await
633 .unwrap();
634 assert_ne!(block_info._header, header);
635 assert_ne!(block_info.hash, hash);
636 assert_eq!(block_info.height, height);
637 }
638
639 #[tokio::test]
640 async fn save_get_transaction_spent_utxos() {
641 let mut config = create_test_config_with_thread_name().await;
642 let db = Database::new(&config).await.unwrap();
643 let regtest = create_regtest_rpc(&mut config).await;
644 let rpc = regtest.rpc().clone();
645
646 let mut dbtx = db.begin_transaction().await.unwrap();
647
648 rpc.mine_blocks(1).await.unwrap();
649 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
650 let hash = rpc.get_block_hash(height as u64).await.unwrap();
651 let block = rpc.get_block(&hash).await.unwrap();
652 let block_id = super::save_block(&db, &mut dbtx, &block, height)
653 .await
654 .unwrap();
655
656 let inputs = vec![
657 TxIn {
658 previous_output: OutPoint {
659 txid: bitcoin::Txid::all_zeros(),
660 vout: 0,
661 },
662 script_sig: ScriptBuf::default(),
663 sequence: DEFAULT_SEQUENCE,
664 witness: Witness::default(),
665 },
666 TxIn {
667 previous_output: OutPoint {
668 txid: bitcoin::Txid::all_zeros(),
669 vout: 1,
670 },
671 script_sig: ScriptBuf::default(),
672 sequence: DEFAULT_SEQUENCE,
673 witness: Witness::default(),
674 },
675 ];
676 let tx = Transaction {
677 version: Version::TWO,
678 lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
679 input: inputs.clone(),
680 output: vec![],
681 };
682 super::save_transaction_spent_utxos(&db, &mut dbtx, &tx, block_id)
683 .await
684 .unwrap();
685
686 let utxos = super::_get_transaction_spent_utxos(&db, &mut dbtx, tx.compute_txid())
687 .await
688 .unwrap();
689
690 for (index, input) in inputs.iter().enumerate() {
691 assert_eq!(input.previous_output, utxos[index]);
692 }
693
694 dbtx.commit().await.unwrap();
695 }
696
697 #[tokio::test]
698 async fn save_get_block() {
699 let mut config = create_test_config_with_thread_name().await;
700 let db = Database::new(&config).await.unwrap();
701 let regtest = create_regtest_rpc(&mut config).await;
702 let rpc = regtest.rpc().clone();
703
704 let mut dbtx = db.begin_transaction().await.unwrap();
705
706 rpc.mine_blocks(1).await.unwrap();
707 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
708 let hash = rpc.get_block_hash(height as u64).await.unwrap();
709 let block = rpc.get_block(&hash).await.unwrap();
710
711 super::save_block(&db, &mut dbtx, &block, height)
712 .await
713 .unwrap();
714
715 let (block_info, utxos) = super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
716 .await
717 .unwrap();
718 assert_eq!(block_info._header, block.header);
719 assert_eq!(block_info.hash, hash);
720 assert_eq!(block_info.height, height);
721 for (tx_index, tx) in block.txdata.iter().enumerate() {
722 for (txin_index, txin) in tx.input.iter().enumerate() {
723 assert_eq!(txin.previous_output, utxos[tx_index][txin_index]);
724 }
725 }
726
727 dbtx.commit().await.unwrap();
728 }
729
730 #[tokio::test]
731 async fn set_initial_block_info_if_not_exists() {
732 let mut config = create_test_config_with_thread_name().await;
733 let db = Database::new(&config).await.unwrap();
734 let regtest = create_regtest_rpc(&mut config).await;
735 let rpc = regtest.rpc().clone();
736
737 let mut dbtx = db.begin_transaction().await.unwrap();
738
739 rpc.mine_blocks(1).await.unwrap();
740 let hash = rpc
742 .get_block_hash(config.protocol_paramset().start_height as u64)
743 .await
744 .unwrap();
745 let block = rpc.get_block(&hash).await.unwrap();
746
747 assert!(super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
748 .await
749 .is_err());
750
751 super::set_initial_block_info_if_not_exists(&db, &rpc, config.protocol_paramset())
752 .await
753 .unwrap();
754
755 let (block_info, utxos) = super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
756 .await
757 .unwrap();
758 assert_eq!(block_info.hash, hash);
759 assert_eq!(block_info.height, config.protocol_paramset().start_height);
760
761 for (tx_index, tx) in block.txdata.iter().enumerate() {
762 for (txin_index, txin) in tx.input.iter().enumerate() {
763 assert_eq!(txin.previous_output, utxos[tx_index][txin_index]);
764 }
765 }
766 }
767
768 #[tokio::test]
769 async fn fetch_new_blocks_forward() {
770 let mut config = create_test_config_with_thread_name().await;
771 let db = Database::new(&config).await.unwrap();
772 let regtest = create_regtest_rpc(&mut config).await;
773 let rpc = regtest.rpc().clone();
774
775 let mut dbtx = db.begin_transaction().await.unwrap();
776
777 rpc.mine_blocks(1).await.unwrap();
778 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
779 let hash = rpc.get_block_hash(height as u64).await.unwrap();
780 let block = rpc.get_block(&hash).await.unwrap();
781 super::save_block(&db, &mut dbtx, &block, height)
782 .await
783 .unwrap();
784 dbtx.commit().await.unwrap();
785
786 let new_blocks =
787 super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
788 .await
789 .unwrap();
790 assert!(new_blocks.is_none());
791
792 let new_block_hashes = rpc.mine_blocks(1).await.unwrap();
793 let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
794 let new_blocks =
795 super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
796 .await
797 .unwrap()
798 .unwrap();
799 assert_eq!(new_blocks.len(), 1);
800 assert_eq!(new_blocks.first().unwrap().height, new_height);
801 assert_eq!(
802 new_blocks.first().unwrap().hash,
803 *new_block_hashes.first().unwrap()
804 );
805 }
806
807 #[tokio::test]
808 async fn fetch_new_blocks_backwards() {
809 let mut config = create_test_config_with_thread_name().await;
810 let db = Database::new(&config).await.unwrap();
811 let regtest = create_regtest_rpc(&mut config).await;
812 let rpc = regtest.rpc().clone();
813
814 rpc.mine_blocks(1).await.unwrap();
816 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
817 let hash = rpc.get_block_hash(height as u64).await.unwrap();
818 let block = rpc.get_block(&hash).await.unwrap();
819
820 let mut dbtx = db.begin_transaction().await.unwrap();
822 super::save_block(&db, &mut dbtx, &block, height)
823 .await
824 .unwrap();
825 dbtx.commit().await.unwrap();
826
827 let new_blocks =
828 super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
829 .await
830 .unwrap();
831 assert!(new_blocks.is_none());
832
833 let mine_count: u32 = config.protocol_paramset().finality_depth - 1;
835 let new_block_hashes = rpc.mine_blocks(mine_count as u64).await.unwrap();
836 let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
837
838 let new_blocks = super::fetch_new_blocks(
839 &db,
840 &rpc,
841 new_height - 1,
842 config.protocol_paramset().finality_depth,
843 )
844 .await
845 .unwrap()
846 .unwrap();
847 assert_eq!(new_blocks.len(), mine_count as usize);
848 for (index, block) in new_blocks.iter().enumerate() {
849 assert_eq!(block.height, new_height - mine_count + index as u32 + 1);
850 assert_eq!(block.hash, new_block_hashes[index]);
851 }
852
853 let mine_count: u32 = config.protocol_paramset().finality_depth;
855 rpc.mine_blocks(mine_count as u64).await.unwrap();
856 let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
857
858 assert!(super::fetch_new_blocks(
859 &db,
860 &rpc,
861 new_height - 1,
862 config.protocol_paramset().finality_depth
863 )
864 .await
865 .is_err());
866 }
867 #[ignore]
868 #[tokio::test]
869 async fn set_non_canonical_block_hashes() {
870 let mut config = create_test_config_with_thread_name().await;
871 let db = Database::new(&config).await.unwrap();
872 let regtest = create_regtest_rpc(&mut config).await;
873 let rpc = regtest.rpc().clone();
874
875 let hashes = rpc.mine_blocks(4).await.unwrap();
876 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
877
878 super::set_initial_block_info_if_not_exists(&db, &rpc, config.protocol_paramset())
879 .await
880 .unwrap();
881
882 rpc.invalidate_block(hashes.get(3).unwrap()).await.unwrap();
883 rpc.invalidate_block(hashes.get(2).unwrap()).await.unwrap();
884
885 let mut dbtx = db.begin_transaction().await.unwrap();
886
887 let last_db_block =
888 super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, *hashes.get(3).unwrap())
889 .await
890 .unwrap();
891 assert_eq!(last_db_block.0.height, height);
892 assert_eq!(last_db_block.0.hash, *hashes.get(3).unwrap());
893
894 super::handle_reorg_events(&db, &mut dbtx, height - 2)
895 .await
896 .unwrap();
897
898 assert!(
899 super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, *hashes.get(3).unwrap())
900 .await
901 .is_err()
902 );
903
904 dbtx.commit().await.unwrap();
905 }
906
907 #[tokio::test]
908 async fn start_bitcoin_syncer_new_block_mined() {
909 let mut config = create_test_config_with_thread_name().await;
910 let db = Database::new(&config).await.unwrap();
911 let regtest = create_regtest_rpc(&mut config).await;
912 let rpc = regtest.rpc().clone();
913
914 rpc.mine_blocks(1).await.unwrap();
915 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
916 let hash = rpc.get_block_hash(height as u64).await.unwrap();
917
918 let (looping_task, _cancel_tx) =
919 BitcoinSyncer::new(db.clone(), rpc.clone(), config.protocol_paramset())
920 .await
921 .unwrap()
922 .into_task()
923 .cancelable_loop();
924
925 looping_task.into_bg();
926
927 loop {
928 let mut dbtx = db.begin_transaction().await.unwrap();
929
930 let last_db_block =
931 match super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash).await {
932 Ok(block) => block,
933 Err(_) => {
934 dbtx.commit().await.unwrap();
935 continue;
936 }
937 };
938
939 assert_eq!(last_db_block.0.height, height);
940 assert_eq!(last_db_block.0.hash, hash);
941
942 dbtx.commit().await.unwrap();
943 break;
944 }
945 }
946}