clementine_core/database/
state_machine.rs

1//! # State Machine Related Database Operations
2//!
3//! This module includes database functions for persisting and loading state machines.
4
5use bitcoin::XOnlyPublicKey;
6
7use super::{wrapper::XOnlyPublicKeyDB, Database, DatabaseTransaction};
8use crate::execute_query_with_tx;
9use clementine_errors::BridgeError;
10
11impl Database {
12    /// Saves state machines to the database with the current block height
13    ///
14    /// # Arguments
15    ///
16    /// * `tx` - Optional database transaction
17    /// * `kickoff_machines` - Vector of (state_json, kickoff_id, owner_type) tuples for kickoff machines
18    /// * `round_machines` - Vector of (state_json, operator_xonly_pk, owner_type) tuples for round machines
19    /// * `block_height` - Current block height
20    ///
21    /// # Errors
22    ///
23    /// Returns a `BridgeError` if the database operation fails
24    pub async fn save_state_machines(
25        &self,
26        tx: DatabaseTransaction<'_>,
27        kickoff_machines: Vec<(String, String)>,
28        round_machines: Vec<(String, XOnlyPublicKey)>,
29        block_height: i32,
30        owner_type: &str,
31    ) -> Result<(), BridgeError> {
32        // Save kickoff machines that are dirty
33        for (state_json, kickoff_id) in kickoff_machines {
34            let query = sqlx::query(
35                "INSERT INTO state_machines (
36                    machine_type,
37                    state_json,
38                    kickoff_id,
39                    owner_type,
40                    block_height,
41                    created_at,
42                    updated_at
43                ) VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
44                ON CONFLICT (machine_type, kickoff_id, owner_type)
45                DO UPDATE SET
46                    state_json = EXCLUDED.state_json,
47                    block_height = EXCLUDED.block_height,
48                    updated_at = NOW()",
49            )
50            .bind("kickoff")
51            .bind(&state_json)
52            .bind(kickoff_id)
53            .bind(owner_type)
54            .bind(block_height);
55
56            query.execute(&mut **tx).await?;
57        }
58
59        // Save round machines that are dirty
60        for (state_json, operator_xonly_pk) in round_machines {
61            let query = sqlx::query(
62                "INSERT INTO state_machines (
63                    machine_type,
64                    state_json,
65                    operator_xonly_pk,
66                    owner_type,
67                    block_height,
68                    created_at,
69                    updated_at
70                ) VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
71                ON CONFLICT (machine_type, operator_xonly_pk, owner_type)
72                DO UPDATE SET
73                    state_json = EXCLUDED.state_json,
74                    block_height = EXCLUDED.block_height,
75                    updated_at = NOW()",
76            )
77            .bind("round")
78            .bind(&state_json)
79            .bind(XOnlyPublicKeyDB(operator_xonly_pk))
80            .bind(owner_type)
81            .bind(block_height);
82
83            query.execute(&mut **tx).await?;
84        }
85
86        // Update state manager status
87        let query = sqlx::query(
88            "INSERT INTO state_manager_status (
89                owner_type,
90                next_height_to_process,
91                updated_at
92            ) VALUES ($1, $2, NOW())
93            ON CONFLICT (owner_type)
94            DO UPDATE SET
95                next_height_to_process = EXCLUDED.next_height_to_process,
96                updated_at = NOW()",
97        )
98        .bind(owner_type)
99        .bind(block_height);
100
101        query.execute(&mut **tx).await?;
102
103        Ok(())
104    }
105
106    /// Gets the last processed block height
107    ///
108    /// # Arguments
109    ///
110    /// * `tx` - Optional database transaction
111    ///
112    /// # Errors
113    ///
114    /// Returns a `BridgeError` if the database operation fails
115    pub async fn get_next_height_to_process(
116        &self,
117        tx: Option<DatabaseTransaction<'_>>,
118        owner_type: &str,
119    ) -> Result<Option<i32>, BridgeError> {
120        let query = sqlx::query_as(
121            "SELECT next_height_to_process FROM state_manager_status WHERE owner_type = $1",
122        )
123        .bind(owner_type);
124
125        let result: Option<(i32,)> =
126            execute_query_with_tx!(self.connection, tx, query, fetch_optional)?;
127
128        Ok(result.map(|(height,)| height))
129    }
130
131    /// Loads kickoff machines from the database
132    ///
133    /// # Arguments
134    ///
135    /// * `tx` - Optional database transaction
136    /// * `owner_type` - The owner type to filter by
137    ///
138    /// # Errors
139    ///
140    /// Returns a `BridgeError` if the database operation fails
141    pub async fn load_kickoff_machines(
142        &self,
143        tx: Option<DatabaseTransaction<'_>>,
144        owner_type: &str,
145    ) -> Result<Vec<(String, String, i32)>, BridgeError> {
146        let query = sqlx::query_as(
147            "SELECT
148                state_json,
149                kickoff_id,
150                block_height
151            FROM state_machines
152            WHERE machine_type = 'kickoff' AND owner_type = $1",
153        )
154        .bind(owner_type);
155
156        let results = execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
157
158        Ok(results)
159    }
160
161    /// Loads round machines from the database
162    ///
163    /// # Arguments
164    ///
165    /// * `tx` - Optional database transaction
166    /// * `owner_type` - The owner type to filter by
167    ///
168    /// # Errors
169    ///
170    /// Returns a `BridgeError` if the database operation fails
171    pub async fn load_round_machines(
172        &self,
173        tx: Option<DatabaseTransaction<'_>>,
174        owner_type: &str,
175    ) -> Result<Vec<(String, XOnlyPublicKey, i32)>, BridgeError> {
176        let query = sqlx::query_as(
177            "SELECT
178                state_json,
179                operator_xonly_pk,
180                block_height
181            FROM state_machines
182            WHERE machine_type = 'round' AND owner_type = $1",
183        )
184        .bind(owner_type);
185
186        let results: Vec<(String, XOnlyPublicKeyDB, i32)> =
187            execute_query_with_tx!(self.connection, tx, query, fetch_all)?;
188
189        Ok(results
190            .into_iter()
191            .map(|(state_json, operator_xonly_pk, block_height)| {
192                (state_json, operator_xonly_pk.0, block_height)
193            })
194            .collect())
195    }
196
197    /// Checks if a pgmq queue exists by querying the pgmq.meta table.
198    ///
199    /// # Arguments
200    ///
201    /// * `queue_name` - The name of the queue to check
202    /// * `tx` - Optional database transaction
203    ///
204    /// # Errors
205    ///
206    /// Returns a [`BridgeError`] if the database query fails.
207    pub async fn pgmq_queue_exists(
208        &self,
209        queue_name: &str,
210        tx: Option<DatabaseTransaction<'_>>,
211    ) -> Result<bool, BridgeError> {
212        let query = sqlx::query_as::<_, (bool,)>(
213            "SELECT EXISTS(SELECT 1 FROM pgmq.meta WHERE queue_name = $1)",
214        )
215        .bind(queue_name);
216
217        let result = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
218
219        Ok(result.0)
220    }
221}
222
223#[cfg(test)]
224mod tests {
225    use super::*;
226    use crate::test::common::*;
227
228    #[tokio::test]
229    async fn test_save_and_load_state_machines() {
230        let config = create_test_config_with_thread_name().await;
231        let db = Database::new(&config).await.unwrap();
232
233        let xonly_pk1 = generate_random_xonly_pk();
234        let xonly_pk2 = generate_random_xonly_pk();
235
236        // Create test data with owner_type
237        let owner_type = "test_owner";
238        let kickoff_machines = vec![
239            ("kickoff_state_1".to_string(), "kickoff_id_1".to_string()),
240            ("kickoff_state_2".to_string(), "kickoff_id_2".to_string()),
241        ];
242
243        let round_machines = vec![
244            ("round_state_1".to_string(), xonly_pk1),
245            ("round_state_2".to_string(), xonly_pk2),
246        ];
247
248        let mut dbtx = db.begin_transaction().await.unwrap();
249        // Save state machines
250        db.save_state_machines(
251            &mut dbtx,
252            kickoff_machines.clone(),
253            round_machines.clone(),
254            123,
255            owner_type,
256        )
257        .await
258        .unwrap();
259        dbtx.commit().await.unwrap();
260
261        // Check last processed block height
262        let block_height = db
263            .get_next_height_to_process(None, owner_type)
264            .await
265            .unwrap();
266        assert_eq!(block_height, Some(123));
267
268        // Load kickoff machines
269        let loaded_kickoff = db.load_kickoff_machines(None, owner_type).await.unwrap();
270        assert_eq!(loaded_kickoff.len(), 2);
271        assert_eq!(loaded_kickoff[0].0, "kickoff_state_1");
272        assert_eq!(loaded_kickoff[0].1, "kickoff_id_1");
273        assert_eq!(loaded_kickoff[0].2, 123);
274
275        // Load round machines
276        let loaded_round = db.load_round_machines(None, owner_type).await.unwrap();
277        assert_eq!(loaded_round.len(), 2);
278        assert_eq!(loaded_round[0].0, "round_state_1");
279        assert_eq!(loaded_round[0].1, xonly_pk1);
280        assert_eq!(loaded_round[0].2, 123);
281    }
282}