// clementine_core/database/state_machine.rs
use bitcoin::XOnlyPublicKey;

use super::{wrapper::XOnlyPublicKeyDB, Database, DatabaseTransaction};
use crate::errors::BridgeError;
use crate::execute_query_with_tx;

11impl Database {
12 pub async fn save_state_machines(
25 &self,
26 tx: DatabaseTransaction<'_, '_>,
27 kickoff_machines: Vec<(String, String)>,
28 round_machines: Vec<(String, XOnlyPublicKey)>,
29 block_height: i32,
30 owner_type: &str,
31 ) -> Result<(), BridgeError> {
32 for (state_json, kickoff_id) in kickoff_machines {
34 let query = sqlx::query(
35 "INSERT INTO state_machines (
36 machine_type,
37 state_json,
38 kickoff_id,
39 owner_type,
40 block_height,
41 created_at,
42 updated_at
43 ) VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
44 ON CONFLICT (machine_type, kickoff_id, owner_type)
45 DO UPDATE SET
46 state_json = EXCLUDED.state_json,
47 block_height = EXCLUDED.block_height,
48 updated_at = NOW()",
49 )
50 .bind("kickoff")
51 .bind(&state_json)
52 .bind(kickoff_id)
53 .bind(owner_type)
54 .bind(block_height);
55
56 query.execute(&mut **tx).await?;
57 }
58
59 for (state_json, operator_xonly_pk) in round_machines {
61 let query = sqlx::query(
62 "INSERT INTO state_machines (
63 machine_type,
64 state_json,
65 operator_xonly_pk,
66 owner_type,
67 block_height,
68 created_at,
69 updated_at
70 ) VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
71 ON CONFLICT (machine_type, operator_xonly_pk, owner_type)
72 DO UPDATE SET
73 state_json = EXCLUDED.state_json,
74 block_height = EXCLUDED.block_height,
75 updated_at = NOW()",
76 )
77 .bind("round")
78 .bind(&state_json)
79 .bind(XOnlyPublicKeyDB(operator_xonly_pk))
80 .bind(owner_type)
81 .bind(block_height);
82
83 query.execute(&mut **tx).await?;
84 }
85
86 let query = sqlx::query(
88 "INSERT INTO state_manager_status (
89 owner_type,
90 next_height_to_process,
91 updated_at
92 ) VALUES ($1, $2, NOW())
93 ON CONFLICT (owner_type)
94 DO UPDATE SET
95 next_height_to_process = EXCLUDED.next_height_to_process,
96 updated_at = NOW()",
97 )
98 .bind(owner_type)
99 .bind(block_height);
100
101 query.execute(&mut **tx).await?;
102
103 Ok(())
104 }
105
106 pub async fn get_next_height_to_process(
116 &self,
117 tx: Option<DatabaseTransaction<'_, '_>>,
118 owner_type: &str,
119 ) -> Result<Option<i32>, BridgeError> {
120 let query = sqlx::query_as(
121 "SELECT next_height_to_process FROM state_manager_status WHERE owner_type = $1",
122 )
123 .bind(owner_type);
124
125 let result: Option<(i32,)> =
126 execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
127
128 Ok(result.map(|(height,)| height))
129 }
130
131 pub async fn load_kickoff_machines(
142 &self,
143 tx: Option<DatabaseTransaction<'_, '_>>,
144 owner_type: &str,
145 ) -> Result<Vec<(String, String, i32)>, BridgeError> {
146 let query = sqlx::query_as(
147 "SELECT
148 state_json,
149 kickoff_id,
150 block_height
151 FROM state_machines
152 WHERE machine_type = 'kickoff' AND owner_type = $1",
153 )
154 .bind(owner_type);
155
156 let results = execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
157
158 Ok(results)
159 }
160
161 pub async fn load_round_machines(
172 &self,
173 tx: Option<DatabaseTransaction<'_, '_>>,
174 owner_type: &str,
175 ) -> Result<Vec<(String, XOnlyPublicKey, i32)>, BridgeError> {
176 let query = sqlx::query_as(
177 "SELECT
178 state_json,
179 operator_xonly_pk,
180 block_height
181 FROM state_machines
182 WHERE machine_type = 'round' AND owner_type = $1",
183 )
184 .bind(owner_type);
185
186 let results: Vec<(String, XOnlyPublicKeyDB, i32)> =
187 execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
188
189 Ok(results
190 .into_iter()
191 .map(|(state_json, operator_xonly_pk, block_height)| {
192 (state_json, operator_xonly_pk.0, block_height)
193 })
194 .collect())
195 }
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::common::*;

    /// Saves two kickoff and two round machines in one committed transaction, then
    /// checks that the stored height and every saved machine can be read back.
    #[tokio::test]
    async fn test_save_and_load_state_machines() {
        let config = create_test_config_with_thread_name().await;
        let db = Database::new(&config).await.unwrap();

        let xonly_pk1 = generate_random_xonly_pk();
        let xonly_pk2 = generate_random_xonly_pk();

        let owner_type = "test_owner";
        let saved_height = 123;

        let kickoff_machines = vec![
            ("kickoff_state_1".to_string(), "kickoff_id_1".to_string()),
            ("kickoff_state_2".to_string(), "kickoff_id_2".to_string()),
        ];

        let round_machines = vec![
            ("round_state_1".to_string(), xonly_pk1),
            ("round_state_2".to_string(), xonly_pk2),
        ];

        let mut dbtx = db.begin_transaction().await.unwrap();
        db.save_state_machines(
            &mut dbtx,
            kickoff_machines.clone(),
            round_machines.clone(),
            saved_height,
            owner_type,
        )
        .await
        .unwrap();
        dbtx.commit().await.unwrap();

        let next_height = db
            .get_next_height_to_process(None, owner_type)
            .await
            .unwrap();
        assert_eq!(next_height, Some(saved_height));

        // The load queries have no ORDER BY, so the row order is unspecified: check that
        // every saved machine is present instead of indexing into the result.
        let loaded_kickoff = db.load_kickoff_machines(None, owner_type).await.unwrap();
        assert_eq!(loaded_kickoff.len(), kickoff_machines.len());
        for (state_json, kickoff_id) in &kickoff_machines {
            assert!(
                loaded_kickoff
                    .iter()
                    .any(|(state, id, height)| state == state_json
                        && id == kickoff_id
                        && *height == saved_height),
                "kickoff machine {kickoff_id} was not loaded with the saved state and height"
            );
        }

        let loaded_round = db.load_round_machines(None, owner_type).await.unwrap();
        assert_eq!(loaded_round.len(), round_machines.len());
        for (state_json, operator_xonly_pk) in &round_machines {
            assert!(
                loaded_round
                    .iter()
                    .any(|(state, pk, height)| state == state_json
                        && pk == operator_xonly_pk
                        && *height == saved_height),
                "round machine {state_json} was not loaded with the saved key and height"
            );
        }
    }
}