clementine_core/database/
mod.rs

1//! # Database Operations
2//!
//! Database crate provides functions that add/read values to/from a PostgreSQL
//! database.
//!
//! **Warning:** This crate won't configure PostgreSQL itself and expects admin
//! privileges to create/drop databases.
8
9use std::time::Duration;
10
11use crate::config::BridgeConfig;
12use clementine_errors::BridgeError;
13use eyre::Context;
14use secrecy::ExposeSecret;
15use sqlx::migrate::Migrator;
16use sqlx::postgres::PgConnectOptions;
17use sqlx::ConnectOptions;
18use sqlx::{Pool, Postgres};
19
20mod aggregator;
21mod bitcoin_syncer;
22mod header_chain_prover;
23mod operator;
24#[cfg(feature = "automation")]
25mod state_machine;
26#[cfg(all(test, feature = "automation"))]
27mod test;
28mod verifier;
29mod wrapper;
30
31#[cfg(test)]
32pub use wrapper::*;
33
/// Upper bound on the number of connections a single database pool may open.
///
/// A default server-wide limit of 100 connections is assumed; split evenly,
/// that leaves 50 for the verifier and 50 for the operator.
pub const MAX_CONNECTIONS_FOR_POOL: u32 = 50;
37
38/// PostgreSQL database connection details.
39#[derive(Clone, Debug)]
40pub struct Database {
41    connection: Pool<Postgres>,
42}
43
44/// Database transaction for Postgres.
45pub type DatabaseTransaction<'a> = &'a mut sqlx::Transaction<'static, Postgres>;
46
/// Runs a query inside the given transaction when there is one, and directly
/// on the connection otherwise.
///
/// The expansion contains `.await`, so it must be used in an async context.
///
/// # Parameters
///
/// - `$conn`: Database connection; borrowed as `&$conn` when `$tx` is `None`.
/// - `$tx`: Optional database transaction; when `Some`, the query runs on it.
/// - `$query`: Query to execute.
/// - `$method`: Name of the query method to call with the chosen executor
///   (for example `fetch_one`).
#[macro_export]
macro_rules! execute_query_with_tx {
    ($conn:expr, $tx:expr, $query:expr, $method:ident) => {
        if let Some(txn) = $tx {
            $query.$method(&mut **txn).await
        } else {
            $query.$method(&$conn).await
        }
    };
}
64
65impl Database {
66    /// Establishes a new connection to a PostgreSQL database with given
67    /// configuration.
68    ///
69    /// # Errors
70    ///
71    /// Returns a [`BridgeError`] if database is not accessible.
72    pub async fn new(config: &BridgeConfig) -> Result<Self, BridgeError> {
73        let mut opt = PgConnectOptions::default();
74        opt = opt.host(config.db_host.as_str());
75        opt = opt.port(config.db_port);
76        opt = opt.username(config.db_user.expose_secret());
77        opt = opt.password(config.db_password.expose_secret());
78        opt = opt.database(config.db_name.as_str());
79        // Change default sqlx warnings from Warn to Debug
80        // These logs really clutter our CI logs, and they were never useful.
81        // But in the future if we fix slow statements (if they are actually a problem?), we can revert this.
82        opt = opt.log_slow_statements(log::LevelFilter::Debug, Duration::from_secs(3));
83
84        let opts = sqlx::postgres::PgPoolOptions::new()
85            .acquire_slow_level(log::LevelFilter::Debug)
86            .max_connections(MAX_CONNECTIONS_FOR_POOL);
87
88        #[cfg(test)]
89        let opts = if config.test_params.timeout_params.any_timeout() {
90            // increase timeout for pool connections beyond any other to avoid flakiness
91            opts.acquire_timeout(Duration::from_secs(10000))
92                .acquire_slow_threshold(Duration::from_secs(10000))
93        } else {
94            opts
95        };
96
97        let connection = opts
98            .connect_with(opt)
99            .await
100            .map_err(BridgeError::DatabaseError)?;
101
102        Ok(Self { connection })
103    }
104
105    /// Closes database connection.
106    pub async fn close(&self) {
107        self.connection.close().await;
108    }
109
110    pub fn get_pool(&self) -> Pool<Postgres> {
111        self.connection.clone()
112    }
113
114    pub async fn is_pgmq_installed(
115        &self,
116        tx: Option<DatabaseTransaction<'_>>,
117    ) -> Result<bool, BridgeError> {
118        let query = sqlx::query_as::<_, (i64,)>(
119            "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'pgmq' AND table_name = 'meta'"
120        );
121
122        let result = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
123
124        Ok(result.0 > 0)
125    }
126
127    /// Runs the schema script on a database for the given configuration.
128    ///
129    /// # Errors
130    ///
131    /// Will return [`BridgeError`] if there was a problem with database
132    /// connection.
133    pub async fn run_schema_script(
134        config: &BridgeConfig,
135        is_verifier: bool,
136    ) -> Result<(), BridgeError> {
137        let database = Database::new(config).await?;
138
139        sqlx::raw_sql(include_str!("schema.sql"))
140            .execute(&database.connection)
141            .await?;
142        if is_verifier {
143            // Check if PGMQ schema already exists
144            let is_pgmq_installed = database.is_pgmq_installed(None).await?;
145
146            // Only execute PGMQ setup if it doesn't exist
147            if !is_pgmq_installed {
148                sqlx::raw_sql(include_str!("pgmq.sql"))
149                    .execute(&database.connection)
150                    .await?;
151            }
152        }
153
154        // Apply embedded SQLx migrations from the migrations folder
155        // This looks for SQL files under core/src/database/migrations at compile time
156        // SQL files in migration folder will be applied in lexicographical order of their filenames
157        static MIGRATOR: Migrator = sqlx::migrate!("src/database/migrations");
158
159        MIGRATOR
160            .run(&database.connection)
161            .await
162            .wrap_err("Failed to run migrations")?;
163
164        database.close().await;
165        Ok(())
166    }
167
168    /// Starts a database transaction.
169    ///
170    /// Return value can be used for committing changes. If not committed,
171    /// database will rollback every operation done after that call.
172    pub async fn begin_transaction(
173        &self,
174    ) -> Result<sqlx::Transaction<'static, sqlx::Postgres>, BridgeError> {
175        Ok(self.connection.begin().await?)
176    }
177}
178
#[cfg(test)]
mod tests {
    use crate::test::common::*;
    use crate::{config::BridgeConfig, database::Database};

    /// Connecting with the generated test configuration must succeed.
    #[tokio::test]
    async fn valid_database_connection() {
        let config = create_test_config_with_thread_name().await;

        Database::new(&config).await.unwrap();
    }

    /// Connecting with bogus connection details must return an error.
    ///
    /// The result is asserted explicitly rather than via `#[should_panic]`,
    /// so a panic anywhere else in the test cannot make it pass.
    #[tokio::test]
    async fn invalid_database_connection() {
        let mut config = BridgeConfig::new();
        config.db_host = "nonexistinghost".to_string();
        config.db_name = "nonexistingdb".to_string();
        config.db_user = "nonexistinguser".to_string().into();
        config.db_password = "nonexistingpassword".to_string().into();
        config.db_port = 123;

        assert!(
            Database::new(&config).await.is_err(),
            "connecting with invalid database settings should fail"
        );
    }
}