// clementine_core/database/mod.rs
//! # Database Operations
//!
//! The database module provides functions that add values to and read values
//! from a PostgreSQL database.
//!
//! **Warning:** This module won't configure PostgreSQL itself and expects admin
//! privileges to create/drop databases.

use std::time::Duration;

use crate::{config::BridgeConfig, errors::BridgeError};
use alloy::transports::http::reqwest::Url;
use eyre::Context;
use secrecy::ExposeSecret;
use sqlx::postgres::PgConnectOptions;
use sqlx::ConnectOptions;
use sqlx::{Pool, Postgres};

// Private submodules of the database layer.
mod aggregator;
mod bitcoin_syncer;
mod header_chain_prover;
mod operator;
// Compiled only when the `automation` feature is enabled.
#[cfg(feature = "automation")]
mod state_machine;
// Compiled only when the `automation` feature is enabled.
#[cfg(feature = "automation")]
mod tx_sender;
mod verifier;
mod wrapper;

// In test builds, every public item of `wrapper` is re-exported from this
// module.
#[cfg(test)]
pub use wrapper::*;

/// Handle to the PostgreSQL database.
///
/// Holds the sqlx connection pool that the methods of this type run their
/// queries and transactions on.
#[derive(Debug, Clone)]
pub struct Database {
    /// Postgres connection pool backing this handle.
    connection: Pool<Postgres>,
}

/// Mutable borrow of an open Postgres transaction.
///
/// `'r` is the lifetime of the mutable reference and `'c` is the lifetime
/// parameter of the borrowed [`sqlx::Transaction`].
pub type DatabaseTransaction<'r, 'c> = &'r mut sqlx::Transaction<'c, Postgres>;

/// Executes a query with a transaction if it is provided, otherwise directly
/// on the connection.
///
/// The macro expands to a `match` on `$tx`: for `Some(tx)` the query method is
/// called with `&mut **tx`, for `None` it is called with `&$conn`. Both arms
/// `.await` the call, so the macro can only be used inside an async context.
/// The expansion evaluates to whatever the awaited method returns; the macro
/// itself applies no error handling, so callers decide whether to use `?`.
///
/// # Parameters
///
/// - `$conn`: Database connection, borrowed as `&$conn` when there is no
///   transaction.
/// - `$tx`: Optional database transaction (an `Option` wrapping a mutable
///   borrow of the transaction).
/// - `$query`: Query to execute.
/// - `$method`: Method to execute on the query (for example `fetch_one`).
#[macro_export]
macro_rules! execute_query_with_tx {
    ($conn:expr, $tx:expr, $query:expr, $method:ident) => {
        match $tx {
            // Double dereference: through the mutable borrow, then through the
            // transaction itself, to obtain the executor the query runs on.
            Some(tx) => $query.$method(&mut **tx).await,
            // No transaction supplied: run the query on the connection.
            None => $query.$method(&$conn).await,
        }
    };
}

impl Database {
    /// Establishes a new connection pool to a PostgreSQL database with the
    /// given configuration.
    ///
    /// The connection URL is produced by
    /// [`Database::get_postgresql_database_url`]. Statements that run longer
    /// than three seconds, as well as slow pool acquisitions, are logged at
    /// debug level.
    ///
    /// # Errors
    ///
    /// Returns a [`BridgeError`] if the database URL cannot be parsed or the
    /// database is not accessible.
    pub async fn new(config: &BridgeConfig) -> Result<Self, BridgeError> {
        let url = Database::get_postgresql_database_url(config);
        let url = Url::parse(&url).wrap_err("Failed to parse database URL")?;
        let connect_options = PgConnectOptions::from_url(&url)
            .map_err(BridgeError::DatabaseError)?
            .log_slow_statements(log::LevelFilter::Debug, Duration::from_secs(3));

        let pool_options =
            sqlx::postgres::PgPoolOptions::new().acquire_slow_level(log::LevelFilter::Debug);

        #[cfg(test)]
        let pool_options = if config.test_params.timeout_params.any_timeout() {
            // increase timeout for pool connections beyond any other to avoid flakiness
            pool_options
                .acquire_timeout(Duration::from_secs(10000))
                .acquire_slow_threshold(Duration::from_secs(10000))
        } else {
            pool_options
        };

        let connection = pool_options
            .connect_with(connect_options)
            .await
            .map_err(BridgeError::DatabaseError)?;

        Ok(Self { connection })
    }

    /// Closes database connection.
    pub async fn close(&self) {
        self.connection.close().await;
    }

    /// Returns a clone of the underlying connection pool handle.
    pub fn get_pool(&self) -> Pool<Postgres> {
        self.connection.clone()
    }

    /// Checks whether PGMQ has already been set up in this database.
    ///
    /// PGMQ counts as installed when `information_schema.tables` lists a table
    /// named `meta` in the `pgmq` schema. The check runs on `tx` when a
    /// transaction is given and on the connection pool otherwise.
    ///
    /// # Errors
    ///
    /// Returns a [`BridgeError`] if the lookup query fails.
    pub async fn is_pgmq_installed(
        &self,
        tx: Option<DatabaseTransaction<'_, '_>>,
    ) -> Result<bool, BridgeError> {
        let query = sqlx::query_as::<_, (i64,)>(
            "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'pgmq' AND table_name = 'meta'"
        );

        let (table_count,) = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;

        Ok(table_count > 0)
    }

