clementine_core/database/
state_machine.rs

1//! # State Machine Related Database Operations
2//!
3//! This module includes database functions for persisting and loading state machines.
4
5use bitcoin::XOnlyPublicKey;
6
7use super::{wrapper::XOnlyPublicKeyDB, Database, DatabaseTransaction};
8use crate::execute_query_with_tx;
9use clementine_errors::BridgeError;
10
11impl Database {
12    /// Saves state machines to the database with the current block height
13    ///
14    /// # Arguments
15    ///
16    /// * `tx` - Optional database transaction
17    /// * `kickoff_machines` - Vector of (state_json, kickoff_id, owner_type) tuples for kickoff machines
18    /// * `round_machines` - Vector of (state_json, operator_xonly_pk, owner_type) tuples for round machines
19    /// * `block_height` - Current block height
20    /// * `last_processed_lcp` - Optional last processed LCP height
21    ///
22    /// # Errors
23    ///
24    /// Returns a `BridgeError` if the database operation fails
25    pub async fn save_state_machines(
26        &self,
27        tx: DatabaseTransaction<'_>,
28        kickoff_machines: Vec<(String, String)>,
29        round_machines: Vec<(String, XOnlyPublicKey)>,
30        block_height: i32,
31        owner_type: &str,
32        last_processed_lcp: Option<i32>,
33    ) -> Result<(), BridgeError> {
34        // Save kickoff machines that are dirty
35        for (state_json, kickoff_id) in kickoff_machines {
36            let query = sqlx::query(
37                "INSERT INTO state_machines (
38                    machine_type,
39                    state_json,
40                    kickoff_id,
41                    owner_type,
42                    block_height,
43                    created_at,
44                    updated_at
45                ) VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
46                ON CONFLICT (machine_type, kickoff_id, owner_type)
47                DO UPDATE SET
48                    state_json = EXCLUDED.state_json,
49                    block_height = EXCLUDED.block_height,
50                    updated_at = NOW()",
51            )
52            .bind("kickoff")
53            .bind(&state_json)
54            .bind(kickoff_id)
55            .bind(owner_type)
56            .bind(block_height);
57
58            query.execute(&mut **tx).await?;
59        }
60
61        // Save round machines that are dirty
62        for (state_json, operator_xonly_pk) in round_machines {
63            let query = sqlx::query(
64                "INSERT INTO state_machines (
65                    machine_type,
66                    state_json,
67                    operator_xonly_pk,
68                    owner_type,
69                    block_height,
70                    created_at,
71                    updated_at
72                ) VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
73                ON CONFLICT (machine_type, operator_xonly_pk, owner_type)
74                DO UPDATE SET
75                    state_json = EXCLUDED.state_json,
76                    block_height = EXCLUDED.block_height,
77                    updated_at = NOW()",
78            )
79            .bind("round")
80            .bind(&state_json)
81            .bind(XOnlyPublicKeyDB(operator_xonly_pk))
82            .bind(owner_type)
83            .bind(block_height);
84
85            query.execute(&mut **tx).await?;
86        }
87
88        // Update state manager status
89        let query = sqlx::query(
90            "INSERT INTO state_manager_status (
91                owner_type,
92                next_height_to_process,
93                last_processed_lcp,
94                updated_at
95            ) VALUES ($1, $2, $3, NOW())
96            ON CONFLICT (owner_type)
97            DO UPDATE SET
98                next_height_to_process = EXCLUDED.next_height_to_process,
99                last_processed_lcp = EXCLUDED.last_processed_lcp,
100                updated_at = NOW()",
101        )
102        .bind(owner_type)
103        .bind(block_height)
104        .bind(last_processed_lcp);
105
106        query.execute(&mut **tx).await?;
107
108        Ok(())
109    }
110
111    /// Gets the last processed block height
112    ///
113    /// # Arguments
114    ///
115    /// * `tx` - Optional database transaction
116    ///
117    /// # Errors
118    ///
119    /// Returns a `BridgeError` if the database operation fails
120    pub async fn get_next_height_to_process(
121        &self,
122        tx: Option<DatabaseTransaction<'_>>,
123        owner_type: &str,
124    ) -> Result<Option<i32>, BridgeError> {
125        let query = sqlx::query_as(
126            "SELECT next_height_to_process FROM state_manager_status WHERE owner_type = $1",
127        )
128        .bind(owner_type);
129
130        let result: Option<(i32,)> =
131            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
132
133        Ok(result.map(|(height,)| height))
134    }
135
136    /// Gets the last processed LCP height
137    ///
138    /// # Arguments
139    ///
140    /// * `tx` - Optional database transaction
141    /// * `owner_type` - The owner type to filter by
142    ///
143    /// # Errors
144    ///
145    /// Returns a `BridgeError` if the database operation fails
146    pub async fn get_last_processed_lcp(
147        &self,
148        tx: Option<DatabaseTransaction<'_>>,
149        owner_type: &str,
150    ) -> Result<Option<i32>, BridgeError> {
151        let query = sqlx::query_as(
152            "SELECT last_processed_lcp FROM state_manager_status WHERE owner_type = $1",
153        )
154        .bind(owner_type);
155
156        let result: Option<(Option<i32>,)> =
157            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
158
159        Ok(result.and_then(|(lcp,)| lcp))
160    }
161
162    /// Loads kickoff machines from the database
163    ///
164    /// # Arguments
165    ///
166    /// * `tx` - Optional database transaction
167    /// * `owner_type` - The owner type to filter by
168    ///
169    /// # Errors
170    ///
171    /// Returns a `BridgeError` if the database operation fails
172    pub async fn load_kickoff_machines(
173        &self,
174        tx: Option<DatabaseTransaction<'_>>,
175        owner_type: &str,
176    ) -> Result<Vec<(String, String, i32)>, BridgeError> {
177        let query = sqlx::query_as(
178            "SELECT
179                state_json,
180                kickoff_id,
181                block_height
182            FROM state_machines
183            WHERE machine_type = 'kickoff' AND owner_type = $1",
184        )
185        .bind(owner_type);
186
187        let results = execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
188
189        Ok(results)
190    }
191
192    /// Loads round machines from the database
193    ///
194    /// # Arguments
195    ///
196    /// * `tx` - Optional database transaction
197    /// * `owner_type` - The owner type to filter by
198    ///
199    /// # Errors
200    ///
201    /// Returns a `BridgeError` if the database operation fails
202    pub async fn load_round_machines(
203        &self,
204        tx: Option<DatabaseTransaction<'_>>,
205        owner_type: &str,
206    ) -> Result<Vec<(String, XOnlyPublicKey, i32)>, BridgeError> {
207        let query = sqlx::query_as(
208            "SELECT
209                state_json,
210                operator_xonly_pk,
211                block_height
212            FROM state_machines
213            WHERE machine_type = 'round' AND owner_type = $1",
214        )
215        .bind(owner_type);
216
217        let results: Vec<(String, XOnlyPublicKeyDB, i32)> =
218            execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
219
220        Ok(results
221            .into_iter()
222            .map(|(state_json, operator_xonly_pk, block_height)| {
223                (state_json, operator_xonly_pk.0, block_height)
224            })
225            .collect())
226    }
227
228    /// Checks if a pgmq queue exists by querying the pgmq.meta table.
229    ///
230    /// # Arguments
231    ///
232    /// * `queue_name` - The name of the queue to check
233    /// * `tx` - Optional database transaction
234    ///
235    /// # Errors
236    ///
237    /// Returns a [`BridgeError`] if the database query fails.
238    pub async fn pgmq_queue_exists(
239        &self,
240        queue_name: &str,
241        tx: Option<DatabaseTransaction<'_>>,
242    ) -> Result<bool, BridgeError> {
243        let query = sqlx::query_as::<_, (bool,)>(
244            "SELECT EXISTS(SELECT 1 FROM pgmq.meta WHERE queue_name = $1)",
245        )
246        .bind(queue_name);
247
248        let result = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
249
250        Ok(result.0)
251    }
252}
253
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::common::*;

    /// Saves kickoff and round machines together with the status row, then checks that
    /// every getter and loader returns exactly what was written.
    #[tokio::test]
    async fn test_save_and_load_state_machines() {
        let config = create_test_config_with_thread_name().await;
        let db = Database::new(&config).await.unwrap();

        let xonly_pk1 = generate_random_xonly_pk();
        let xonly_pk2 = generate_random_xonly_pk();

        // Create test data with owner_type
        let owner_type = "test_owner";
        let block_height = 123;
        let kickoff_machines = vec![
            ("kickoff_state_1".to_string(), "kickoff_id_1".to_string()),
            ("kickoff_state_2".to_string(), "kickoff_id_2".to_string()),
        ];

        let round_machines = vec![
            ("round_state_1".to_string(), xonly_pk1),
            ("round_state_2".to_string(), xonly_pk2),
        ];

        let mut dbtx = db.begin_transaction().await.unwrap();
        // Save state machines
        db.save_state_machines(
            &mut dbtx,
            kickoff_machines.clone(),
            round_machines.clone(),
            block_height,
            owner_type,
            Some(1234),
        )
        .await
        .unwrap();
        dbtx.commit().await.unwrap();

        // The height passed to `save_state_machines` is stored as the next height to process
        let next_height = db
            .get_next_height_to_process(None, owner_type)
            .await
            .unwrap();
        assert_eq!(next_height, Some(block_height));

        let last_processed_lcp = db.get_last_processed_lcp(None, owner_type).await.unwrap();
        assert_eq!(last_processed_lcp, Some(1234));

        // The load queries have no ORDER BY, so the database may return the rows in any
        // order. Sort by the state string (the inputs above are already in that order)
        // before comparing, instead of assuming insertion order.
        let mut loaded_kickoff = db.load_kickoff_machines(None, owner_type).await.unwrap();
        loaded_kickoff.sort_by(|a, b| a.0.cmp(&b.0));
        let expected_kickoff: Vec<(String, String, i32)> = kickoff_machines
            .into_iter()
            .map(|(state_json, kickoff_id)| (state_json, kickoff_id, block_height))
            .collect();
        assert_eq!(loaded_kickoff, expected_kickoff);

        let mut loaded_round = db.load_round_machines(None, owner_type).await.unwrap();
        loaded_round.sort_by(|a, b| a.0.cmp(&b.0));
        let expected_round: Vec<(String, XOnlyPublicKey, i32)> = round_machines
            .into_iter()
            .map(|(state_json, operator_xonly_pk)| (state_json, operator_xonly_pk, block_height))
            .collect();
        assert_eq!(loaded_round, expected_round);
    }
}