clementine_core/database/
aggregator.rs1use super::{wrapper::TxidDB, Database, DatabaseTransaction};
6use crate::{errors::BridgeError, execute_query_with_tx};
7use bitcoin::Txid;
8use eyre;
9use sqlx::QueryBuilder;
10
11impl Database {
12 pub async fn insert_signed_emergency_stop_tx_if_not_exists(
14 &self,
15 tx: Option<DatabaseTransaction<'_, '_>>,
16 move_txid: &Txid,
17 encrypted_emergency_stop_tx: &[u8],
18 ) -> Result<(), BridgeError> {
19 let query = sqlx::query(
20 "INSERT INTO emergency_stop_sigs (move_txid, emergency_stop_tx) VALUES ($1, $2)
21 ON CONFLICT (move_txid) DO NOTHING;",
22 )
23 .bind(TxidDB(*move_txid))
24 .bind(encrypted_emergency_stop_tx);
25
26 execute_query_with_tx!(self.connection, tx, query, execute)?;
27
28 Ok(())
29 }
30
31 pub async fn get_emergency_stop_txs(
33 &self,
34 tx: Option<DatabaseTransaction<'_, '_>>,
35 move_txids: Vec<Txid>,
36 ) -> Result<Vec<(Txid, Vec<u8>)>, BridgeError> {
37 if move_txids.is_empty() {
38 return Ok(Vec::new());
39 }
40
41 let mut query_builder = QueryBuilder::new(
42 "SELECT move_txid, emergency_stop_tx FROM emergency_stop_sigs WHERE move_txid IN (",
43 );
44
45 let mut separated = query_builder.separated(", ");
46 for txid in &move_txids {
47 separated.push_bind(TxidDB(*txid));
48 }
49 query_builder.push(")");
50
51 let query = query_builder.build_query_as::<(TxidDB, Vec<u8>)>();
52
53 let results: Vec<(TxidDB, Vec<u8>)> =
54 execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
55
56 Ok(results
57 .into_iter()
58 .map(|(txid, tx_data)| Ok((txid.0, tx_data)))
59 .collect::<Result<_, eyre::Report>>()?)
60 }
61}
62
63#[cfg(test)]
64mod tests {
65 use super::*;
66 use crate::{
67 builder::transaction::{TransactionType, TxHandlerBuilder},
68 test::common::*,
69 };
70 use bitcoin::{
71 consensus::{self},
72 hashes::Hash,
73 Transaction, Txid,
74 };
75 fn create_test_transaction() -> Transaction {
76 let tx_handler = TxHandlerBuilder::new(TransactionType::Dummy).finalize();
77 tx_handler.get_cached_tx().clone()
78 }
79
80 #[tokio::test]
81 async fn test_set_get_emergency_stop_tx() {
82 let config = create_test_config_with_thread_name().await;
83 let database = Database::new(&config).await.unwrap();
84
85 let move_txid = Txid::from_byte_array([1u8; 32]);
86 let emergency_stop_tx = create_test_transaction();
87 database
88 .insert_signed_emergency_stop_tx_if_not_exists(
89 None,
90 &move_txid,
91 &consensus::serialize(&emergency_stop_tx),
92 )
93 .await
94 .unwrap();
95
96 let results = database
97 .get_emergency_stop_txs(None, vec![move_txid])
98 .await
99 .unwrap();
100
101 assert_eq!(results.len(), 1);
102 assert_eq!(results[0].0, move_txid);
103 assert_eq!(results[0].1, consensus::serialize(&emergency_stop_tx));
104
105 let non_existent_txid = Txid::from_byte_array([2u8; 32]);
107 let results = database
108 .get_emergency_stop_txs(None, vec![non_existent_txid])
109 .await
110 .unwrap();
111 assert!(results.is_empty());
112
113 let move_txid2 = Txid::from_byte_array([3u8; 32]);
115 let emergency_stop_tx2 = create_test_transaction();
116 database
117 .insert_signed_emergency_stop_tx_if_not_exists(
118 None,
119 &move_txid2,
120 &consensus::serialize(&emergency_stop_tx2),
121 )
122 .await
123 .unwrap();
124
125 let results = database
126 .get_emergency_stop_txs(None, vec![move_txid, move_txid2])
127 .await
128 .unwrap();
129
130 assert_eq!(results.len(), 2);
131 let mut results = results;
132 results.sort_by(|a, b| a.0.cmp(&b.0));
133 assert_eq!(results[0].0, move_txid);
134 assert_eq!(results[0].1, consensus::serialize(&emergency_stop_tx));
135 assert_eq!(results[1].0, move_txid2);
136 assert_eq!(results[1].1, consensus::serialize(&emergency_stop_tx2));
137
138 let updated_tx = create_test_transaction();
140 database
141 .insert_signed_emergency_stop_tx_if_not_exists(
142 None,
143 &move_txid,
144 &consensus::serialize(&updated_tx),
145 )
146 .await
147 .unwrap();
148
149 let results = database
150 .get_emergency_stop_txs(None, vec![move_txid])
151 .await
152 .unwrap();
153
154 assert_eq!(results.len(), 1);
155 assert_eq!(results[0].0, move_txid);
156 assert_eq!(results[0].1, consensus::serialize(&updated_tx));
157 }
158}