clementine_core/database/
mod.rs

1//! # Database Operations
2//!
//! Database crate provides functions that add/read values to/from a PostgreSQL
//! database.
5//!
//! **Warning:** This crate won't configure PostgreSQL itself and expects admin
//! privileges to create/drop databases.
8
9use std::time::Duration;
10
11use crate::config::BridgeConfig;
12use clementine_errors::BridgeError;
13use eyre::Context;
14use secrecy::ExposeSecret;
15use sqlx::migrate::Migrator;
16use sqlx::postgres::PgConnectOptions;
17use sqlx::ConnectOptions;
18use sqlx::{Pool, Postgres};
19
20mod aggregator;
21mod bitcoin_syncer;
22mod header_chain_prover;
23mod operator;
24#[cfg(feature = "automation")]
25mod state_machine;
26#[cfg(all(test, feature = "automation"))]
27mod test;
28#[cfg(feature = "automation")]
29mod tx_sender;
30mod verifier;
31mod wrapper;
32
33#[cfg(test)]
34pub use wrapper::*;
35
/// Upper bound on the number of connections in the pool built by
/// [`Database::new`].
///
/// Assuming a total of 100 max connections as default, the verifier and
/// operator can share 50 each.
pub const MAX_CONNECTIONS_FOR_POOL: u32 = 50;
39
/// Handle to a PostgreSQL database, backed by a connection pool.
///
/// Construct it with [`Database::new`].
#[derive(Clone, Debug)]
pub struct Database {
    /// Pool of PostgreSQL connections that queries are executed on.
    connection: Pool<Postgres>,
}
45
/// Mutable borrow of an open Postgres transaction.
///
/// Database methods accept this wrapped in an `Option`, so a caller can run
/// a query inside a transaction started with
/// [`Database::begin_transaction`], or pass `None` to run it directly on the
/// pool.
pub type DatabaseTransaction<'a> = &'a mut sqlx::Transaction<'static, Postgres>;
48
/// Executes a query within a transaction if one is provided, otherwise
/// directly on the database connection.
///
/// The expansion contains an `.await`, so the macro can only be used inside
/// an async context. It evaluates to whatever `$query.$method(..).await`
/// returns.
///
/// # Parameters
///
/// - `$conn`: Database connection. Borrowed as `&$conn` and used as the
///   executor only when `$tx` is `None`.
/// - `$tx`: Optional database transaction. When it is `Some(tx)`, the query
///   is executed on `&mut **tx`.
/// - `$query`: Query to execute.
/// - `$method`: Method to execute on the query (for example `fetch_one`).
#[macro_export]
macro_rules! execute_query_with_tx {
    ($conn:expr, $tx:expr, $query:expr, $method:ident) => {
        match $tx {
            // A transaction was supplied: run the query on it.
            Some(tx) => $query.$method(&mut **tx).await,
            // No transaction: run the query on the connection itself.
            None => $query.$method(&$conn).await,
        }
    };
}
66
67impl Database {
68    /// Establishes a new connection to a PostgreSQL database with given
69    /// configuration.
70    ///
71    /// # Errors
72    ///
73    /// Returns a [`BridgeError`] if database is not accessible.
74    pub async fn new(config: &BridgeConfig) -> Result<Self, BridgeError> {
75        let mut opt = PgConnectOptions::default();
76        opt = opt.host(config.db_host.as_str());
77        opt = opt.port(
78            u16::try_from(config.db_port).wrap_err("Failed to convert database port to u16")?,
79        );
80        opt = opt.username(config.db_user.expose_secret());
81        opt = opt.password(config.db_password.expose_secret());
82        opt = opt.database(config.db_name.as_str());
83        // Change default sqlx warnings from Warn to Debug
84        // These logs really clutter our CI logs, and they were never useful.
85        // But in the future if we fix slow statements (if they are actually a problem?), we can revert this.
86        opt = opt.log_slow_statements(log::LevelFilter::Debug, Duration::from_secs(3));
87
88        let opts = sqlx::postgres::PgPoolOptions::new()
89            .acquire_slow_level(log::LevelFilter::Debug)
90            .max_connections(MAX_CONNECTIONS_FOR_POOL);
91
92        #[cfg(test)]
93        let opts = if config.test_params.timeout_params.any_timeout() {
94            // increase timeout for pool connections beyond any other to avoid flakiness
95            opts.acquire_timeout(Duration::from_secs(10000))
96                .acquire_slow_threshold(Duration::from_secs(10000))
97        } else {
98            opts
99        };
100
101        let connection = opts
102            .connect_with(opt)
103            .await
104            .map_err(BridgeError::DatabaseError)?;
105
106        Ok(Self { connection })
107    }
108
109    /// Closes database connection.
110    pub async fn close(&self) {
111        self.connection.close().await;
112    }
113
114    pub fn get_pool(&self) -> Pool<Postgres> {
115        self.connection.clone()
116    }
117
118    pub async fn is_pgmq_installed(
119        &self,
120        tx: Option<DatabaseTransaction<'_>>,
121    ) -> Result<bool, BridgeError> {
122        let query = sqlx::query_as::<_, (i64,)>(
123            "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'pgmq' AND table_name = 'meta'"
124        );
125
126        let result = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
127
128        Ok(result.0 > 0)
129    }
130
131    /// Runs the schema script on a database for the given configuration.
132    ///
133    /// # Errors
134    ///
135    /// Will return [`BridgeError`] if there was a problem with database
136    /// connection.
137    pub async fn run_schema_script(
138        config: &BridgeConfig,
139        is_verifier: bool,
140    ) -> Result<(), BridgeError> {
141        let database = Database::new(config).await?;
142
143        sqlx::raw_sql(include_str!("schema.sql"))
144            .execute(&database.connection)
145            .await?;
146        if is_verifier {
147            // Check if PGMQ schema already exists
148            let is_pgmq_installed = database.is_pgmq_installed(None).await?;
149
150            // Only execute PGMQ setup if it doesn't exist
151            if !is_pgmq_installed {
152                sqlx::raw_sql(include_str!("pgmq.sql"))
153                    .execute(&database.connection)
154                    .await?;
155            }
156        }
157
158        // Apply embedded SQLx migrations from the migrations folder
159        // This looks for SQL files under core/src/database/migrations at compile time
160        // SQL files in migration folder will be applied in lexicographical order of their filenames
161        static MIGRATOR: Migrator = sqlx::migrate!("src/database/migrations");
162
163        MIGRATOR
164            .run(&database.connection)
165            .await
166            .wrap_err("Failed to run migrations")?;
167
168        database.close().await;
169        Ok(())
170    }
171
172    /// Starts a database transaction.
173    ///
174    /// Return value can be used for committing changes. If not committed,
175    /// database will rollback every operation done after that call.
176    pub async fn begin_transaction(
177        &self,
178    ) -> Result<sqlx::Transaction<'static, sqlx::Postgres>, BridgeError> {
179        Ok(self.connection.begin().await?)
180    }
181}
182
#[cfg(test)]
mod tests {
    use crate::test::common::*;
    use crate::{config::BridgeConfig, database::Database};

    /// Connecting with the generated test configuration must succeed.
    #[tokio::test]
    async fn valid_database_connection() {
        let config = create_test_config_with_thread_name().await;

        Database::new(&config).await.unwrap();
    }

    /// Connecting with bogus connection details must return an error.
    ///
    /// The result is asserted directly instead of relying on
    /// `#[should_panic]`, which would also pass on any unrelated panic.
    #[tokio::test]
    async fn invalid_database_connection() {
        let mut config = BridgeConfig::new();
        config.db_host = "nonexistinghost".to_string();
        config.db_name = "nonexistingdb".to_string();
        config.db_user = "nonexistinguser".to_string().into();
        config.db_password = "nonexistingpassword".to_string().into();
        config.db_port = 123;

        assert!(
            Database::new(&config).await.is_err(),
            "connecting with invalid database settings should fail"
        );
    }
}