    /// Runs the schema script on a database for the given configuration.
    ///
    /// Opens a dedicated connection pool, executes the bundled `schema.sql`
    /// and, when `is_verifier` is set, also executes `pgmq.sql` unless PGMQ is
    /// already installed (see [`Database::is_pgmq_installed`]). The pool is
    /// closed before returning, whether or not the scripts succeeded.
    ///
    /// # Errors
    ///
    /// Will return [`BridgeError`] if there was a problem with database
    /// connection or if one of the scripts failed to execute.
    pub async fn run_schema_script(
        config: &BridgeConfig,
        is_verifier: bool,
    ) -> Result<(), BridgeError> {
        let database = Database::new(config).await?;

        // Run the scripts inside their own async block so that an early `?`
        // exit still falls through to the `close` call below.
        let outcome = async {
            sqlx::raw_sql(include_str!("schema.sql"))
                .execute(&database.connection)
                .await?;

            // Only execute PGMQ setup for verifiers, and only if it doesn't
            // exist yet.
            if is_verifier && !database.is_pgmq_installed(None).await? {
                sqlx::raw_sql(include_str!("pgmq.sql"))
                    .execute(&database.connection)
                    .await?;
            }

            Ok::<(), BridgeError>(())
        }
        .await;

        database.close().await;
        outcome
    }

    /// Prepares a PostgreSQL URL without a database name.
    ///
    /// URL contains user, password, host and port fields, which are picked from
    /// the given configuration.
    ///
    /// The fields are inserted verbatim, without percent-encoding, so values
    /// containing characters that are reserved in URLs (such as `@`, `:` or
    /// `/`) must already be encoded in the configuration.
    pub fn get_postgresql_url(config: &BridgeConfig) -> String {
        format!(
            "postgresql://{}:{}@{}:{}",
            config.db_user.expose_secret(),
            config.db_password.expose_secret(),
            config.db_host,
            config.db_port
        )
    }

    /// Prepares a PostgreSQL URL to a specific database.
    ///
    /// URL contains user, password, host, port and database name fields, which
    /// are picked from the given configuration. It is the result of
    /// [`Database::get_postgresql_url`] followed by `/` and the database name,
    /// which is also inserted verbatim.
    pub fn get_postgresql_database_url(config: &BridgeConfig) -> String {
        format!(
            "{}/{}",
            Database::get_postgresql_url(config),
            config.db_name
        )
    }

    /// Starts a database transaction.
    ///
    /// Return value can be used for committing changes. If not committed,
    /// database will rollback every operation done after that call.
    ///
    /// # Errors
    ///
    /// Returns a [`BridgeError`] if the transaction cannot be started.
    pub async fn begin_transaction(
        &self,
    ) -> Result<sqlx::Transaction<'_, sqlx::Postgres>, BridgeError> {
        Ok(self.connection.begin().await?)
    }
}

#[cfg(test)]
mod tests {
    use crate::test::common::*;
    use crate::{config::BridgeConfig, database::Database};

    /// Connecting with the shared test configuration must succeed.
    #[tokio::test]
    async fn valid_database_connection() {
        let config = create_test_config_with_thread_name().await;

        Database::new(&config).await.unwrap();
    }

    /// Connecting with bogus connection details must yield an error.
    ///
    /// The result is asserted explicitly instead of relying on
    /// `#[should_panic]`, which would also accept a panic coming from an
    /// unrelated place such as the configuration setup.
    #[tokio::test]
    async fn invalid_database_connection() {
        let mut config = BridgeConfig::new();
        config.db_host = "nonexistinghost".to_string();
        config.db_name = "nonexistingdatabase".to_string();
        config.db_user = "nonexistinguser".to_string().into();
        config.db_password = "nonexistingpassword".to_string().into();
        config.db_port = 123;

        assert!(
            Database::new(&config).await.is_err(),
            "connecting with invalid connection details must fail"
        );
    }

    /// The server URL is assembled from user, password, host and port.
    #[test]
    fn get_postgresql_url() {
        let mut config = BridgeConfig::new();

        config.db_password = "sofun".to_string().into();
        config.db_port = 45;
        config.db_user = "iam".to_string().into();
        config.db_host = "parties".to_string();

        assert_eq!(
            &Database::get_postgresql_url(&config),
            "postgresql://iam:sofun@parties:45"
        );
    }

    /// The database URL is the server URL followed by the database name.
    #[test]
    fn get_postgresql_database_url() {
        let mut config = BridgeConfig::new();

        config.db_name = "times".to_string();
        config.db_password = "funnier".to_string().into();
        config.db_port = 45;
        config.db_user = "butyouare".to_string().into();
        config.db_host = "parties".to_string();

        assert_eq!(
            &Database::get_postgresql_database_url(&config),
            "postgresql://butyouare:funnier@parties:45/times"
        );
    }
}