clementine_core/database/
mod.rs1use std::time::Duration;
10
11use crate::config::BridgeConfig;
12use clementine_errors::BridgeError;
13use eyre::Context;
14use secrecy::ExposeSecret;
15use sqlx::migrate::Migrator;
16use sqlx::postgres::PgConnectOptions;
17use sqlx::ConnectOptions;
18use sqlx::{Pool, Postgres};
19
20mod aggregator;
21mod bitcoin_syncer;
22mod header_chain_prover;
23mod operator;
24#[cfg(feature = "automation")]
25mod state_machine;
26#[cfg(all(test, feature = "automation"))]
27mod test;
28mod verifier;
29mod wrapper;
30
31#[cfg(test)]
32pub use wrapper::*;
33
34pub const MAX_CONNECTIONS_FOR_POOL: u32 = 50;
37
38#[derive(Clone, Debug)]
40pub struct Database {
41 connection: Pool<Postgres>,
42}
43
44pub type DatabaseTransaction<'a> = &'a mut sqlx::Transaction<'static, Postgres>;
46
47#[macro_export]
56macro_rules! execute_query_with_tx {
57 ($conn:expr, $tx:expr, $query:expr, $method:ident) => {
58 match $tx {
59 Some(tx) => $query.$method(&mut **tx).await,
60 None => $query.$method(&$conn).await,
61 }
62 };
63}
64
65impl Database {
66 pub async fn new(config: &BridgeConfig) -> Result<Self, BridgeError> {
73 let mut opt = PgConnectOptions::default();
74 opt = opt.host(config.db_host.as_str());
75 opt = opt.port(config.db_port);
76 opt = opt.username(config.db_user.expose_secret());
77 opt = opt.password(config.db_password.expose_secret());
78 opt = opt.database(config.db_name.as_str());
79 opt = opt.log_slow_statements(log::LevelFilter::Debug, Duration::from_secs(3));
83
84 let opts = sqlx::postgres::PgPoolOptions::new()
85 .acquire_slow_level(log::LevelFilter::Debug)
86 .max_connections(MAX_CONNECTIONS_FOR_POOL);
87
88 #[cfg(test)]
89 let opts = if config.test_params.timeout_params.any_timeout() {
90 opts.acquire_timeout(Duration::from_secs(10000))
92 .acquire_slow_threshold(Duration::from_secs(10000))
93 } else {
94 opts
95 };
96
97 let connection = opts
98 .connect_with(opt)
99 .await
100 .map_err(BridgeError::DatabaseError)?;
101
102 Ok(Self { connection })
103 }
104
105 pub async fn close(&self) {
107 self.connection.close().await;
108 }
109
110 pub fn get_pool(&self) -> Pool<Postgres> {
111 self.connection.clone()
112 }
113
114 pub async fn is_pgmq_installed(
115 &self,
116 tx: Option<DatabaseTransaction<'_>>,
117 ) -> Result<bool, BridgeError> {
118 let query = sqlx::query_as::<_, (i64,)>(
119 "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'pgmq' AND table_name = 'meta'"
120 );
121
122 let result = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
123
124 Ok(result.0 > 0)
125 }
126
127 pub async fn run_schema_script(
134 config: &BridgeConfig,
135 is_verifier: bool,
136 ) -> Result<(), BridgeError> {
137 let database = Database::new(config).await?;
138
139 sqlx::raw_sql(include_str!("schema.sql"))
140 .execute(&database.connection)
141 .await?;
142 if is_verifier {
143 let is_pgmq_installed = database.is_pgmq_installed(None).await?;
145
146 if !is_pgmq_installed {
148 sqlx::raw_sql(include_str!("pgmq.sql"))
149 .execute(&database.connection)
150 .await?;
151 }
152 }
153
154 static MIGRATOR: Migrator = sqlx::migrate!("src/database/migrations");
158
159 MIGRATOR
160 .run(&database.connection)
161 .await
162 .wrap_err("Failed to run migrations")?;
163
164 database.close().await;
165 Ok(())
166 }
167
168 pub async fn begin_transaction(
173 &self,
174 ) -> Result<sqlx::Transaction<'static, sqlx::Postgres>, BridgeError> {
175 Ok(self.connection.begin().await?)
176 }
177}
178
179#[cfg(test)]
180mod tests {
181 use crate::test::common::*;
182 use crate::{config::BridgeConfig, database::Database};
183
184 #[tokio::test]
185 async fn valid_database_connection() {
186 let config = create_test_config_with_thread_name().await;
187
188 Database::new(&config).await.unwrap();
189 }
190
191 #[tokio::test]
192 #[should_panic]
193 async fn invalid_database_connection() {
194 let mut config = BridgeConfig::new();
195 config.db_host = "nonexistinghost".to_string();
196 config.db_name = "nonexistingpassword".to_string();
197 config.db_user = "nonexistinguser".to_string().into();
198 config.db_password = "nonexistingpassword".to_string().into();
199 config.db_port = 123;
200
201 Database::new(&config).await.unwrap();
202 }
203}