1use crate::{
8 config::protocol::ProtocolParamset,
9 database::{Database, DatabaseTransaction},
10 extended_bitcoin_rpc::ExtendedBitcoinRpc,
11 task::{IntoTask, RecoverableTask, Task, TaskExt, TaskVariant, WithDelay},
12};
13use bitcoin::{block::Header, BlockHash, OutPoint};
14use bitcoincore_rpc::RpcApi;
15use clementine_errors::BridgeError;
16use eyre::Context;
17use std::time::Duration;
18use tonic::async_trait;
19
20pub const BTC_SYNCER_POLL_DELAY: Duration = if cfg!(test) {
21 Duration::from_millis(250)
22} else {
23 Duration::from_secs(30)
24};
25
26#[derive(Clone, Debug)]
28struct BlockInfo {
29 hash: BlockHash,
30 _header: Header,
31 height: u32,
32}
33
34pub use clementine_primitives::BitcoinSyncerEvent;
35
36#[async_trait]
38pub trait BlockHandler: Send + Sync + 'static {
39 async fn handle_new_block(
41 &mut self,
42 dbtx: DatabaseTransaction<'_>,
43 block_id: u32,
44 block: bitcoin::Block,
45 height: u32,
46 ) -> Result<(), BridgeError>;
47}
48
49async fn fetch_block_info_from_height(
51 rpc: &ExtendedBitcoinRpc,
52 height: u32,
53) -> Result<BlockInfo, BridgeError> {
54 let hash = rpc
55 .get_block_hash(height as u64)
56 .await
57 .wrap_err("Failed to get block hash")?;
58 let header = rpc
59 .get_block_header(&hash)
60 .await
61 .wrap_err("Failed to get block header")?;
62
63 Ok(BlockInfo {
64 hash,
65 _header: header,
66 height,
67 })
68}
69
70pub(crate) async fn save_block(
72 db: &Database,
73 dbtx: DatabaseTransaction<'_>,
74 block: &bitcoin::Block,
75 block_height: u32,
76) -> Result<u32, BridgeError> {
77 let block_hash = block.block_hash();
78 tracing::debug!(
79 "Saving a block with hash of {} and height of {}",
80 block_hash,
81 block_height
82 );
83
84 let block_id = db.update_block_as_canonical(Some(dbtx), block_hash).await?;
86 db.upsert_full_block(Some(dbtx), block, block_height)
87 .await?;
88
89 if let Some(block_id) = block_id {
90 return Ok(block_id);
91 }
92
93 let block_id = db
94 .insert_block_info(
95 Some(dbtx),
96 &block_hash,
97 &block.header.prev_blockhash,
98 block_height,
99 )
100 .await?;
101
102 tracing::debug!(
103 "Saving {} transactions to a block with hash {}",
104 block.txdata.len(),
105 block_hash
106 );
107 for tx in &block.txdata {
108 save_transaction_spent_utxos(db, dbtx, tx, block_id).await?;
109 }
110
111 Ok(block_id)
112}
113async fn _get_block_info_from_hash(
114 db: &Database,
115 dbtx: DatabaseTransaction<'_>,
116 rpc: &ExtendedBitcoinRpc,
117 hash: BlockHash,
118) -> Result<(BlockInfo, Vec<Vec<OutPoint>>), BridgeError> {
119 let block = rpc.get_block(&hash).await.wrap_err("Failed to get block")?;
120 let block_height = db
121 .get_block_info_from_hash(Some(dbtx), hash)
122 .await?
123 .ok_or_else(|| eyre::eyre!("Block not found in get_block_info_from_hash"))?
124 .1;
125
126 let mut block_utxos: Vec<Vec<OutPoint>> = Vec::new();
127 for tx in &block.txdata {
128 let txid = tx.compute_txid();
129 let spent_utxos = _get_transaction_spent_utxos(db, dbtx, txid).await?;
130 block_utxos.push(spent_utxos);
131 }
132
133 let block_info = BlockInfo {
134 hash,
135 _header: block.header,
136 height: block_height,
137 };
138
139 Ok((block_info, block_utxos))
140}
141
142async fn save_transaction_spent_utxos(
144 db: &Database,
145 dbtx: DatabaseTransaction<'_>,
146 tx: &bitcoin::Transaction,
147 block_id: u32,
148) -> Result<(), BridgeError> {
149 let txid = tx.compute_txid();
150 db.insert_txid_to_block(dbtx, block_id, &txid).await?;
151
152 for input in &tx.input {
153 db.insert_spent_utxo(
154 dbtx,
155 block_id,
156 &txid,
157 &input.previous_output.txid,
158 input.previous_output.vout as i64,
159 )
160 .await?;
161 }
162
163 Ok(())
164}
165async fn _get_transaction_spent_utxos(
166 db: &Database,
167 dbtx: DatabaseTransaction<'_>,
168 txid: bitcoin::Txid,
169) -> Result<Vec<OutPoint>, BridgeError> {
170 let utxos = db.get_spent_utxos_for_txid(Some(dbtx), txid).await?;
171 let utxos = utxos.into_iter().map(|utxo| utxo.1).collect::<Vec<_>>();
172
173 Ok(utxos)
174}
175
176pub async fn set_initial_block_info_if_not_exists(
178 db: &Database,
179 rpc: &ExtendedBitcoinRpc,
180 paramset: &'static ProtocolParamset,
181) -> Result<(), BridgeError> {
182 if db.get_max_height(None).await?.is_some() {
183 return Ok(());
184 }
185
186 let current_height = u32::try_from(
187 rpc.get_block_count()
188 .await
189 .wrap_err("Failed to get block count")?,
190 )
191 .wrap_err(BridgeError::IntConversionError)?;
192
193 if paramset.start_height > current_height {
194 tracing::error!(
195 "Bitcoin syncer could not find enough available blocks in chain (Likely a regtest problem). start_height ({}) > current_height ({})",
196 paramset.start_height,
197 current_height
198 );
199 return Ok(());
200 }
201
202 let height = paramset.start_height;
203 let mut dbtx = db.begin_transaction().await?;
204 let block_info = fetch_block_info_from_height(rpc, height).await?;
206 let block = rpc
207 .get_block(&block_info.hash)
208 .await
209 .wrap_err("Failed to get block")?;
210 let block_id = save_block(db, &mut dbtx, &block, height).await?;
211 db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::NewBlock(block_id))
212 .await?;
213
214 dbtx.commit().await?;
215
216 Ok(())
217}
218
219async fn fetch_new_blocks(
232 db: &Database,
233 rpc: &ExtendedBitcoinRpc,
234 current_height: u32,
235 finality_depth: u32,
236) -> Result<Option<Vec<BlockInfo>>, BridgeError> {
237 let next_height = current_height + 1;
238
239 let block_hash = match rpc.get_block_hash(next_height as u64).await {
241 Ok(hash) => hash,
242 Err(_) => return Ok(None),
243 };
244 tracing::debug!(
245 "Fetching block with hash of {:?} and height of {}...",
246 block_hash,
247 next_height
248 );
249
250 let mut block_header = rpc
252 .get_block_header(&block_hash)
253 .await
254 .wrap_err("Failed to get block header")?;
255 let mut new_blocks = vec![BlockInfo {
256 hash: block_hash,
257 _header: block_header,
258 height: next_height,
259 }];
260
261 while db
263 .get_block_info_from_hash(None, block_header.prev_blockhash)
264 .await?
265 .is_none()
266 {
267 let prev_block_hash = block_header.prev_blockhash;
268 block_header = rpc
269 .get_block_header(&prev_block_hash)
270 .await
271 .wrap_err("Failed to get block header")?;
272 let new_height = new_blocks.last().expect("new_blocks is empty").height - 1;
273 new_blocks.push(BlockInfo {
274 hash: prev_block_hash,
275 _header: block_header,
276 height: new_height,
277 });
278
279 if new_blocks.len() as u32 > finality_depth {
282 return Err(eyre::eyre!(
283 "Number of reorged blocks {} is greater than finality depth {}, reorged blocks: {:?}. If true, increase finality depth and resync the chain",
284 new_blocks.len() - 1,
285 finality_depth,
286 new_blocks
287 )
288 .into());
289 }
290 }
291
292 new_blocks.reverse();
294
295 Ok(Some(new_blocks))
296}
297
298#[tracing::instrument(skip(db, dbtx), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
300async fn handle_reorg_events(
301 db: &Database,
302 dbtx: DatabaseTransaction<'_>,
303 common_ancestor_height: u32,
304) -> Result<(), BridgeError> {
305 let reorg_blocks = db
306 .update_non_canonical_block_hashes(Some(dbtx), common_ancestor_height)
307 .await?;
308 if !reorg_blocks.is_empty() {
309 tracing::debug!("Reorg occurred! Block ids: {:?}", reorg_blocks);
310 }
311
312 for reorg_block_id in reorg_blocks {
313 db.insert_event(Some(dbtx), BitcoinSyncerEvent::ReorgedBlock(reorg_block_id))
314 .await?;
315 }
316
317 Ok(())
318}
319
320async fn process_new_blocks(
322 db: &Database,
323 rpc: &ExtendedBitcoinRpc,
324 dbtx: DatabaseTransaction<'_>,
325 new_blocks: &[BlockInfo],
326) -> Result<(), BridgeError> {
327 for block_info in new_blocks {
328 let block = rpc
329 .get_block(&block_info.hash)
330 .await
331 .wrap_err("Failed to get block")?;
332
333 let block_id = save_block(db, dbtx, &block, block_info.height).await?;
334 db.insert_event(Some(dbtx), BitcoinSyncerEvent::NewBlock(block_id))
335 .await?;
336 }
337
338 Ok(())
339}
340
341#[derive(Debug)]
343pub struct BitcoinSyncerTask {
344 db: Database,
346 rpc: ExtendedBitcoinRpc,
348 current_height: u32,
350 finality_depth: u32,
352}
353
354#[derive(Debug)]
355pub struct BitcoinSyncer {
356 db: Database,
358 rpc: ExtendedBitcoinRpc,
360 current_height: u32,
362 finality_depth: u32,
364}
365
366impl BitcoinSyncer {
367 pub async fn new(
371 db: Database,
372 rpc: ExtendedBitcoinRpc,
373 paramset: &'static ProtocolParamset,
374 ) -> Result<Self, BridgeError> {
375 set_initial_block_info_if_not_exists(&db, &rpc, paramset).await?;
377
378 let current_height = db
380 .get_max_height(None)
381 .await?
382 .ok_or_else(|| eyre::eyre!("Block not found in BitcoinSyncer::new"))?;
383
384 Ok(Self {
385 db,
386 rpc,
387 current_height,
388 finality_depth: paramset.finality_depth,
389 })
390 }
391}
392impl IntoTask for BitcoinSyncer {
393 type Task = WithDelay<BitcoinSyncerTask>;
394
395 fn into_task(self) -> Self::Task {
396 BitcoinSyncerTask {
397 db: self.db,
398 rpc: self.rpc,
399 current_height: self.current_height,
400 finality_depth: self.finality_depth,
401 }
402 .with_delay(BTC_SYNCER_POLL_DELAY)
403 }
404}
405
406#[async_trait]
407impl Task for BitcoinSyncerTask {
408 type Output = bool;
409 const VARIANT: TaskVariant = TaskVariant::BitcoinSyncer;
410
411 #[tracing::instrument(skip(self))]
412 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
413 let new_blocks = match fetch_new_blocks(
414 &self.db,
415 &self.rpc,
416 self.current_height,
417 self.finality_depth,
418 )
419 .await?
420 {
421 Some(blocks) if !blocks.is_empty() => {
422 tracing::debug!(
423 "{} new blocks found after current height {}",
424 blocks.len(),
425 self.current_height
426 );
427
428 blocks
429 }
430 _ => {
431 tracing::debug!(
432 "No new blocks found after current height: {}",
433 self.current_height
434 );
435
436 return Ok(false);
437 }
438 };
439
440 let common_ancestor_height = new_blocks[0].height - 1;
445 tracing::debug!("Common ancestor height: {:?}", common_ancestor_height);
446 let mut dbtx = self.db.begin_transaction().await?;
447
448 handle_reorg_events(&self.db, &mut dbtx, common_ancestor_height).await?;
450 tracing::debug!("BitcoinSyncer: Marked reorg blocks as non-canonical");
451
452 tracing::debug!("BitcoinSyncer: Processing new blocks");
454 tracing::debug!("BitcoinSyncer: New blocks: {:?}", new_blocks.len());
455 process_new_blocks(&self.db, &self.rpc, &mut dbtx, &new_blocks).await?;
456
457 dbtx.commit().await?;
458
459 tracing::debug!("BitcoinSyncer: Updating current height");
461 self.current_height = new_blocks.last().expect("new_blocks is not empty").height;
462 tracing::debug!("BitcoinSyncer: Current height: {:?}", self.current_height);
463
464 Ok(true)
466 }
467}
468
469#[derive(Debug)]
470pub struct FinalizedBlockFetcherTask<H: BlockHandler> {
471 db: Database,
472 btc_syncer_consumer_id: String,
473 paramset: &'static ProtocolParamset,
474 next_finalized_height: u32,
475 handler: H,
476}
477
478impl<H: BlockHandler> FinalizedBlockFetcherTask<H> {
479 pub fn new(
480 db: Database,
481 btc_syncer_consumer_id: String,
482 paramset: &'static ProtocolParamset,
483 next_finalized_height: u32,
484 handler: H,
485 ) -> Self {
486 Self {
487 db,
488 btc_syncer_consumer_id,
489 paramset,
490 next_finalized_height,
491 handler,
492 }
493 }
494}
495
496#[async_trait]
497impl<H: BlockHandler> Task for FinalizedBlockFetcherTask<H> {
498 type Output = bool;
499 const VARIANT: TaskVariant = TaskVariant::FinalizedBlockFetcher;
500
501 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
502 let mut dbtx = self.db.begin_transaction().await?;
503
504 let Some(event) = self
506 .db
507 .fetch_next_bitcoin_syncer_evt(&mut dbtx, &self.btc_syncer_consumer_id)
508 .await?
509 else {
510 dbtx.commit().await?;
512 return Ok(false);
513 };
514 let mut expected_next_finalized = self.next_finalized_height;
515
516 let did_find_new_block = match event {
518 BitcoinSyncerEvent::NewBlock(block_id) => {
519 let new_block_height = self
520 .db
521 .get_block_info_from_id(Some(&mut dbtx), block_id)
522 .await?
523 .ok_or(eyre::eyre!("Block not found in BlockFetcherTask",))?
524 .1;
525
526 let mut new_tip = false;
528 let mut warned = false;
529
530 while self
532 .paramset
533 .is_block_finalized(expected_next_finalized, new_block_height)
534 {
535 if new_tip && !warned {
536 warned = true;
537 tracing::warn!("Received event with multiple finalized blocks, expected 1 for ordered events. Got a new block with height {new_block_height}, expected next finalized block {}", self.next_finalized_height);
539 }
540 new_tip = true;
541
542 let block = self
543 .db
544 .get_full_block(Some(&mut dbtx), expected_next_finalized)
545 .await?
546 .ok_or(eyre::eyre!(
547 "Block at height {} not found in BlockFetcherTask, current tip height is {}",
548 expected_next_finalized, new_block_height
549 ))?;
550
551 let new_block_id = self
552 .db
553 .get_canonical_block_id_from_height(
554 Some(&mut dbtx),
555 expected_next_finalized,
556 )
557 .await?;
558
559 let Some(new_block_id) = new_block_id else {
560 tracing::error!("Block at height {} not found in BlockFetcherTask, current tip height is {}", expected_next_finalized, new_block_height);
561 return Err(eyre::eyre!(
562 "Block at height {} not found in BlockFetcherTask, current tip height is {}",
563 expected_next_finalized, new_block_height
564 ).into());
565 };
566
567 self.handler
568 .handle_new_block(&mut dbtx, new_block_id, block, expected_next_finalized)
569 .await?;
570
571 expected_next_finalized += 1;
572 }
573
574 new_tip
575 }
576 BitcoinSyncerEvent::ReorgedBlock(_) => false,
577 };
578
579 dbtx.commit().await?;
580 self.next_finalized_height = expected_next_finalized;
582 Ok(did_find_new_block)
584 }
585}
586
587#[async_trait]
588impl<H: BlockHandler> RecoverableTask for FinalizedBlockFetcherTask<H> {
589 async fn recover_from_error(&mut self, _error: &BridgeError) -> Result<(), BridgeError> {
590 Ok(())
593 }
594}
595
596#[cfg(test)]
597mod tests {
598 use crate::bitcoin_syncer::BitcoinSyncer;
599 use crate::builder::transaction::DEFAULT_SEQUENCE;
600 use crate::task::{IntoTask, TaskExt};
601 use crate::{database::Database, test::common::*};
602 use bitcoin::absolute::Height;
603 use bitcoin::hashes::Hash;
604 use bitcoin::transaction::Version;
605 use bitcoin::{OutPoint, ScriptBuf, Transaction, TxIn, Witness};
606 use bitcoincore_rpc::RpcApi;
607
608 #[tokio::test]
609 async fn get_block_info_from_height() {
610 let mut config = create_test_config_with_thread_name().await;
611 let regtest = create_regtest_rpc(&mut config).await;
612 let rpc = regtest.rpc().clone();
613
614 rpc.mine_blocks(1).await.unwrap();
615 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
616 let hash = rpc.get_block_hash(height as u64).await.unwrap();
617 let header = rpc.get_block_header(&hash).await.unwrap();
618
619 let block_info = super::fetch_block_info_from_height(&rpc, height)
620 .await
621 .unwrap();
622 assert_eq!(block_info._header, header);
623 assert_eq!(block_info.hash, hash);
624 assert_eq!(block_info.height, height);
625
626 rpc.mine_blocks(1).await.unwrap();
627 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
628
629 let block_info = super::fetch_block_info_from_height(&rpc, height)
630 .await
631 .unwrap();
632 assert_ne!(block_info._header, header);
633 assert_ne!(block_info.hash, hash);
634 assert_eq!(block_info.height, height);
635 }
636
637 #[tokio::test]
638 async fn save_get_transaction_spent_utxos() {
639 let mut config = create_test_config_with_thread_name().await;
640 let db = Database::new(&config).await.unwrap();
641 let regtest = create_regtest_rpc(&mut config).await;
642 let rpc = regtest.rpc().clone();
643
644 let mut dbtx = db.begin_transaction().await.unwrap();
645
646 rpc.mine_blocks(1).await.unwrap();
647 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
648 let hash = rpc.get_block_hash(height as u64).await.unwrap();
649 let block = rpc.get_block(&hash).await.unwrap();
650 let block_id = super::save_block(&db, &mut dbtx, &block, height)
651 .await
652 .unwrap();
653
654 let inputs = vec![
655 TxIn {
656 previous_output: OutPoint {
657 txid: bitcoin::Txid::all_zeros(),
658 vout: 0,
659 },
660 script_sig: ScriptBuf::default(),
661 sequence: DEFAULT_SEQUENCE,
662 witness: Witness::default(),
663 },
664 TxIn {
665 previous_output: OutPoint {
666 txid: bitcoin::Txid::all_zeros(),
667 vout: 1,
668 },
669 script_sig: ScriptBuf::default(),
670 sequence: DEFAULT_SEQUENCE,
671 witness: Witness::default(),
672 },
673 ];
674 let tx = Transaction {
675 version: Version::TWO,
676 lock_time: bitcoin::absolute::LockTime::Blocks(Height::ZERO),
677 input: inputs.clone(),
678 output: vec![],
679 };
680 super::save_transaction_spent_utxos(&db, &mut dbtx, &tx, block_id)
681 .await
682 .unwrap();
683
684 let utxos = super::_get_transaction_spent_utxos(&db, &mut dbtx, tx.compute_txid())
685 .await
686 .unwrap();
687
688 for (index, input) in inputs.iter().enumerate() {
689 assert_eq!(input.previous_output, utxos[index]);
690 }
691
692 dbtx.commit().await.unwrap();
693 }
694
695 #[tokio::test]
696 async fn save_get_block() {
697 let mut config = create_test_config_with_thread_name().await;
698 let db = Database::new(&config).await.unwrap();
699 let regtest = create_regtest_rpc(&mut config).await;
700 let rpc = regtest.rpc().clone();
701
702 let mut dbtx = db.begin_transaction().await.unwrap();
703
704 rpc.mine_blocks(1).await.unwrap();
705 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
706 let hash = rpc.get_block_hash(height as u64).await.unwrap();
707 let block = rpc.get_block(&hash).await.unwrap();
708
709 super::save_block(&db, &mut dbtx, &block, height)
710 .await
711 .unwrap();
712
713 let (block_info, utxos) = super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
714 .await
715 .unwrap();
716 assert_eq!(block_info._header, block.header);
717 assert_eq!(block_info.hash, hash);
718 assert_eq!(block_info.height, height);
719 for (tx_index, tx) in block.txdata.iter().enumerate() {
720 for (txin_index, txin) in tx.input.iter().enumerate() {
721 assert_eq!(txin.previous_output, utxos[tx_index][txin_index]);
722 }
723 }
724
725 dbtx.commit().await.unwrap();
726 }
727
728 #[tokio::test]
729 async fn set_initial_block_info_if_not_exists() {
730 let mut config = create_test_config_with_thread_name().await;
731 let db = Database::new(&config).await.unwrap();
732 let regtest = create_regtest_rpc(&mut config).await;
733 let rpc = regtest.rpc().clone();
734
735 let mut dbtx = db.begin_transaction().await.unwrap();
736
737 rpc.mine_blocks(1).await.unwrap();
738 let hash = rpc
740 .get_block_hash(config.protocol_paramset().start_height as u64)
741 .await
742 .unwrap();
743 let block = rpc.get_block(&hash).await.unwrap();
744
745 assert!(super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
746 .await
747 .is_err());
748
749 super::set_initial_block_info_if_not_exists(&db, &rpc, config.protocol_paramset())
750 .await
751 .unwrap();
752
753 let (block_info, utxos) = super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash)
754 .await
755 .unwrap();
756 assert_eq!(block_info.hash, hash);
757 assert_eq!(block_info.height, config.protocol_paramset().start_height);
758
759 for (tx_index, tx) in block.txdata.iter().enumerate() {
760 for (txin_index, txin) in tx.input.iter().enumerate() {
761 assert_eq!(txin.previous_output, utxos[tx_index][txin_index]);
762 }
763 }
764 }
765
766 #[tokio::test]
767 async fn fetch_new_blocks_forward() {
768 let mut config = create_test_config_with_thread_name().await;
769 let db = Database::new(&config).await.unwrap();
770 let regtest = create_regtest_rpc(&mut config).await;
771 let rpc = regtest.rpc().clone();
772
773 let mut dbtx = db.begin_transaction().await.unwrap();
774
775 rpc.mine_blocks(1).await.unwrap();
776 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
777 let hash = rpc.get_block_hash(height as u64).await.unwrap();
778 let block = rpc.get_block(&hash).await.unwrap();
779 super::save_block(&db, &mut dbtx, &block, height)
780 .await
781 .unwrap();
782 dbtx.commit().await.unwrap();
783
784 let new_blocks =
785 super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
786 .await
787 .unwrap();
788 assert!(new_blocks.is_none());
789
790 let new_block_hashes = rpc.mine_blocks(1).await.unwrap();
791 let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
792 let new_blocks =
793 super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
794 .await
795 .unwrap()
796 .unwrap();
797 assert_eq!(new_blocks.len(), 1);
798 assert_eq!(new_blocks.first().unwrap().height, new_height);
799 assert_eq!(
800 new_blocks.first().unwrap().hash,
801 *new_block_hashes.first().unwrap()
802 );
803 }
804
805 #[tokio::test]
806 async fn fetch_new_blocks_backwards() {
807 let mut config = create_test_config_with_thread_name().await;
808 let db = Database::new(&config).await.unwrap();
809 let regtest = create_regtest_rpc(&mut config).await;
810 let rpc = regtest.rpc().clone();
811
812 rpc.mine_blocks(1).await.unwrap();
814 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
815 let hash = rpc.get_block_hash(height as u64).await.unwrap();
816 let block = rpc.get_block(&hash).await.unwrap();
817
818 let mut dbtx = db.begin_transaction().await.unwrap();
820 super::save_block(&db, &mut dbtx, &block, height)
821 .await
822 .unwrap();
823 dbtx.commit().await.unwrap();
824
825 let new_blocks =
826 super::fetch_new_blocks(&db, &rpc, height, config.protocol_paramset().finality_depth)
827 .await
828 .unwrap();
829 assert!(new_blocks.is_none());
830
831 let mine_count: u32 = config.protocol_paramset().finality_depth - 1;
833 let new_block_hashes = rpc.mine_blocks(mine_count as u64).await.unwrap();
834 let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
835
836 let new_blocks = super::fetch_new_blocks(
837 &db,
838 &rpc,
839 new_height - 1,
840 config.protocol_paramset().finality_depth,
841 )
842 .await
843 .unwrap()
844 .unwrap();
845 assert_eq!(new_blocks.len(), mine_count as usize);
846 for (index, block) in new_blocks.iter().enumerate() {
847 assert_eq!(block.height, new_height - mine_count + index as u32 + 1);
848 assert_eq!(block.hash, new_block_hashes[index]);
849 }
850
851 let mine_count: u32 = config.protocol_paramset().finality_depth;
853 rpc.mine_blocks(mine_count as u64).await.unwrap();
854 let new_height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
855
856 assert!(super::fetch_new_blocks(
857 &db,
858 &rpc,
859 new_height - 1,
860 config.protocol_paramset().finality_depth
861 )
862 .await
863 .is_err());
864 }
865 #[ignore]
866 #[tokio::test]
867 async fn set_non_canonical_block_hashes() {
868 let mut config = create_test_config_with_thread_name().await;
869 let db = Database::new(&config).await.unwrap();
870 let regtest = create_regtest_rpc(&mut config).await;
871 let rpc = regtest.rpc().clone();
872
873 let hashes = rpc.mine_blocks(4).await.unwrap();
874 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
875
876 super::set_initial_block_info_if_not_exists(&db, &rpc, config.protocol_paramset())
877 .await
878 .unwrap();
879
880 rpc.invalidate_block(hashes.get(3).unwrap()).await.unwrap();
881 rpc.invalidate_block(hashes.get(2).unwrap()).await.unwrap();
882
883 let mut dbtx = db.begin_transaction().await.unwrap();
884
885 let last_db_block =
886 super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, *hashes.get(3).unwrap())
887 .await
888 .unwrap();
889 assert_eq!(last_db_block.0.height, height);
890 assert_eq!(last_db_block.0.hash, *hashes.get(3).unwrap());
891
892 super::handle_reorg_events(&db, &mut dbtx, height - 2)
893 .await
894 .unwrap();
895
896 assert!(
897 super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, *hashes.get(3).unwrap())
898 .await
899 .is_err()
900 );
901
902 dbtx.commit().await.unwrap();
903 }
904
905 #[tokio::test]
906 async fn start_bitcoin_syncer_new_block_mined() {
907 let mut config = create_test_config_with_thread_name().await;
908 let db = Database::new(&config).await.unwrap();
909 let regtest = create_regtest_rpc(&mut config).await;
910 let rpc = regtest.rpc().clone();
911
912 rpc.mine_blocks(1).await.unwrap();
913 let height = u32::try_from(rpc.get_block_count().await.unwrap()).unwrap();
914 let hash = rpc.get_block_hash(height as u64).await.unwrap();
915
916 let (looping_task, _cancel_tx) =
917 BitcoinSyncer::new(db.clone(), rpc.clone(), config.protocol_paramset())
918 .await
919 .unwrap()
920 .into_task()
921 .cancelable_loop();
922
923 looping_task.into_bg();
924
925 loop {
926 let mut dbtx = db.begin_transaction().await.unwrap();
927
928 let last_db_block =
929 match super::_get_block_info_from_hash(&db, &mut dbtx, &rpc, hash).await {
930 Ok(block) => block,
931 Err(_) => {
932 dbtx.commit().await.unwrap();
933 continue;
934 }
935 };
936
937 assert_eq!(last_db_block.0.height, height);
938 assert_eq!(last_db_block.0.hash, hash);
939
940 dbtx.commit().await.unwrap();
941 break;
942 }
943 }
944}