clementine_core/database/
aggregator.rs1use super::{wrapper::TxidDB, Database, DatabaseTransaction};
6use crate::execute_query_with_tx;
7use bitcoin::Txid;
8use clementine_errors::BridgeError;
9use eyre;
10use sqlx::QueryBuilder;
11
12impl Database {
13 pub async fn insert_signed_emergency_stop_tx_if_not_exists(
15 &self,
16 tx: Option<DatabaseTransaction<'_>>,
17 move_txid: &Txid,
18 encrypted_emergency_stop_tx: &[u8],
19 ) -> Result<(), BridgeError> {
20 let query = sqlx::query(
21 "INSERT INTO emergency_stop_sigs (move_txid, emergency_stop_tx) VALUES ($1, $2)
22 ON CONFLICT (move_txid) DO NOTHING;",
23 )
24 .bind(TxidDB(*move_txid))
25 .bind(encrypted_emergency_stop_tx);
26
27 execute_query_with_tx!(self.connection, tx, query, execute)?;
28
29 Ok(())
30 }
31
32 pub async fn get_emergency_stop_txs(
34 &self,
35 tx: Option<DatabaseTransaction<'_>>,
36 move_txids: Vec<Txid>,
37 ) -> Result<Vec<(Txid, Vec<u8>)>, BridgeError> {
38 if move_txids.is_empty() {
39 return Ok(Vec::new());
40 }
41
42 let mut query_builder = QueryBuilder::new(
43 "SELECT move_txid, emergency_stop_tx FROM emergency_stop_sigs WHERE move_txid IN (",
44 );
45
46 let mut separated = query_builder.separated(", ");
47 for txid in &move_txids {
48 separated.push_bind(TxidDB(*txid));
49 }
50 query_builder.push(")");
51
52 let query = query_builder.build_query_as::<(TxidDB, Vec<u8>)>();
53
54 let results: Vec<(TxidDB, Vec<u8>)> =
55 execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
56
57 Ok(results
58 .into_iter()
59 .map(|(txid, tx_data)| Ok((txid.0, tx_data)))
60 .collect::<Result<_, eyre::Report>>()?)
61 }
62}
63
64#[cfg(test)]
65mod tests {
66 use super::*;
67 use crate::{builder::transaction::TxHandlerBuilder, test::common::*};
68 use bitcoin::{
69 consensus::{self},
70 hashes::Hash,
71 Transaction, Txid,
72 };
73 use clementine_primitives::TransactionType;
74 fn create_test_transaction() -> Transaction {
75 let tx_handler = TxHandlerBuilder::new(TransactionType::Dummy).finalize();
76 tx_handler.get_cached_tx().clone()
77 }
78
79 #[tokio::test]
80 async fn test_set_get_emergency_stop_tx() {
81 let config = create_test_config_with_thread_name().await;
82 let database = Database::new(&config).await.unwrap();
83
84 let move_txid = Txid::from_byte_array([1u8; 32]);
85 let emergency_stop_tx = create_test_transaction();
86 database
87 .insert_signed_emergency_stop_tx_if_not_exists(
88 None,
89 &move_txid,
90 &consensus::serialize(&emergency_stop_tx),
91 )
92 .await
93 .unwrap();
94
95 let results = database
96 .get_emergency_stop_txs(None, vec![move_txid])
97 .await
98 .unwrap();
99
100 assert_eq!(results.len(), 1);
101 assert_eq!(results[0].0, move_txid);
102 assert_eq!(results[0].1, consensus::serialize(&emergency_stop_tx));
103
104 let non_existent_txid = Txid::from_byte_array([2u8; 32]);
106 let results = database
107 .get_emergency_stop_txs(None, vec![non_existent_txid])
108 .await
109 .unwrap();
110 assert!(results.is_empty());
111
112 let move_txid2 = Txid::from_byte_array([3u8; 32]);
114 let emergency_stop_tx2 = create_test_transaction();
115 database
116 .insert_signed_emergency_stop_tx_if_not_exists(
117 None,
118 &move_txid2,
119 &consensus::serialize(&emergency_stop_tx2),
120 )
121 .await
122 .unwrap();
123
124 let results = database
125 .get_emergency_stop_txs(None, vec![move_txid, move_txid2])
126 .await
127 .unwrap();
128
129 assert_eq!(results.len(), 2);
130 let mut results = results;
131 results.sort_by(|a, b| a.0.cmp(&b.0));
132 assert_eq!(results[0].0, move_txid);
133 assert_eq!(results[0].1, consensus::serialize(&emergency_stop_tx));
134 assert_eq!(results[1].0, move_txid2);
135 assert_eq!(results[1].1, consensus::serialize(&emergency_stop_tx2));
136
137 let updated_tx = create_test_transaction();
139 database
140 .insert_signed_emergency_stop_tx_if_not_exists(
141 None,
142 &move_txid,
143 &consensus::serialize(&updated_tx),
144 )
145 .await
146 .unwrap();
147
148 let results = database
149 .get_emergency_stop_txs(None, vec![move_txid])
150 .await
151 .unwrap();
152
153 assert_eq!(results.len(), 1);
154 assert_eq!(results[0].0, move_txid);
155 assert_eq!(results[0].1, consensus::serialize(&updated_tx));
156 }
157}