clementine_core/database/
state_machine.rs1use bitcoin::XOnlyPublicKey;
6
7use super::{wrapper::XOnlyPublicKeyDB, Database, DatabaseTransaction};
8use crate::execute_query_with_tx;
9use clementine_errors::BridgeError;
10
11impl Database {
12 pub async fn save_state_machines(
26 &self,
27 tx: DatabaseTransaction<'_>,
28 kickoff_machines: Vec<(String, String)>,
29 round_machines: Vec<(String, XOnlyPublicKey)>,
30 block_height: i32,
31 owner_type: &str,
32 last_processed_lcp: Option<i32>,
33 ) -> Result<(), BridgeError> {
34 for (state_json, kickoff_id) in kickoff_machines {
36 let query = sqlx::query(
37 "INSERT INTO state_machines (
38 machine_type,
39 state_json,
40 kickoff_id,
41 owner_type,
42 block_height,
43 created_at,
44 updated_at
45 ) VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
46 ON CONFLICT (machine_type, kickoff_id, owner_type)
47 DO UPDATE SET
48 state_json = EXCLUDED.state_json,
49 block_height = EXCLUDED.block_height,
50 updated_at = NOW()",
51 )
52 .bind("kickoff")
53 .bind(&state_json)
54 .bind(kickoff_id)
55 .bind(owner_type)
56 .bind(block_height);
57
58 query.execute(&mut **tx).await?;
59 }
60
61 for (state_json, operator_xonly_pk) in round_machines {
63 let query = sqlx::query(
64 "INSERT INTO state_machines (
65 machine_type,
66 state_json,
67 operator_xonly_pk,
68 owner_type,
69 block_height,
70 created_at,
71 updated_at
72 ) VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
73 ON CONFLICT (machine_type, operator_xonly_pk, owner_type)
74 DO UPDATE SET
75 state_json = EXCLUDED.state_json,
76 block_height = EXCLUDED.block_height,
77 updated_at = NOW()",
78 )
79 .bind("round")
80 .bind(&state_json)
81 .bind(XOnlyPublicKeyDB(operator_xonly_pk))
82 .bind(owner_type)
83 .bind(block_height);
84
85 query.execute(&mut **tx).await?;
86 }
87
88 let query = sqlx::query(
90 "INSERT INTO state_manager_status (
91 owner_type,
92 next_height_to_process,
93 last_processed_lcp,
94 updated_at
95 ) VALUES ($1, $2, $3, NOW())
96 ON CONFLICT (owner_type)
97 DO UPDATE SET
98 next_height_to_process = EXCLUDED.next_height_to_process,
99 last_processed_lcp = EXCLUDED.last_processed_lcp,
100 updated_at = NOW()",
101 )
102 .bind(owner_type)
103 .bind(block_height)
104 .bind(last_processed_lcp);
105
106 query.execute(&mut **tx).await?;
107
108 Ok(())
109 }
110
111 pub async fn get_next_height_to_process(
121 &self,
122 tx: Option<DatabaseTransaction<'_>>,
123 owner_type: &str,
124 ) -> Result<Option<i32>, BridgeError> {
125 let query = sqlx::query_as(
126 "SELECT next_height_to_process FROM state_manager_status WHERE owner_type = $1",
127 )
128 .bind(owner_type);
129
130 let result: Option<(i32,)> =
131 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
132
133 Ok(result.map(|(height,)| height))
134 }
135
136 pub async fn get_last_processed_lcp(
147 &self,
148 tx: Option<DatabaseTransaction<'_>>,
149 owner_type: &str,
150 ) -> Result<Option<i32>, BridgeError> {
151 let query = sqlx::query_as(
152 "SELECT last_processed_lcp FROM state_manager_status WHERE owner_type = $1",
153 )
154 .bind(owner_type);
155
156 let result: Option<(Option<i32>,)> =
157 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
158
159 Ok(result.and_then(|(lcp,)| lcp))
160 }
161
162 pub async fn load_kickoff_machines(
173 &self,
174 tx: Option<DatabaseTransaction<'_>>,
175 owner_type: &str,
176 ) -> Result<Vec<(String, String, i32)>, BridgeError> {
177 let query = sqlx::query_as(
178 "SELECT
179 state_json,
180 kickoff_id,
181 block_height
182 FROM state_machines
183 WHERE machine_type = 'kickoff' AND owner_type = $1",
184 )
185 .bind(owner_type);
186
187 let results = execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
188
189 Ok(results)
190 }
191
192 pub async fn load_round_machines(
203 &self,
204 tx: Option<DatabaseTransaction<'_>>,
205 owner_type: &str,
206 ) -> Result<Vec<(String, XOnlyPublicKey, i32)>, BridgeError> {
207 let query = sqlx::query_as(
208 "SELECT
209 state_json,
210 operator_xonly_pk,
211 block_height
212 FROM state_machines
213 WHERE machine_type = 'round' AND owner_type = $1",
214 )
215 .bind(owner_type);
216
217 let results: Vec<(String, XOnlyPublicKeyDB, i32)> =
218 execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
219
220 Ok(results
221 .into_iter()
222 .map(|(state_json, operator_xonly_pk, block_height)| {
223 (state_json, operator_xonly_pk.0, block_height)
224 })
225 .collect())
226 }
227
228 pub async fn pgmq_queue_exists(
239 &self,
240 queue_name: &str,
241 tx: Option<DatabaseTransaction<'_>>,
242 ) -> Result<bool, BridgeError> {
243 let query = sqlx::query_as::<_, (bool,)>(
244 "SELECT EXISTS(SELECT 1 FROM pgmq.meta WHERE queue_name = $1)",
245 )
246 .bind(queue_name);
247
248 let result = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
249
250 Ok(result.0)
251 }
252}
253
254#[cfg(test)]
255mod tests {
256 use super::*;
257 use crate::test::common::*;
258
259 #[tokio::test]
260 async fn test_save_and_load_state_machines() {
261 let config = create_test_config_with_thread_name().await;
262 let db = Database::new(&config).await.unwrap();
263
264 let xonly_pk1 = generate_random_xonly_pk();
265 let xonly_pk2 = generate_random_xonly_pk();
266
267 let owner_type = "test_owner";
269 let kickoff_machines = vec![
270 ("kickoff_state_1".to_string(), "kickoff_id_1".to_string()),
271 ("kickoff_state_2".to_string(), "kickoff_id_2".to_string()),
272 ];
273
274 let round_machines = vec![
275 ("round_state_1".to_string(), xonly_pk1),
276 ("round_state_2".to_string(), xonly_pk2),
277 ];
278
279 let mut dbtx = db.begin_transaction().await.unwrap();
280 db.save_state_machines(
282 &mut dbtx,
283 kickoff_machines.clone(),
284 round_machines.clone(),
285 123,
286 owner_type,
287 Some(1234),
288 )
289 .await
290 .unwrap();
291 dbtx.commit().await.unwrap();
292
293 let block_height = db
295 .get_next_height_to_process(None, owner_type)
296 .await
297 .unwrap();
298 assert_eq!(block_height, Some(123));
299
300 let last_processed_lcp = db.get_last_processed_lcp(None, owner_type).await.unwrap();
301 assert_eq!(last_processed_lcp, Some(1234));
302
303 let loaded_kickoff = db.load_kickoff_machines(None, owner_type).await.unwrap();
305 assert_eq!(loaded_kickoff.len(), 2);
306 assert_eq!(loaded_kickoff[0].0, "kickoff_state_1");
307 assert_eq!(loaded_kickoff[0].1, "kickoff_id_1");
308 assert_eq!(loaded_kickoff[0].2, 123);
309
310 let loaded_round = db.load_round_machines(None, owner_type).await.unwrap();
312 assert_eq!(loaded_round.len(), 2);
313 assert_eq!(loaded_round[0].0, "round_state_1");
314 assert_eq!(loaded_round[0].1, xonly_pk1);
315 assert_eq!(loaded_round[0].2, 123);
316 }
317}