1use super::{
2 wrapper::{BlockHashDB, TxidDB},
3 Database, DatabaseTransaction,
4};
5use crate::{
6 bitcoin_syncer::BitcoinSyncerEvent, config::protocol::ProtocolParamset, execute_query_with_tx,
7};
8use bitcoin::{BlockHash, OutPoint, Txid};
9use clementine_errors::BridgeError;
10use eyre::Context;
11use std::ops::DerefMut;
12
13impl Database {
14 pub async fn insert_block_info(
18 &self,
19 tx: Option<DatabaseTransaction<'_>>,
20 block_hash: &BlockHash,
21 prev_block_hash: &BlockHash,
22 block_height: u32,
23 ) -> Result<u32, BridgeError> {
24 let query = sqlx::query_scalar(
25 "INSERT INTO bitcoin_syncer (blockhash, prev_blockhash, height) VALUES ($1, $2, $3) RETURNING id",
26 )
27 .bind(BlockHashDB(*block_hash))
28 .bind(BlockHashDB(*prev_block_hash))
29 .bind(i32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?);
30
31 let id: i32 = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
32
33 u32::try_from(id)
34 .wrap_err(BridgeError::IntConversionError)
35 .map_err(Into::into)
36 }
37
38 pub async fn update_block_as_canonical(
41 &self,
42 tx: Option<DatabaseTransaction<'_>>,
43 block_hash: BlockHash,
44 ) -> Result<Option<u32>, BridgeError> {
45 let query = sqlx::query_scalar(
46 "UPDATE bitcoin_syncer SET is_canonical = true WHERE blockhash = $1 RETURNING id",
47 )
48 .bind(BlockHashDB(block_hash));
49
50 let id: Option<i32> = execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
51
52 id.map(|id| u32::try_from(id).wrap_err(BridgeError::IntConversionError))
53 .transpose()
54 .map_err(Into::into)
55 }
56
57 pub async fn get_block_info_from_hash(
64 &self,
65 tx: Option<DatabaseTransaction<'_>>,
66 block_hash: BlockHash,
67 ) -> Result<Option<(BlockHash, u32)>, BridgeError> {
68 let query = sqlx::query_as(
69 "SELECT prev_blockhash, height FROM bitcoin_syncer WHERE blockhash = $1 AND is_canonical = true",
70 )
71 .bind(BlockHashDB(block_hash));
72
73 let ret: Option<(BlockHashDB, i32)> =
74 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
75
76 ret.map(
77 |(prev_hash, height)| -> Result<(BlockHash, u32), BridgeError> {
78 let height = u32::try_from(height).wrap_err(BridgeError::IntConversionError)?;
79 Ok((prev_hash.0, height))
80 },
81 )
82 .transpose()
83 }
84
85 pub async fn get_block_info_from_id(
87 &self,
88 tx: Option<DatabaseTransaction<'_>>,
89 block_id: u32,
90 ) -> Result<Option<(BlockHash, u32)>, BridgeError> {
91 let query = sqlx::query_as("SELECT blockhash, height FROM bitcoin_syncer WHERE id = $1")
92 .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?);
93
94 let ret: Option<(BlockHashDB, i32)> =
95 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
96
97 ret.map(
98 |(block_hash, height)| -> Result<(BlockHash, u32), BridgeError> {
99 let height = u32::try_from(height).wrap_err(BridgeError::IntConversionError)?;
100 Ok((block_hash.0, height))
101 },
102 )
103 .transpose()
104 }
105
106 pub async fn upsert_full_block(
108 &self,
109 tx: Option<DatabaseTransaction<'_>>,
110 block: &bitcoin::Block,
111 block_height: u32,
112 ) -> Result<(), BridgeError> {
113 let block_bytes = bitcoin::consensus::serialize(block);
114 let query = sqlx::query(
115 "INSERT INTO bitcoin_blocks (height, block_data, block_hash) VALUES ($1, $2, $3)
116 ON CONFLICT (height) DO UPDATE SET block_data = $2, block_hash = $3",
117 )
118 .bind(i32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?)
119 .bind(&block_bytes)
120 .bind(BlockHashDB(block.header.block_hash()));
121
122 execute_query_with_tx!(self.connection, tx, query, execute)?;
123 Ok(())
124 }
125
126 pub async fn get_full_block(
128 &self,
129 tx: Option<DatabaseTransaction<'_>>,
130 block_height: u32,
131 ) -> Result<Option<bitcoin::Block>, BridgeError> {
132 let query = sqlx::query_as("SELECT block_data FROM bitcoin_blocks WHERE height = $1")
133 .bind(i32::try_from(block_height).wrap_err(BridgeError::IntConversionError)?);
134
135 let block_data: Option<(Vec<u8>,)> =
136 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
137
138 match block_data {
139 Some((bytes,)) => {
140 let block = bitcoin::consensus::deserialize(&bytes)
141 .wrap_err(BridgeError::IntConversionError)?;
142 Ok(Some(block))
143 }
144 None => Ok(None),
145 }
146 }
147
148 pub async fn get_full_block_from_hash(
150 &self,
151 tx: Option<DatabaseTransaction<'_>>,
152 block_hash: BlockHash,
153 ) -> Result<Option<(u32, bitcoin::Block)>, BridgeError> {
154 let query =
155 sqlx::query_as("SELECT height, block_data FROM bitcoin_blocks WHERE block_hash = $1")
156 .bind(BlockHashDB(block_hash));
157
158 let block_data: Option<(i32, Vec<u8>)> =
159 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
160
161 match block_data {
162 Some((height_i32, bytes)) => {
163 let height = u32::try_from(height_i32).wrap_err(BridgeError::IntConversionError)?;
164 let block = bitcoin::consensus::deserialize(&bytes)
165 .wrap_err(BridgeError::IntConversionError)?;
166 Ok(Some((height, block)))
167 }
168 None => Ok(None),
169 }
170 }
171
172 pub async fn get_max_height(
174 &self,
175 tx: Option<DatabaseTransaction<'_>>,
176 ) -> Result<Option<u32>, BridgeError> {
177 let query =
178 sqlx::query_as("SELECT height FROM bitcoin_syncer WHERE is_canonical = true ORDER BY height DESC LIMIT 1");
179 let result: Option<(i32,)> =
180 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
181
182 result
183 .map(|(height,)| u32::try_from(height).wrap_err(BridgeError::IntConversionError))
184 .transpose()
185 .map_err(Into::into)
186 }
187
188 pub async fn update_non_canonical_block_hashes(
201 &self,
202 tx: Option<DatabaseTransaction<'_>>,
203 height: u32,
204 ) -> Result<Vec<u32>, BridgeError> {
205 let query = sqlx::query_as(
206 "WITH deleted AS (
207 UPDATE bitcoin_syncer
208 SET is_canonical = false
209 WHERE height > $1
210 RETURNING id
211 ) SELECT id FROM deleted ORDER BY id DESC",
212 )
213 .bind(i32::try_from(height).wrap_err(BridgeError::IntConversionError)?);
214
215 let block_ids: Vec<(i32,)> = execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
216 block_ids
217 .into_iter()
218 .map(|(block_id,)| u32::try_from(block_id).wrap_err(BridgeError::IntConversionError))
219 .collect::<Result<Vec<_>, eyre::Report>>()
220 .map_err(Into::into)
221 }
222
223 pub async fn get_canonical_block_id_from_height(
225 &self,
226 tx: Option<DatabaseTransaction<'_>>,
227 height: u32,
228 ) -> Result<Option<u32>, BridgeError> {
229 let query = sqlx::query_as(
230 "SELECT id FROM bitcoin_syncer WHERE height = $1 AND is_canonical = true",
231 )
232 .bind(i32::try_from(height).wrap_err(BridgeError::IntConversionError)?);
233
234 let block_id: Option<(i32,)> =
235 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
236
237 block_id
238 .map(|(block_id,)| u32::try_from(block_id).wrap_err(BridgeError::IntConversionError))
239 .transpose()
240 .map_err(Into::into)
241 }
242
243 pub async fn insert_txid_to_block(
245 &self,
246 tx: DatabaseTransaction<'_>,
247 block_id: u32,
248 txid: &bitcoin::Txid,
249 ) -> Result<(), BridgeError> {
250 let query = sqlx::query("INSERT INTO bitcoin_syncer_txs (block_id, txid) VALUES ($1, $2)")
251 .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?)
252 .bind(super::wrapper::TxidDB(*txid));
253
254 execute_query_with_tx!(self.connection, Some(tx), query, execute)?;
255
256 Ok(())
257 }
258
259 #[cfg(test)]
261 pub async fn get_block_txids(
262 &self,
263 tx: Option<DatabaseTransaction<'_>>,
264 block_id: u32,
265 ) -> Result<Vec<Txid>, BridgeError> {
266 let query = sqlx::query_as("SELECT txid FROM bitcoin_syncer_txs WHERE block_id = $1")
267 .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?);
268
269 let txids: Vec<(TxidDB,)> = execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
270
271 Ok(txids.into_iter().map(|(txid,)| txid.0).collect())
272 }
273
274 pub async fn get_canonical_block_heights_for_txids(
277 &self,
278 tx: Option<DatabaseTransaction<'_>>,
279 txids: &[Txid],
280 ) -> Result<Vec<(Txid, u32)>, BridgeError> {
281 if txids.is_empty() {
282 return Ok(Vec::new());
283 }
284
285 let txid_params: Vec<TxidDB> = txids.iter().map(|t| TxidDB(*t)).collect();
287
288 let query = sqlx::query_as::<_, (TxidDB, i32)>(
290 "SELECT bst.txid, bs.height
291 FROM bitcoin_syncer_txs bst
292 INNER JOIN bitcoin_syncer bs ON bst.block_id = bs.id
293 WHERE bst.txid = ANY($1) AND bs.is_canonical = true",
294 )
295 .bind(&txid_params);
296
297 let results: Vec<(TxidDB, i32)> =
298 execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
299
300 results
301 .into_iter()
302 .map(|(txid, height)| {
303 let height =
304 u32::try_from(height).wrap_err("Failed to convert block height to u32")?;
305 Ok((txid.0, height))
306 })
307 .collect()
308 }
309
310 pub async fn get_canonical_block_height_for_txid(
313 &self,
314 tx: Option<DatabaseTransaction<'_>>,
315 txid: Txid,
316 ) -> Result<Option<u32>, BridgeError> {
317 let query = sqlx::query_scalar::<_, i32>(
318 "SELECT bs.height
319 FROM bitcoin_syncer_txs bst
320 INNER JOIN bitcoin_syncer bs ON bst.block_id = bs.id
321 WHERE bst.txid = $1 AND bs.is_canonical = true",
322 )
323 .bind(TxidDB(txid));
324
325 let result: Option<i32> =
326 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
327
328 result
329 .map(|height| u32::try_from(height).wrap_err("Failed to convert block height to u32"))
330 .transpose()
331 .map_err(Into::into)
332 }
333
334 pub async fn insert_spent_utxo(
336 &self,
337 tx: DatabaseTransaction<'_>,
338 block_id: u32,
339 spending_txid: &bitcoin::Txid,
340 txid: &bitcoin::Txid,
341 vout: i64,
342 ) -> Result<(), BridgeError> {
343 sqlx::query(
344 "INSERT INTO bitcoin_syncer_spent_utxos (block_id, spending_txid, txid, vout) VALUES ($1, $2, $3, $4)",
345 )
346 .bind(block_id as i32)
347 .bind(super::wrapper::TxidDB(*spending_txid))
348 .bind(super::wrapper::TxidDB(*txid))
349 .bind(vout)
350 .execute(tx.deref_mut())
351 .await?;
352 Ok(())
353 }
354
355 pub async fn get_block_height_of_spending_txid(
358 &self,
359 tx: Option<DatabaseTransaction<'_>>,
360 outpoint: OutPoint,
361 ) -> Result<Option<u32>, BridgeError> {
362 let query = sqlx::query_scalar::<_, i32>(
363 "SELECT bs.height FROM bitcoin_syncer_spent_utxos bspu
364 INNER JOIN bitcoin_syncer bs ON bspu.block_id = bs.id
365 WHERE bspu.txid = $1 AND bspu.vout = $2 AND bs.is_canonical = true",
366 )
367 .bind(super::wrapper::TxidDB(outpoint.txid))
368 .bind(outpoint.vout as i64);
369
370 let result: Option<i32> =
371 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
372
373 result
374 .map(|height| u32::try_from(height).wrap_err(BridgeError::IntConversionError))
375 .transpose()
376 .map_err(Into::into)
377 }
378
379 pub async fn check_if_utxo_spending_tx_is_finalized(
382 &self,
383 tx: Option<DatabaseTransaction<'_>>,
384 outpoint: OutPoint,
385 current_chain_height: u32,
386 protocol_paramset: &'static ProtocolParamset,
387 ) -> Result<bool, BridgeError> {
388 let spending_tx_height = self.get_block_height_of_spending_txid(tx, outpoint).await?;
389 match spending_tx_height {
390 Some(spending_tx_height) => {
391 Ok(protocol_paramset.is_block_finalized(spending_tx_height, current_chain_height))
392 }
393 None => Ok(false),
394 }
395 }
396
397 pub async fn get_spent_utxos_for_txid(
399 &self,
400 tx: Option<DatabaseTransaction<'_>>,
401 txid: Txid,
402 ) -> Result<Vec<(i64, OutPoint)>, BridgeError> {
403 let query = sqlx::query_as(
404 "SELECT block_id, txid, vout FROM bitcoin_syncer_spent_utxos WHERE spending_txid = $1",
405 )
406 .bind(TxidDB(txid));
407
408 let spent_utxos: Vec<(i64, TxidDB, i64)> =
409 execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
410
411 spent_utxos
412 .into_iter()
413 .map(
414 |(block_id, txid, vout)| -> Result<(i64, OutPoint), BridgeError> {
415 let vout = u32::try_from(vout).wrap_err(BridgeError::IntConversionError)?;
416 Ok((block_id, OutPoint { txid: txid.0, vout }))
417 },
418 )
419 .collect::<Result<Vec<_>, BridgeError>>()
420 }
421
422 pub async fn insert_event(
424 &self,
425 tx: Option<DatabaseTransaction<'_>>,
426 event_type: BitcoinSyncerEvent,
427 ) -> Result<(), BridgeError> {
428 let query = match event_type {
429 BitcoinSyncerEvent::NewBlock(block_id) => sqlx::query(
430 "INSERT INTO bitcoin_syncer_events (block_id, event_type) VALUES ($1, 'new_block'::bitcoin_syncer_event_type)",
431 )
432 .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?),
433 BitcoinSyncerEvent::ReorgedBlock(block_id) => sqlx::query(
434 "INSERT INTO bitcoin_syncer_events (block_id, event_type) VALUES ($1, 'reorged_block'::bitcoin_syncer_event_type)",
435 )
436 .bind(i32::try_from(block_id).wrap_err(BridgeError::IntConversionError)?),
437 };
438 execute_query_with_tx!(self.connection, tx, query, execute)?;
439 Ok(())
440 }
441
442 pub async fn get_last_processed_event_block_height(
445 &self,
446 tx: Option<DatabaseTransaction<'_>>,
447 consumer_handle: &str,
448 ) -> Result<Option<u32>, BridgeError> {
449 let query = sqlx::query_scalar::<_, i32>(
450 r#"SELECT bs.height
451 FROM bitcoin_syncer_event_handlers bseh
452 INNER JOIN bitcoin_syncer_events bse ON bseh.last_processed_event_id = bse.id
453 INNER JOIN bitcoin_syncer bs ON bse.block_id = bs.id
454 WHERE bseh.consumer_handle = $1"#,
455 )
456 .bind(consumer_handle);
457
458 let result: Option<i32> =
459 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
460
461 result
462 .map(|h| {
463 u32::try_from(h)
464 .wrap_err(BridgeError::IntConversionError)
465 .map_err(BridgeError::from)
466 })
467 .transpose()
468 }
469
470 pub async fn get_last_processed_event_id(
472 &self,
473 tx: DatabaseTransaction<'_>,
474 consumer_handle: &str,
475 ) -> Result<i32, BridgeError> {
476 sqlx::query(
478 r#"
479 INSERT INTO bitcoin_syncer_event_handlers (consumer_handle, last_processed_event_id)
480 VALUES ($1, 0)
481 ON CONFLICT (consumer_handle) DO NOTHING
482 "#,
483 )
484 .bind(consumer_handle)
485 .execute(tx.deref_mut())
486 .await?;
487
488 let last_processed_event_id: i32 = sqlx::query_scalar(
490 r#"
491 SELECT last_processed_event_id
492 FROM bitcoin_syncer_event_handlers
493 WHERE consumer_handle = $1
494 "#,
495 )
496 .bind(consumer_handle)
497 .fetch_one(tx.deref_mut())
498 .await?;
499
500 Ok(last_processed_event_id)
501 }
502
503 pub async fn get_max_processed_block_height(
506 &self,
507 tx: Option<DatabaseTransaction<'_>>,
508 consumer_handle: &str,
509 ) -> Result<Option<u32>, BridgeError> {
510 let query = sqlx::query_scalar::<_, Option<i32>>(
511 r#"SELECT MAX(bs.height)
512 FROM bitcoin_syncer_events bse
513 INNER JOIN bitcoin_syncer bs ON bse.block_id = bs.id
514 WHERE bse.id <= (
515 SELECT last_processed_event_id
516 FROM bitcoin_syncer_event_handlers
517 WHERE consumer_handle = $1
518 )"#,
519 )
520 .bind(consumer_handle);
521
522 let result: Option<i32> = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
523
524 result
525 .map(|h| {
526 u32::try_from(h)
527 .wrap_err(BridgeError::IntConversionError)
528 .map_err(BridgeError::from)
529 })
530 .transpose()
531 }
532
533 pub async fn get_next_finalized_block_height_for_consumer(
537 &self,
538 tx: Option<DatabaseTransaction<'_>>,
539 consumer_handle: &str,
540 paramset: &'static ProtocolParamset,
541 ) -> Result<u32, BridgeError> {
542 let max_processed_block_height = self
543 .get_max_processed_block_height(tx, consumer_handle)
544 .await?;
545
546 let max_processed_finalized_block_height = match max_processed_block_height {
547 Some(max_processed_block_height) => {
548 max_processed_block_height.checked_sub(paramset.finality_depth - 1)
549 }
550 None => None,
551 };
552
553 let next_height = max_processed_finalized_block_height
554 .map(|h| h + 1)
555 .unwrap_or(paramset.start_height);
556
557 Ok(std::cmp::max(next_height, paramset.start_height))
558 }
559
560 pub async fn fetch_next_bitcoin_syncer_evt(
566 &self,
567 tx: DatabaseTransaction<'_>,
568 consumer_handle: &str,
569 ) -> Result<Option<BitcoinSyncerEvent>, BridgeError> {
570 let last_processed_event_id = self
572 .get_last_processed_event_id(tx, consumer_handle)
573 .await?;
574
575 let event = sqlx::query_as::<_, (i32, i32, String)>(
577 r#"
578 SELECT id, block_id, event_type::text
579 FROM bitcoin_syncer_events
580 WHERE id > $1
581 ORDER BY id ASC
582 LIMIT 1
583 "#,
584 )
585 .bind(last_processed_event_id)
586 .fetch_optional(tx.deref_mut())
587 .await?;
588
589 if event.is_none() {
590 return Ok(None);
591 }
592
593 let event = event.expect("should exist since we checked is_none()");
594 let event_id = event.0;
595 let event_type: BitcoinSyncerEvent = (event.2, event.1).try_into()?;
596
597 sqlx::query(
599 r#"
600 UPDATE bitcoin_syncer_event_handlers
601 SET last_processed_event_id = $1
602 WHERE consumer_handle = $2
603 "#,
604 )
605 .bind(event_id)
606 .bind(consumer_handle)
607 .execute(tx.deref_mut())
608 .await?;
609
610 Ok(Some(event_type))
611 }
612}
613
614#[cfg(test)]
615mod tests {
616 use super::*;
617 use crate::database::Database;
618 use crate::test::common::*;
619 use bitcoin::hashes::Hash;
620 use bitcoin::{BlockHash, CompactTarget};
621
622 async fn setup_test_db() -> Database {
623 let config = create_test_config_with_thread_name().await;
624 Database::new(&config).await.unwrap()
625 }
626
627 #[tokio::test]
628 async fn test_event_handling() {
629 let db = setup_test_db().await;
630 let mut dbtx = db.begin_transaction().await.unwrap();
631
632 let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
634 let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
635 let height = 0x45;
636
637 let block_id = db
638 .insert_block_info(Some(&mut dbtx), &block_hash, &prev_block_hash, height)
639 .await
640 .unwrap();
641
642 db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::NewBlock(block_id))
644 .await
645 .unwrap();
646
647 let consumer_handle = "test_consumer";
649 let event = db
650 .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer_handle)
651 .await
652 .unwrap();
653
654 assert!(matches!(event, Some(BitcoinSyncerEvent::NewBlock(id)) if id == block_id));
655
656 let event = db
658 .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer_handle)
659 .await
660 .unwrap();
661 assert!(event.is_none());
662
663 db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::ReorgedBlock(block_id))
665 .await
666 .unwrap();
667
668 let event = db
670 .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer_handle)
671 .await
672 .unwrap();
673 assert!(matches!(event, Some(BitcoinSyncerEvent::ReorgedBlock(id)) if id == block_id));
674
675 dbtx.commit().await.unwrap();
676 }
677
678 #[tokio::test]
679 async fn test_store_and_get_block() {
680 let db = setup_test_db().await;
681 let block_height = 123u32;
682
683 let dummy_header = bitcoin::block::Header {
685 version: bitcoin::block::Version::TWO,
686 prev_blockhash: BlockHash::from_raw_hash(Hash::from_byte_array([0x42; 32])),
687 merkle_root: bitcoin::TxMerkleNode::all_zeros(),
688 time: 1_000_000,
689 bits: CompactTarget::from_consensus(0),
690 nonce: 12345,
691 };
692
693 let dummy_txs = vec![bitcoin::Transaction {
694 version: bitcoin::blockdata::transaction::Version::TWO,
695 lock_time: bitcoin::absolute::LockTime::ZERO,
696 input: vec![],
697 output: vec![],
698 }];
699
700 let dummy_block = bitcoin::Block {
701 header: dummy_header,
702 txdata: dummy_txs.clone(),
703 };
704
705 let dummy_block_hash = dummy_block.block_hash();
706
707 db.upsert_full_block(None, &dummy_block, block_height)
709 .await
710 .unwrap();
711
712 let retrieved_block = db
714 .get_full_block(None, block_height)
715 .await
716 .unwrap()
717 .unwrap();
718
719 assert_eq!(retrieved_block, dummy_block);
721
722 let retrieved_block_from_hash = db
724 .get_full_block_from_hash(None, dummy_block_hash)
725 .await
726 .unwrap()
727 .unwrap()
728 .1;
729
730 assert_eq!(retrieved_block_from_hash, dummy_block);
732
733 assert!(db.get_full_block(None, 999).await.unwrap().is_none());
735
736 let updated_dummy_header = bitcoin::block::Header {
738 version: bitcoin::block::Version::ONE, ..dummy_header
740 };
741 let updated_dummy_block = bitcoin::Block {
742 header: updated_dummy_header,
743 txdata: dummy_txs.clone(),
744 };
745
746 let updated_dummy_block_hash = updated_dummy_block.block_hash();
747
748 db.upsert_full_block(None, &updated_dummy_block, block_height)
749 .await
750 .unwrap();
751
752 let retrieved_updated_block = db
754 .get_full_block(None, block_height)
755 .await
756 .unwrap()
757 .unwrap();
758 assert_eq!(updated_dummy_block, retrieved_updated_block);
759
760 let retrieved_updated_block_from_hash = db
761 .get_full_block_from_hash(None, updated_dummy_block_hash)
762 .await
763 .unwrap()
764 .unwrap()
765 .1;
766 assert_eq!(updated_dummy_block, retrieved_updated_block_from_hash);
767 }
768
769 #[tokio::test]
770 async fn test_multiple_event_consumers() {
771 let db = setup_test_db().await;
772 let mut dbtx = db.begin_transaction().await.unwrap();
773
774 let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
776 let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
777 let height = 0x45;
778
779 let block_id = db
780 .insert_block_info(Some(&mut dbtx), &block_hash, &prev_block_hash, height)
781 .await
782 .unwrap();
783
784 db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::NewBlock(block_id))
786 .await
787 .unwrap();
788 db.insert_event(Some(&mut dbtx), BitcoinSyncerEvent::ReorgedBlock(block_id))
789 .await
790 .unwrap();
791
792 let consumer1 = "consumer1";
794 let consumer2 = "consumer2";
795
796 let event1 = db
798 .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer1)
799 .await
800 .unwrap();
801 assert!(matches!(event1, Some(BitcoinSyncerEvent::NewBlock(id)) if id == block_id));
802
803 let event2 = db
804 .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer1)
805 .await
806 .unwrap();
807 assert!(matches!(event2, Some(BitcoinSyncerEvent::ReorgedBlock(id)) if id == block_id));
808
809 let event1 = db
811 .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer2)
812 .await
813 .unwrap();
814 assert!(matches!(event1, Some(BitcoinSyncerEvent::NewBlock(id)) if id == block_id));
815
816 let event2 = db
817 .fetch_next_bitcoin_syncer_evt(&mut dbtx, consumer2)
818 .await
819 .unwrap();
820 assert!(matches!(event2, Some(BitcoinSyncerEvent::ReorgedBlock(id)) if id == block_id));
821
822 dbtx.commit().await.unwrap();
823 }
824
825 #[tokio::test]
826 async fn test_non_canonical_blocks() {
827 let db = setup_test_db().await;
828 let mut dbtx = db.begin_transaction().await.unwrap();
829
830 let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
832 let heights = [1, 2, 3, 4, 5];
833 let mut last_hash = prev_block_hash;
834
835 let mut block_ids = Vec::new();
837 for height in heights {
838 let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([height as u8; 32]));
839 let block_id = db
840 .insert_block_info(Some(&mut dbtx), &block_hash, &last_hash, height)
841 .await
842 .unwrap();
843 block_ids.push(block_id);
844 last_hash = block_hash;
845 }
846
847 let non_canonical_blocks = db
849 .update_non_canonical_block_hashes(Some(&mut dbtx), 2)
850 .await
851 .unwrap();
852 assert_eq!(non_canonical_blocks.len(), 3);
853 assert_eq!(non_canonical_blocks, vec![5, 4, 3]);
854
855 for height in heights {
857 let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([height as u8; 32]));
858 let block_info = db
859 .get_block_info_from_hash(Some(&mut dbtx), block_hash)
860 .await
861 .unwrap();
862
863 if height <= 2 {
864 assert!(block_info.is_some());
865 } else {
866 assert!(block_info.is_none());
867 }
868 }
869
870 let max_height = db.get_max_height(Some(&mut dbtx)).await.unwrap().unwrap();
872 assert_eq!(max_height, 2);
873
874 dbtx.commit().await.unwrap();
875 }
876
877 #[tokio::test]
878 async fn add_get_block_info() {
879 let config = create_test_config_with_thread_name().await;
880 let db = Database::new(&config).await.unwrap();
881
882 let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
883 let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
884 let height = 0x45;
885
886 assert!(db
887 .get_block_info_from_hash(None, block_hash)
888 .await
889 .unwrap()
890 .is_none());
891
892 db.insert_block_info(None, &block_hash, &prev_block_hash, height)
893 .await
894 .unwrap();
895 let block_info = db
896 .get_block_info_from_hash(None, block_hash)
897 .await
898 .unwrap()
899 .unwrap();
900 let max_height = db.get_max_height(None).await.unwrap().unwrap();
901 assert_eq!(block_info.0, prev_block_hash);
902 assert_eq!(block_info.1, height);
903 assert_eq!(max_height, height);
904
905 db.insert_block_info(
906 None,
907 &BlockHash::from_raw_hash(Hash::from_byte_array([0x1; 32])),
908 &prev_block_hash,
909 height - 1,
910 )
911 .await
912 .unwrap();
913 let max_height = db.get_max_height(None).await.unwrap().unwrap();
914 assert_eq!(max_height, height);
915
916 db.insert_block_info(
917 None,
918 &BlockHash::from_raw_hash(Hash::from_byte_array([0x2; 32])),
919 &prev_block_hash,
920 height + 1,
921 )
922 .await
923 .unwrap();
924 let max_height = db.get_max_height(None).await.unwrap().unwrap();
925 assert_ne!(max_height, height);
926 assert_eq!(max_height, height + 1);
927 }
928
929 #[tokio::test]
930 async fn add_and_get_txids_from_block() {
931 let config = create_test_config_with_thread_name().await;
932 let db = Database::new(&config).await.unwrap();
933 let mut dbtx = db.begin_transaction().await.unwrap();
934
935 assert!(db
936 .insert_txid_to_block(&mut dbtx, 0, &Txid::all_zeros())
937 .await
938 .is_err());
939 let mut dbtx = db.begin_transaction().await.unwrap();
940
941 let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
942 let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
943 let height = 0x45;
944 let block_id = db
945 .insert_block_info(Some(&mut dbtx), &block_hash, &prev_block_hash, height)
946 .await
947 .unwrap();
948
949 let txids = vec![
950 Txid::from_raw_hash(Hash::from_byte_array([0x1; 32])),
951 Txid::from_raw_hash(Hash::from_byte_array([0x2; 32])),
952 Txid::from_raw_hash(Hash::from_byte_array([0x3; 32])),
953 ];
954 for txid in &txids {
955 db.insert_txid_to_block(&mut dbtx, block_id, txid)
956 .await
957 .unwrap();
958 }
959
960 let txids_from_db = db.get_block_txids(Some(&mut dbtx), block_id).await.unwrap();
961 assert_eq!(txids_from_db, txids);
962
963 assert!(db
964 .get_block_txids(Some(&mut dbtx), block_id + 1)
965 .await
966 .unwrap()
967 .is_empty());
968
969 dbtx.commit().await.unwrap();
970 }
971
972 #[tokio::test]
973 async fn insert_get_spent_utxos() {
974 let config = create_test_config_with_thread_name().await;
975 let db = Database::new(&config).await.unwrap();
976 let mut dbtx = db.begin_transaction().await.unwrap();
977
978 let prev_block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x1F; 32]));
979 let block_hash = BlockHash::from_raw_hash(Hash::from_byte_array([0x45; 32]));
980 let height = 0x45;
981 let block_id = db
982 .insert_block_info(Some(&mut dbtx), &block_hash, &prev_block_hash, height)
983 .await
984 .unwrap();
985
986 let spending_txid = Txid::from_raw_hash(Hash::from_byte_array([0x2; 32]));
987 let txid = Txid::from_raw_hash(Hash::from_byte_array([0x1; 32]));
988 let vout = 0;
989 db.insert_txid_to_block(&mut dbtx, block_id, &spending_txid)
990 .await
991 .unwrap();
992
993 assert_eq!(
994 db.get_spent_utxos_for_txid(Some(&mut dbtx), txid)
995 .await
996 .unwrap()
997 .len(),
998 0
999 );
1000
1001 db.insert_spent_utxo(&mut dbtx, block_id, &spending_txid, &txid, vout)
1002 .await
1003 .unwrap();
1004
1005 let spent_utxos = db
1006 .get_spent_utxos_for_txid(Some(&mut dbtx), spending_txid)
1007 .await
1008 .unwrap();
1009 assert_eq!(spent_utxos.len(), 1);
1010 assert_eq!(spent_utxos[0].0, block_id as i64);
1011 assert_eq!(
1012 spent_utxos[0].1,
1013 bitcoin::OutPoint {
1014 txid,
1015 vout: vout as u32,
1016 }
1017 );
1018
1019 dbtx.commit().await.unwrap();
1020 }
1021}