1use super::{
2 wrapper::{BlockHashDB, TxidDB},
3 Database, DatabaseTransaction,
4};
5use crate::{
6 bitcoin_syncer::BitcoinSyncerEvent, config::protocol::ProtocolParamset, errors::BridgeError,
7 execute_query_with_tx,
8};
9use bitcoin::{BlockHash, OutPoint, Txid};
10use eyre::Context;
11use std::ops::DerefMut;
12
13impl Database {
14 pub async fn insert_block_info(
18 &self,
19 tx: Option<DatabaseTransaction<'_, '_>>,
20 block_hash: &BlockHash,
21 prev_block_hash: &BlockHash,
22 block_height: u32,
23 ) -> Result<u32, BridgeError> {
24 let query = sqlx::query_scalar(
25 "INSERT INTO bitcoin_syncer (blockhash, prev_blockhash, height) VALUES ($1, $2, $3) RETURNING id",
26 )
27 .bind(BlockHashDB(*block_hash))
28 .bind(BlockHashDB(*prev_block_hash))
29 .bind(i32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?);
30
31 let id: i32 = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
32
33 u32::try_from(id)
34 .wrap_err(BridgeError::IntConversionError)
35 .map_err(Into::into)
36 }
37
38 pub async fn update_block_as_canonical(
41 &self,
42 tx: Option<DatabaseTransaction<'_, '_>>,
43 block_hash: BlockHash,
44 ) -> Result<Option<u32>, BridgeError> {
45 let query = sqlx::query_scalar(
46 "UPDATE bitcoin_syncer SET is_canonical = true WHERE blockhash = $1 RETURNING id",
47 )
48 .bind(BlockHashDB(block_hash));
49
50 let id: Option<i32> = execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
51
52 id.map(|id| u32::try_from(id).wrap_err(BridgeError::IntConversionError))
53 .transpose()
54 .map_err(Into::into)
55 }
56
57 pub async fn get_block_info_from_hash(
64 &self,
65 tx: Option<DatabaseTransaction<'_, '_>>,
66 block_hash: BlockHash,
67 ) -> Result<Option<(BlockHash, u32)>, BridgeError> {
68 let query = sqlx::query_as(
69 "SELECT prev_blockhash, height FROM bitcoin_syncer WHERE blockhash = $1 AND is_canonical = true",
70 )
71 .bind(BlockHashDB(block_hash));
72
73 let ret: Option<(BlockHashDB, i32)> =
74 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
75
76 ret.map(
77 |(prev_hash, height)| -> Result<(BlockHash, u32), BridgeError> {
78 let height = u32::try_from(height).wrap_err(BridgeError::IntConversionError)?;
79 Ok((prev_hash.0, height))
80 },
81 )
82 .transpose()
83 }
84
85 pub async fn get_block_info_from_id(
87 &self,
88 tx: Option<DatabaseTransaction<'_, '_>>,
89 block_id: u32,
90 ) -> Result<Option<(BlockHash, u32)>, BridgeError> {
91 let query = sqlx::query_as("SELECT blockhash, height FROM bitcoin_syncer WHERE id = $1")
92 .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?);
93
94 let ret: Option<(BlockHashDB, i32)> =
95 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
96
97 ret.map(
98 |(block_hash, height)| -> Result<(BlockHash, u32), BridgeError> {
99 let height = u32::try_from(height).wrap_err(BridgeError::IntConversionError)?;
100 Ok((block_hash.0, height))
101 },
102 )
103 .transpose()
104 }
105
106 pub async fn upsert_full_block(
108 &self,
109 tx: Option<DatabaseTransaction<'_, '_>>,
110 block: &bitcoin::Block,
111 block_height: u32,
112 ) -> Result<(), BridgeError> {
113 let block_bytes = bitcoin::consensus::serialize(block);
114 let query = sqlx::query(
115 "INSERT INTO bitcoin_blocks (height, block_data, block_hash) VALUES ($1, $2, $3)
116 ON CONFLICT (height) DO UPDATE SET block_data = $2, block_hash = $3",
117 )
118 .bind(i32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?)
119 .bind(&block_bytes)
120 .bind(BlockHashDB(block.header.block_hash()));
121
122 execute_query_with_tx!(self.connection, tx, query, execute)?;
123 Ok(())
124 }
125
126 pub async fn get_full_block(
128 &self,
129 tx: Option<DatabaseTransaction<'_, '_>>,
130 block_height: u32,
131 ) -> Result<Option<bitcoin::Block>, BridgeError> {
132 let query = sqlx::query_as("SELECT block_data FROM bitcoin_blocks WHERE height = $1")
133 .bind(i32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?);
134
135 let block_data: Option<(Vec<u8>,)> =
136 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
137
138 match block_data {
139 Some((bytes,)) => {
140 let block = bitcoin::consensus::deserialize(&bytes)
141 .wrap_err(BridgeError::IntConversionError)?;
142 Ok(Some(block))
143 }
144 None => Ok(None),
145 }
146 }
147
148 pub async fn get_full_block_from_hash(
150 &self,
151 tx: Option<DatabaseTransaction<'_, '_>>,
152 block_hash: BlockHash,
153 ) -> Result<Option<(u32, bitcoin::Block)>, BridgeError> {
154 let query =
155 sqlx::query_as("SELECT height, block_data FROM bitcoin_blocks WHERE block_hash = $1")
156 .bind(BlockHashDB(block_hash));
157
158 let block_data: Option<(i32, Vec<u8>)> =
159 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
160
161 match block_data {
162 Some((height_i32, bytes)) => {
163 let height = u32::try_from(height_i32).wrap_err(BridgeError::IntConversionError)?;
164 let block = bitcoin::consensus::deserialize(&bytes)
165 .wrap_err(BridgeError::IntConversionError)?;
166 Ok(Some((height, block)))
167 }
168 None => Ok(None),
169 }
170 }
171
172 pub async fn get_max_height(
174 &self,
175 tx: Option<DatabaseTransaction<'_, '_>>,
176 ) -> Result<Option<u32>, BridgeError> {
177 let query =
178 sqlx::query_as("SELECT height FROM bitcoin_syncer WHERE is_canonical = true ORDER BY height DESC LIMIT 1");
179 let result: Option<(i32,)> =
180 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
181
182 result
183 .map(|(height,)| u32::try_from(height).wrap_err(BridgeError::IntConversionError))
184 .transpose()
185 .map_err(Into::into)
186 }
187
188 pub async fn update_non_canonical_block_hashes(
201 &self,
202 tx: Option<DatabaseTransaction<'_, '_>>,
203 height: u32,
204 ) -> Result<Vec<u32>, BridgeError> {
205 let query = sqlx::query_as(
206 "WITH deleted AS (
207 UPDATE bitcoin_syncer
208 SET is_canonical = false
209 WHERE height > $1
210 RETURNING id
211 ) SELECT id FROM deleted ORDER BY id DESC",
212 )
213 .bind(i32::try_from(height).wrap_err(BridgeError::IntConversionError)?);
214
215 let block_ids: Vec<(i32,)> = execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
216 block_ids
217 .into_iter()
218 .map(|(block_id,)| u32::try_from(block_id).wrap_err(BridgeError::IntConversionError))
219 .collect::<Result<Vec<_>, eyre::Report>>()
220 .map_err(Into::into)
221 }
222
223 pub async fn get_canonical_block_id_from_height(
225 &self,
226 tx: Option<DatabaseTransaction<'_, '_>>,
227 height: u32,
228 ) -> Result<Option<u32>, BridgeError> {
229 let query = sqlx::query_as(
230 "SELECT id FROM bitcoin_syncer WHERE height = $1 AND is_canonical = true",
231 )
232 .bind(i32::try_from(height).wrap_err(BridgeError::IntConversionError)?);
233
234 let block_id: Option<(i32,)> =
235 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
236
237 block_id
238 .map(|(block_id,)| u32::try_from(block_id).wrap_err(BridgeError::IntConversionError))
239 .transpose()
240 .map_err(Into::into)
241 }
242
243 pub async fn insert_txid_to_block(
245 &self,
246 tx: DatabaseTransaction<'_, '_>,
247 block_id: u32,
248 txid: &bitcoin::Txid,
249 ) -> Result<(), BridgeError> {
250 let query = sqlx::query("INSERT INTO bitcoin_syncer_txs (block_id, txid) VALUES ($1, $2)")
251 .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?)
252 .bind(super::wrapper::TxidDB(*txid));
253
254 execute_query_with_tx!(self.connection, Some(tx), query, execute)?;
255
256 Ok(())
257 }
258
259 pub async fn get_block_txids(
261 &self,
262 tx: Option<DatabaseTransaction<'_, '_>>,
263 block_id: u32,
264 ) -> Result<Vec<Txid>, BridgeError> {
265 let query = sqlx::query_as("SELECT txid FROM bitcoin_syncer_txs WHERE block_id = $1")
266 .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?);
267
268 let txids: Vec<(TxidDB,)> = execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
269
270 Ok(txids.into_iter().map(|(txid,)| txid.0).collect())
271 }
272
273 pub async fn insert_spent_utxo(
275 &self,
276 tx: DatabaseTransaction<'_, '_>,
277 block_id: u32,
278 spending_txid: &bitcoin::Txid,
279 txid: &bitcoin::Txid,
280 vout: i64,
281 ) -> Result<(), BridgeError> {
282 sqlx::query(
283 "INSERT INTO bitcoin_syncer_spent_utxos (block_id, spending_txid, txid, vout) VALUES ($1, $2, $3, $4)",
284 )
285 .bind(block_id as i32)
286 .bind(super::wrapper::TxidDB(*spending_txid))
287 .bind(super::wrapper::TxidDB(*txid))
288 .bind(vout)
289 .execute(tx.deref_mut())
290 .await?;
291 Ok(())
292 }
293
294 pub async fn get_block_height_of_spending_txid(
297 &self,
298 tx: Option<DatabaseTransaction<'_, '_>>,
299 outpoint: OutPoint,
300 ) -> Result<Option<u32>, BridgeError> {
301 let query = sqlx::query_scalar::<_, i32>(
302 "SELECT bs.height FROM bitcoin_syncer_spent_utxos bspu
303 INNER JOIN bitcoin_syncer bs ON bspu.block_id = bs.id
304 WHERE bspu.txid = $1 AND bspu.vout = $2 AND bs.is_canonical = true",
305 )
306 .bind(super::wrapper::TxidDB(outpoint.txid))
307 .bind(outpoint.vout as i64);
308
309 let result: Option<i32> =
310 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
311
312 result
313 .map(|height| u32::try_from(height).wrap_err(BridgeError::IntConversionError))
314 .transpose()
315 .map_err(Into::into)
316 }
317
318 pub async fn check_if_utxo_spending_tx_is_finalized(
321 &self,
322 tx: Option<DatabaseTransaction<'_, '_>>,
323 outpoint: OutPoint,
324 current_chain_height: u32,
325 protocol_paramset: &'static ProtocolParamset,
326 ) -> Result<bool, BridgeError> {
327 let spending_tx_height = self.get_block_height_of_spending_txid(tx, outpoint).await?;
328 match spending_tx_height {
329 Some(spending_tx_height) => {
330 Ok(protocol_paramset.is_block_finalized(spending_tx_height, current_chain_height))
331 }
332 None => Ok(false),
333 }
334 }
335
336 pub async fn get_spent_utxos_for_txid(
338 &self,
339 tx: Option<DatabaseTransaction<'_, '_>>,
340 txid: Txid,
341 ) -> Result<Vec<(i64, OutPoint)>, BridgeError> {
342 let query = sqlx::query_as(
343 "SELECT block_id, txid, vout FROM bitcoin_syncer_spent_utxos WHERE spending_txid = $1",
344 )
345 .bind(TxidDB(txid));
346
347 let spent_utxos: Vec<(i64, TxidDB, i64)> =
348 execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
349
350 spent_utxos
351 .into_iter()
352 .map(
353 |(block_id, txid, vout)| -> Result<(i64, OutPoint), BridgeError> {
354 let vout = u32::try_from(vout).wrap_err(BridgeError::IntConversionError)?;
355 Ok((block_id, OutPoint { txid: txid.0, vout }))
356 },
357 )
358 .collect::<Result<Vec<_>, BridgeError>>()
359 }
360
361 pub async fn insert_event(
363 &self,
364 tx: Option<DatabaseTransaction<'_, '_>>,
365 event_type: BitcoinSyncerEvent,
366 ) -> Result<(), BridgeError> {
367 let query = match event_type {
368 BitcoinSyncerEvent::NewBlock(block_id) => sqlx::query(
369 "INSERT INTO bitcoin_syncer_events (block_id, event_type) VALUES ($1, 'new_block'::bitcoin_syncer_event_type)",
370 )
371 .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?),
372 BitcoinSyncerEvent::ReorgedBlock(block_id) => sqlx::query(
373 "INSERT INTO bitcoin_syncer_events (block_id, event_type) VALUES ($1, 'reorged_block'::bitcoin_syncer_event_type)",
374 )
375 .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?),
376 };
377 execute_query_with_tx!(self.connection, tx, query, execute)?;
378 Ok(())
379 }
380
381 pub async fn get_last_processed_event_block_height(
384 &self,
385 tx: Option<DatabaseTransaction<'_, '_>>,
386 consumer_handle: &str,
387 ) -> Result<Option<u32>, BridgeError> {
388 let query = sqlx::query_scalar::<_, i32>(
389 r#"SELECT bs.height
390 FROM bitcoin_syncer_event_handlers bseh
391 INNER JOIN bitcoin_syncer_events bse ON bseh.last_processed_event_id = bse.id
392 INNER JOIN bitcoin_syncer bs ON bse.block_id = bs.id
393 WHERE bseh.consumer_handle = $1"#,
394 )
395 .bind(consumer_handle);
396
397 let result: Option<i32> =
398 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
399
400 result
401 .map(|h| {
402 u32::try_from(h)
403 .wrap_err(BridgeError::IntConversionError)
404 .map_err(BridgeError::from)
405 })
406 .transpose()
407 }
408
409 pub async fn get_last_processed_event_id(
411 &self,
412 tx: DatabaseTransaction<'_, '_>,
413 consumer_handle: &str,
414 ) -> Result<i32, BridgeError> {
415 sqlx::query(
417 r#"
418 INSERT INTO bitcoin_syncer_event_handlers (consumer_handle, last_processed_event_id)
419 VALUES ($1, 0)
420 ON CONFLICT (consumer_handle) DO NOTHING
421 "#,
422 )
423 .bind(consumer_handle)
424 .execute(tx.deref_mut())
425 .await?;
426
427 let last_processed_event_id: i32 = sqlx::query_scalar(
429 r#"
430 SELECT last_processed_event_id
431 FROM bitcoin_syncer_event_handlers
432 WHERE consumer_handle = $1
433 "#,
434 )
435 .bind(consumer_handle)
436 .fetch_one(tx.deref_mut())
437 .await?;
438
439 Ok(last_processed_event_id)
440 }
441
442 pub async fn get_max_processed_block_height(
445 &self,
446 tx: Option<DatabaseTransaction<'_, '_>>,
447 consumer_handle: &str,
448 ) -> Result<Option<u32>, BridgeError> {
449 let query = sqlx::query_scalar::<_, Option<i32>>(
450 r#"SELECT MAX(bs.height)
451 FROM bitcoin_syncer_events bse
452 INNER JOIN bitcoin_syncer bs ON bse.block_id = bs.id
453 WHERE bse.id <= (
454 SELECT last_processed_event_id
455 FROM bitcoin_syncer_event_handlers
456 WHERE consumer_handle = $1
457 )"#,
458 )
459 .bind(consumer_handle);
460
461 let result: Option<i32> = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
462
463 result
464 .map(|h| {
465 u32::try_from(h)
466 .wrap_err(BridgeError::IntConversionError)
467 .map_err(BridgeError::from)
468 })
469 .transpose()
470 }
471
472 pub async fn get_next_finalized_block_height_for_consumer(
476 &self,
477 tx: Option<DatabaseTransaction<'_, '_>>,
478 consumer_handle: &str,
479 paramset: &'static ProtocolParamset,
480 ) -> Result<u32, BridgeError> {
481 let max_processed_block_height = self
482 .get_max_processed_block_height(tx, consumer_handle)
483 .await?;
484
485 let max_processed_finalized_block_height = match max_processed_block_height {
486 Some(max_processed_block_height) => {
487 max_processed_block_height.checked_sub(paramset.finality_depth - 1)
488 }
489 None => None,
490 };
491
492 let next_height = max_processed_finalized_block_height
493 .map(|h| h + 1)
494 .unwrap_or(paramset.start_height);
495
496 Ok(std::cmp::max(next_height, paramset.start_height))
497 }
498
499 pub async fn fetch_next_bitcoin_syncer_evt(
505 &self,
506 tx: DatabaseTransaction<'_, '_>,
507 consumer_handle: &str,
508 ) -> Result<Option<BitcoinSyncerEvent>, BridgeError> {
509 let last_processed_event_id = self
511 .get_last_processed_event_id(tx, consumer_handle)
512 .await?;
513
514 let event = sqlx::query_as::<_, (i32, i32, String)>(
516 r#"
517 SELECT id, block_id, event_type::text
518 FROM bitcoin_syncer_events
519 WHERE id > $1
520 ORDER BY id ASC
521 LIMIT 1
522 "#,
523 )
524 .bind(last_processed_event_id)
525 .fetch_optional(tx.deref_mut())
526 .await?;
527
528 if event.is_none() {
529 return Ok(None);
530 }
531
532 let event = event.expect("should exist since we checked is_none()");
533 let event_id = event.0;
534 let event_type: BitcoinSyncerEvent = (event.2, event.1).try_into()?;
535
536 sqlx::query(
538 r#"
539 UPDATE bitcoin_syncer_event_handlers
540 SET last_processed_event_id = $1
541 WHERE consumer_handle = $2
542 "#,
543 )
544 .bind(event_id)
545 .bind(consumer_handle)
546 .execute(tx.deref_mut())
547 .await?;
548
549 Ok(Some(event_type))
550 }
551}
552
553#[cfg(test)]
554mod tests {
555 use super::*;
556 use crate::database::Database;
557 use crate::test::common::*;
558 use bitcoin::hashes::Hash;
559 use bitcoin::{BlockHash, CompactTarget};
560
561 async fn setup_test_db() -> Database {
562 let config = create_test_config_with_thread_name().await;
563 Database::new(&config).await.unwrap()
564 }
565
566 #[tokio::test]
567 async fn test_event_handling() {
568 let db = setup_test_db().await;
569 let mut dbtx = db.begin_transaction().await.unwrap();
570
571 let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
573 let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
574 let height = 0x45;
575
576 let block_id = db
577 .insert_block_info(Some(&mut dbtx), &block_hash, &prev_block_hash, height)
578 .await
579 .unwrap();
580
581 db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::NewBlock(block_id))
583 .await
584 .unwrap();
585
586 let consumer_handle = "test_consumer";
588 let event = db
589 .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer_handle)
590 .await
591 .unwrap();
592
593 assert!(matches!(event, Some(BitcoinSyncerEvent::NewBlock(id)) if id == block_id));
594
595 let event = db
597 .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer_handle)
598 .await
599 .unwrap();
600 assert!(event.is_none());
601
602 db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::ReorgedBlock(block_id))
604 .await
605 .unwrap();
606
607 let event = db
609 .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer_handle)
610 .await
611 .unwrap();
612 assert!(matches!(event, Some(BitcoinSyncerEvent::ReorgedBlock(id)) if id == block_id));
613
614 dbtx.commit().await.unwrap();
615 }
616
617 #[tokio::test]
618 async fn test_store_and_get_block() {
619 let db = setup_test_db().await;
620 let block_height = 123u32;
621
622 let dummy_header = bitcoin::block::Header {
624 version: bitcoin::block::Version::TWO,
625 prev_blockhash: BlockHash::from_raw_hash(Hash::from_byte_array([0x42; 32])),
626 merkle_root: bitcoin::TxMerkleNode::all_zeros(),
627 time: 1_000_000,
628 bits: CompactTarget::from_consensus(0),
629 nonce: 12345,
630 };
631
632 let dummy_txs = vec![bitcoin::Transaction {
633 version: bitcoin::blockdata::transaction::Version::TWO,
634 lock_time: bitcoin::absolute::LockTime::ZERO,
635 input: vec![],
636 output: vec![],
637 }];
638
639 let dummy_block = bitcoin::Block {
640 header: dummy_header,
641 txdata: dummy_txs.clone(),
642 };
643
644 let dummy_block_hash = dummy_block.block_hash();
645
646 db.upsert_full_block(None, &dummy_block, block_height)
648 .await
649 .unwrap();
650
651 let retrieved_block = db
653 .get_full_block(None, block_height)
654 .await
655 .unwrap()
656 .unwrap();
657
658 assert_eq!(retrieved_block, dummy_block);
660
661 let retrieved_block_from_hash = db
663 .get_full_block_from_hash(None, dummy_block_hash)
664 .await
665 .unwrap()
666 .unwrap()
667 .1;
668
669 assert_eq!(retrieved_block_from_hash, dummy_block);
671
672 assert!(db.get_full_block(None, 999).await.unwrap().is_none());
674
675 let updated_dummy_header = bitcoin::block::Header {
677 version: bitcoin::block::Version::ONE, ..dummy_header
679 };
680 let updated_dummy_block = bitcoin::Block {
681 header: updated_dummy_header,
682 txdata: dummy_txs.clone(),
683 };
684
685 let updated_dummy_block_hash = updated_dummy_block.block_hash();
686
687 db.upsert_full_block(None, &updated_dummy_block, block_height)
688 .await
689 .unwrap();
690
691 let retrieved_updated_block = db
693 .get_full_block(None, block_height)
694 .await
695 .unwrap()
696 .unwrap();
697 assert_eq!(updated_dummy_block, retrieved_updated_block);
698
699 let retrieved_updated_block_from_hash = db
700 .get_full_block_from_hash(None, updated_dummy_block_hash)
701 .await
702 .unwrap()
703 .unwrap()
704 .1;
705 assert_eq!(updated_dummy_block, retrieved_updated_block_from_hash);
706 }
707
708 #[tokio::test]
709 async fn test_multiple_event_consumers() {
710 let db = setup_test_db().await;
711 let mut dbtx = db.begin_transaction().await.unwrap();
712
713 let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
715 let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
716 let height = 0x45;
717
718 let block_id = db
719 .insert_block_info(Some(&mut dbtx), &block_hash, &prev_block_hash, height)
720 .await
721 .unwrap();
722
723 db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::NewBlock(block_id))
725 .await
726 .unwrap();
727 db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::ReorgedBlock(block_id))
728 .await
729 .unwrap();
730
731 let consumer1 = "consumer1";
733 let consumer2 = "consumer2";
734
735 let event1 = db
737 .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer1)
738 .await
739 .unwrap();
740 assert!(matches!(event1, Some(BitcoinSyncerEvent::NewBlock(id)) if id == block_id));
741
742 let event2 = db
743 .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer1)
744 .await
745 .unwrap();
746 assert!(matches!(event2, Some(BitcoinSyncerEvent::ReorgedBlock(id)) if id == block_id));
747
748 let event1 = db
750 .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer2)
751 .await
752 .unwrap();
753 assert!(matches!(event1, Some(BitcoinSyncerEvent::NewBlock(id)) if id == block_id));
754
755 let event2 = db
756 .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer2)
757 .await
758 .unwrap();
759 assert!(matches!(event2, Some(BitcoinSyncerEvent::ReorgedBlock(id)) if id == block_id));
760
761 dbtx.commit().await.unwrap();
762 }
763
764 #[tokio::test]
765 async fn test_non_canonical_blocks() {
766 let db = setup_test_db().await;
767 let mut dbtx = db.begin_transaction().await.unwrap();
768
769 let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
771 let heights = [1, 2, 3, 4, 5];
772 let mut last_hash = prev_block_hash;
773
774 let mut block_ids = Vec::new();
776 for height in heights {
777 let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([height as u8; 32]));
778 let block_id = db
779 .insert_block_info(Some(&mut dbtx), &block_hash, &last_hash, height)
780 .await
781 .unwrap();
782 block_ids.push(block_id);
783 last_hash = block_hash;
784 }
785
786 let non_canonical_blocks = db
788 .update_non_canonical_block_hashes(Some(&mut dbtx), 2)
789 .await
790 .unwrap();
791 assert_eq!(non_canonical_blocks.len(), 3);
792 assert_eq!(non_canonical_blocks, vec![5, 4, 3]);
793
794 for height in heights {
796 let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([height as u8; 32]));
797 let block_info = db
798 .get_block_info_from_hash(Some(&mut dbtx), block_hash)
799 .await
800 .unwrap();
801
802 if height <= 2 {
803 assert!(block_info.is_some());
804 } else {
805 assert!(block_info.is_none());
806 }
807 }
808
809 let max_height = db.get_max_height(Some(&mut dbtx)).await.unwrap().unwrap();
811 assert_eq!(max_height, 2);
812
813 dbtx.commit().await.unwrap();
814 }
815
816 #[tokio::test]
817 async fn add_get_block_info() {
818 let config = create_test_config_with_thread_name().await;
819 let db = Database::new(&config).await.unwrap();
820
821 let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
822 let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
823 let height = 0x45;
824
825 assert!(db
826 .get_block_info_from_hash(None, block_hash)
827 .await
828 .unwrap()
829 .is_none());
830
831 db.insert_block_info(None, &block_hash, &prev_block_hash, height)
832 .await
833 .unwrap();
834 let block_info = db
835 .get_block_info_from_hash(None, block_hash)
836 .await
837 .unwrap()
838 .unwrap();
839 let max_height = db.get_max_height(None).await.unwrap().unwrap();
840 assert_eq!(block_info.0, prev_block_hash);
841 assert_eq!(block_info.1, height);
842 assert_eq!(max_height, height);
843
844 db.insert_block_info(
845 None,
846 &BlockHash::from_raw_hash(Hash::from_byte_array([0x1; 32])),
847 &prev_block_hash,
848 height - 1,
849 )
850 .await
851 .unwrap();
852 let max_height = db.get_max_height(None).await.unwrap().unwrap();
853 assert_eq!(max_height, height);
854
855 db.insert_block_info(
856 None,
857 &BlockHash::from_raw_hash(Hash::from_byte_array([0x2; 32])),
858 &prev_block_hash,
859 height + 1,
860 )
861 .await
862 .unwrap();
863 let max_height = db.get_max_height(None).await.unwrap().unwrap();
864 assert_ne!(max_height, height);
865 assert_eq!(max_height, height + 1);
866 }
867
868 #[tokio::test]
869 async fn add_and_get_txids_from_block() {
870 let config = create_test_config_with_thread_name().await;
871 let db = Database::new(&config).await.unwrap();
872 let mut dbtx = db.begin_transaction().await.unwrap();
873
874 assert!(db
875 .insert_txid_to_block(&mut dbtx, 0, &Txid::all_zeros())
876 .await
877 .is_err());
878 let mut dbtx = db.begin_transaction().await.unwrap();
879
880 let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
881 let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
882 let height = 0x45;
883 let block_id = db
884 .insert_block_info(Some(&mut dbtx), &block_hash, &prev_block_hash, height)
885 .await
886 .unwrap();
887
888 let txids = vec![
889 Txid::from_raw_hash(Hash::from_byte_array([0x1; 32])),
890 Txid::from_raw_hash(Hash::from_byte_array([0x2; 32])),
891 Txid::from_raw_hash(Hash::from_byte_array([0x3; 32])),
892 ];
893 for txid in &txids {
894 db.insert_txid_to_block(&mut dbtx, block_id, txid)
895 .await
896 .unwrap();
897 }
898
899 let txids_from_db = db.get_block_txids(Some(&mut dbtx), block_id).await.unwrap();
900 assert_eq!(txids_from_db, txids);
901
902 assert!(db
903 .get_block_txids(Some(&mut dbtx), block_id + 1)
904 .await
905 .unwrap()
906 .is_empty());
907
908 dbtx.commit().await.unwrap();
909 }
910
911 #[tokio::test]
912 async fn insert_get_spent_utxos() {
913 let config = create_test_config_with_thread_name().await;
914 let db = Database::new(&config).await.unwrap();
915 let mut dbtx = db.begin_transaction().await.unwrap();
916
917 let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
918 let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
919 let height = 0x45;
920 let block_id = db
921 .insert_block_info(Some(&mut dbtx), &block_hash, &prev_block_hash, height)
922 .await
923 .unwrap();
924
925 let spending_txid = Txid::from_raw_hash(Hash::from_byte_array([0x2; 32]));
926 let txid = Txid::from_raw_hash(Hash::from_byte_array([0x1; 32]));
927 let vout = 0;
928 db.insert_txid_to_block(&mut dbtx, block_id, &spending_txid)
929 .await
930 .unwrap();
931
932 assert_eq!(
933 db.get_spent_utxos_for_txid(Some(&mut dbtx), txid)
934 .await
935 .unwrap()
936 .len(),
937 0
938 );
939
940 db.insert_spent_utxo(&mut dbtx, block_id, &spending_txid, &txid, vout)
941 .await
942 .unwrap();
943
944 let spent_utxos = db
945 .get_spent_utxos_for_txid(Some(&mut dbtx), spending_txid)
946 .await
947 .unwrap();
948 assert_eq!(spent_utxos.len(), 1);
949 assert_eq!(spent_utxos[0].0, block_id as i64);
950 assert_eq!(
951 spent_utxos[0].1,
952 bitcoin::OutPoint {
953 txid,
954 vout: vout as u32,
955 }
956 );
957
958 dbtx.commit().await.unwrap();
959 }
960}