clementine_core/database/
mod.rs1use std::time::Duration;
10
11use crate::config::BridgeConfig;
12use clementine_errors::BridgeError;
13use eyre::Context;
14use secrecy::ExposeSecret;
15use sqlx::migrate::Migrator;
16use sqlx::postgres::PgConnectOptions;
17use sqlx::ConnectOptions;
18use sqlx::{Pool, Postgres};
19
20mod aggregator;
21mod bitcoin_syncer;
22mod header_chain_prover;
23mod operator;
24#[cfg(feature = "automation")]
25mod state_machine;
26#[cfg(all(test, feature = "automation"))]
27mod test;
28#[cfg(feature = "automation")]
29mod tx_sender;
30mod verifier;
31mod wrapper;
32
33#[cfg(test)]
34pub use wrapper::*;
35
36pub const MAX_CONNECTIONS_FOR_POOL: u32 = 50;
39
40#[derive(Clone, Debug)]
42pub struct Database {
43 connection: Pool<Postgres>,
44}
45
46pub type DatabaseTransaction<'a> = &'a mut sqlx::Transaction<'static, Postgres>;
48
49#[macro_export]
58macro_rules! execute_query_with_tx {
59 ($conn:expr, $tx:expr, $query:expr, $method:ident) => {
60 match $tx {
61 Some(tx) => $query.$method(&mut **tx).await,
62 None => $query.$method(&$conn).await,
63 }
64 };
65}
66
67impl Database {
68 pub async fn new(config: &BridgeConfig) -> Result<Self, BridgeError> {
75 let mut opt = PgConnectOptions::default();
76 opt = opt.host(config.db_host.as_str());
77 opt = opt.port(
78 u16::try_from(config.db_port).wrap_err("Failed to convert database port to u16")?,
79 );
80 opt = opt.username(config.db_user.expose_secret());
81 opt = opt.password(config.db_password.expose_secret());
82 opt = opt.database(config.db_name.as_str());
83 opt = opt.log_slow_statements(log::LevelFilter::Debug, Duration::from_secs(3));
87
88 let opts = sqlx::postgres::PgPoolOptions::new()
89 .acquire_slow_level(log::LevelFilter::Debug)
90 .max_connections(MAX_CONNECTIONS_FOR_POOL);
91
92 #[cfg(test)]
93 let opts = if config.test_params.timeout_params.any_timeout() {
94 opts.acquire_timeout(Duration::from_secs(10000))
96 .acquire_slow_threshold(Duration::from_secs(10000))
97 } else {
98 opts
99 };
100
101 let connection = opts
102 .connect_with(opt)
103 .await
104 .map_err(BridgeError::DatabaseError)?;
105
106 Ok(Self { connection })
107 }
108
109 pub async fn close(&self) {
111 self.connection.close().await;
112 }
113
114 pub fn get_pool(&self) -> Pool<Postgres> {
115 self.connection.clone()
116 }
117
118 pub async fn is_pgmq_installed(
119 &self,
120 tx: Option<DatabaseTransaction<'_>>,
121 ) -> Result<bool, BridgeError> {
122 let query = sqlx::query_as::<_, (i64,)>(
123 "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'pgmq' AND table_name = 'meta'"
124 );
125
126 let result = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
127
128 Ok(result.0 > 0)
129 }
130
131 pub async fn run_schema_script(
138 config: &BridgeConfig,
139 is_verifier: bool,
140 ) -> Result<(), BridgeError> {
141 let database = Database::new(config).await?;
142
143 sqlx::raw_sql(include_str!("schema.sql"))
144 .execute(&database.connection)
145 .await?;
146 if is_verifier {
147 let is_pgmq_installed = database.is_pgmq_installed(None).await?;
149
150 if !is_pgmq_installed {
152 sqlx::raw_sql(include_str!("pgmq.sql"))
153 .execute(&database.connection)
154 .await?;
155 }
156 }
157
158 static MIGRATOR: Migrator = sqlx::migrate!("src/database/migrations");
162
163 MIGRATOR
164 .run(&database.connection)
165 .await
166 .wrap_err("Failed to run migrations")?;
167
168 database.close().await;
169 Ok(())
170 }
171
172 pub async fn begin_transaction(
177 &self,
178 ) -> Result<sqlx::Transaction<'static, sqlx::Postgres>, BridgeError> {
179 Ok(self.connection.begin().await?)
180 }
181}
182
183#[cfg(test)]
184mod tests {
185 use crate::test::common::*;
186 use crate::{config::BridgeConfig, database::Database};
187
188 #[tokio::test]
189 async fn valid_database_connection() {
190 let config = create_test_config_with_thread_name().await;
191
192 Database::new(&config).await.unwrap();
193 }
194
195 #[tokio::test]
196 #[should_panic]
197 async fn invalid_database_connection() {
198 let mut config = BridgeConfig::new();
199 config.db_host = "nonexistinghost".to_string();
200 config.db_name = "nonexistingpassword".to_string();
201 config.db_user = "nonexistinguser".to_string().into();
202 config.db_password = "nonexistingpassword".to_string().into();
203 config.db_port = 123;
204
205 Database::new(&config).await.unwrap();
206 }
207}