clementine_core/database/
mod.rs

1//! # Database Operations
2//!
3//! Database crate provides functions that adds/reads values from PostgreSQL
4//! database.
5//!
6//! **Warning:** This crate won't configure PostgreSQL itself and excepts admin
7//! privileges to create/drop databases.
8
9use std::time::Duration;
10
11use crate::{config::BridgeConfig, errors::BridgeError};
12use eyre::Context;
13use secrecy::ExposeSecret;
14use sqlx::migrate::Migrator;
15use sqlx::postgres::PgConnectOptions;
16use sqlx::ConnectOptions;
17use sqlx::{Pool, Postgres};
18
19mod aggregator;
20mod bitcoin_syncer;
21mod header_chain_prover;
22mod operator;
23#[cfg(feature = "automation")]
24mod state_machine;
25#[cfg(all(test, feature = "automation"))]
26mod test;
27#[cfg(feature = "automation")]
28mod tx_sender;
29mod verifier;
30mod wrapper;
31
32#[cfg(test)]
33pub use wrapper::*;
34
35/// Assuming total of 100 max connections as default, the verifier and operator
36/// can share 50 each.
37pub const MAX_CONNECTIONS_FOR_POOL: u32 = 50;
38
39/// PostgreSQL database connection details.
40#[derive(Clone, Debug)]
41pub struct Database {
42    connection: Pool<Postgres>,
43}
44
45/// Database transaction for Postgres.
46pub type DatabaseTransaction<'a, 'b> = &'a mut sqlx::Transaction<'b, Postgres>;
47
48/// Executes a query with a transaction if it is provided.
49///
50/// # Parameters
51///
52/// - `$conn`: Database connection.
53/// - `$tx`: Optional database transaction
54/// - `$query`: Query to execute.
55/// - `$method`: Method to execute on the query.
56#[macro_export]
57macro_rules! execute_query_with_tx {
58    ($conn:expr, $tx:expr, $query:expr, $method:ident) => {
59        match $tx {
60            Some(tx) => $query.$method(&mut **tx).await,
61            None => $query.$method(&$conn).await,
62        }
63    };
64}
65
66impl Database {
67    /// Establishes a new connection to a PostgreSQL database with given
68    /// configuration.
69    ///
70    /// # Errors
71    ///
72    /// Returns a [`BridgeError`] if database is not accessible.
73    pub async fn new(config: &BridgeConfig) -> Result<Self, BridgeError> {
74        let mut opt = PgConnectOptions::default();
75        opt = opt.host(config.db_host.as_str());
76        opt = opt.port(
77            u16::try_from(config.db_port).wrap_err("Failed to convert database port to u16")?,
78        );
79        opt = opt.username(config.db_user.expose_secret());
80        opt = opt.password(config.db_password.expose_secret());
81        opt = opt.database(config.db_name.as_str());
82        // Change default sqlx warnings from Warn to Debug
83        // These logs really clutter our CI logs, and they were never useful.
84        // But in the future if we fix slow statements (if they are actually a problem?), we can revert this.
85        opt = opt.log_slow_statements(log::LevelFilter::Debug, Duration::from_secs(3));
86
87        let opts = sqlx::postgres::PgPoolOptions::new()
88            .acquire_slow_level(log::LevelFilter::Debug)
89            .max_connections(MAX_CONNECTIONS_FOR_POOL);
90
91        #[cfg(test)]
92        let opts = if config.test_params.timeout_params.any_timeout() {
93            // increase timeout for pool connections beyond any other to avoid flakiness
94            opts.acquire_timeout(Duration::from_secs(10000))
95                .acquire_slow_threshold(Duration::from_secs(10000))
96        } else {
97            opts
98        };
99
100        let connection = opts
101            .connect_with(opt)
102            .await
103            .map_err(BridgeError::DatabaseError)?;
104
105        Ok(Self { connection })
106    }
107
108    /// Closes database connection.
109    pub async fn close(&self) {
110        self.connection.close().await;
111    }
112
113    pub fn get_pool(&self) -> Pool<Postgres> {
114        self.connection.clone()
115    }
116
117    pub async fn is_pgmq_installed(
118        &self,
119        tx: Option<DatabaseTransaction<'_, '_>>,
120    ) -> Result<bool, BridgeError> {
121        let query = sqlx::query_as::<_, (i64,)>(
122            "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'pgmq' AND table_name = 'meta'"
123        );
124
125        let result = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
126
127        Ok(result.0 > 0)
128    }
129
130    /// Runs the schema script on a database for the given configuration.
131    ///
132    /// # Errors
133    ///
134    /// Will return [`BridgeError`] if there was a problem with database
135    /// connection.
136    pub async fn run_schema_script(
137        config: &BridgeConfig,
138        is_verifier: bool,
139    ) -> Result<(), BridgeError> {
140        let database = Database::new(config).await?;
141
142        sqlx::raw_sql(include_str!("schema.sql"))
143            .execute(&database.connection)
144            .await?;
145        if is_verifier {
146            // Check if PGMQ schema already exists
147            let is_pgmq_installed = database.is_pgmq_installed(None).await?;
148
149            // Only execute PGMQ setup if it doesn't exist
150            if !is_pgmq_installed {
151                sqlx::raw_sql(include_str!("pgmq.sql"))
152                    .execute(&database.connection)
153                    .await?;
154            }
155        }
156
157        // Apply embedded SQLx migrations from the migrations folder
158        // This looks for SQL files under core/src/database/migrations at compile time
159        // SQL files in migration folder will be applied in lexicographical order of their filenames
160        static MIGRATOR: Migrator = sqlx::migrate!("src/database/migrations");
161
162        MIGRATOR
163            .run(&database.connection)
164            .await
165            .wrap_err("Failed to run migrations")?;
166
167        database.close().await;
168        Ok(())
169    }
170
171    /// Starts a database transaction.
172    ///
173    /// Return value can be used for committing changes. If not committed,
174    /// database will rollback every operation done after that call.
175    pub async fn begin_transaction(
176        &self,
177    ) -> Result<sqlx::Transaction<'static, sqlx::Postgres>, BridgeError> {
178        Ok(self.connection.begin().await?)
179    }
180}
181
182#[cfg(test)]
183mod tests {
184    use crate::test::common::*;
185    use crate::{config::BridgeConfig, database::Database};
186
187    #[tokio::test]
188    async fn valid_database_connection() {
189        let config = create_test_config_with_thread_name().await;
190
191        Database::new(&config).await.unwrap();
192    }
193
194    #[tokio::test]
195    #[should_panic]
196    async fn invalid_database_connection() {
197        let mut config = BridgeConfig::new();
198        config.db_host = "nonexistinghost".to_string();
199        config.db_name = "nonexistingpassword".to_string();
200        config.db_user = "nonexistinguser".to_string().into();
201        config.db_password = "nonexistingpassword".to_string().into();
202        config.db_port = 123;
203
204        Database::new(&config).await.unwrap();
205    }
206}