clementine_core/database/
mod.rs1use std::time::Duration;
10
11use crate::{config::BridgeConfig, errors::BridgeError};
12use eyre::Context;
13use secrecy::ExposeSecret;
14use sqlx::migrate::Migrator;
15use sqlx::postgres::PgConnectOptions;
16use sqlx::ConnectOptions;
17use sqlx::{Pool, Postgres};
18
19mod aggregator;
20mod bitcoin_syncer;
21mod header_chain_prover;
22mod operator;
23#[cfg(feature = "automation")]
24mod state_machine;
25#[cfg(all(test, feature = "automation"))]
26mod test;
27#[cfg(feature = "automation")]
28mod tx_sender;
29mod verifier;
30mod wrapper;
31
32#[cfg(test)]
33pub use wrapper::*;
34
/// Upper bound on the number of connections in the Postgres pool.
///
/// [`Database::new`] passes this value to the pool's `max_connections` setting.
pub const MAX_CONNECTIONS_FOR_POOL: u32 = 50_u32;
38
39#[derive(Clone, Debug)]
41pub struct Database {
42 connection: Pool<Postgres>,
43}
44
45pub type DatabaseTransaction<'a, 'b> = &'a mut sqlx::Transaction<'b, Postgres>;
47
/// Runs `$query.$method(..)` against a transaction when one is supplied and
/// against the connection otherwise, awaiting the result.
///
/// # Parameters
///
/// - `$conn`: the fallback executor; it is passed as `&$conn`.
/// - `$tx`: an `Option`; when it is `Some`, the contained value is
///   dereferenced twice and passed as `&mut **tx`.
/// - `$query`: the query value the method is called on.
/// - `$method`: the method to invoke, e.g. `fetch_one`.
///
/// The expansion contains `.await`, so the macro must be used in an async
/// context. It evaluates to whatever `$method` returns; no error handling is
/// applied here.
#[macro_export]
macro_rules! execute_query_with_tx {
    ($conn:expr, $tx:expr, $query:expr, $method:ident) => {
        match $tx {
            // No transaction supplied: run directly on the connection.
            None => $query.$method(&$conn).await,
            // Transaction supplied: run on the connection it wraps.
            Some(txn) => $query.$method(&mut **txn).await,
        }
    };
}
65
66impl Database {
67 pub async fn new(config: &BridgeConfig) -> Result<Self, BridgeError> {
74 let mut opt = PgConnectOptions::default();
75 opt = opt.host(config.db_host.as_str());
76 opt = opt.port(
77 u16::try_from(config.db_port).wrap_err("Failed to convert database port to u16")?,
78 );
79 opt = opt.username(config.db_user.expose_secret());
80 opt = opt.password(config.db_password.expose_secret());
81 opt = opt.database(config.db_name.as_str());
82 opt = opt.log_slow_statements(log::LevelFilter::Debug, Duration::from_secs(3));
86
87 let opts = sqlx::postgres::PgPoolOptions::new()
88 .acquire_slow_level(log::LevelFilter::Debug)
89 .max_connections(MAX_CONNECTIONS_FOR_POOL);
90
91 #[cfg(test)]
92 let opts = if config.test_params.timeout_params.any_timeout() {
93 opts.acquire_timeout(Duration::from_secs(10000))
95 .acquire_slow_threshold(Duration::from_secs(10000))
96 } else {
97 opts
98 };
99
100 let connection = opts
101 .connect_with(opt)
102 .await
103 .map_err(BridgeError::DatabaseError)?;
104
105 Ok(Self { connection })
106 }
107
108 pub async fn close(&self) {
110 self.connection.close().await;
111 }
112
113 pub fn get_pool(&self) -> Pool<Postgres> {
114 self.connection.clone()
115 }
116
117 pub async fn is_pgmq_installed(
118 &self,
119 tx: Option<DatabaseTransaction<'_, '_>>,
120 ) -> Result<bool, BridgeError> {
121 let query = sqlx::query_as::<_, (i64,)>(
122 "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'pgmq' AND table_name = 'meta'"
123 );
124
125 let result = execute_query_with_tx!(self.connection, tx, query, fetch_one)?;
126
127 Ok(result.0 > 0)
128 }
129
130 pub async fn run_schema_script(
137 config: &BridgeConfig,
138 is_verifier: bool,
139 ) -> Result<(), BridgeError> {
140 let database = Database::new(config).await?;
141
142 sqlx::raw_sql(include_str!("schema.sql"))
143 .execute(&database.connection)
144 .await?;
145 if is_verifier {
146 let is_pgmq_installed = database.is_pgmq_installed(None).await?;
148
149 if !is_pgmq_installed {
151 sqlx::raw_sql(include_str!("pgmq.sql"))
152 .execute(&database.connection)
153 .await?;
154 }
155 }
156
157 static MIGRATOR: Migrator = sqlx::migrate!("src/database/migrations");
161
162 MIGRATOR
163 .run(&database.connection)
164 .await
165 .wrap_err("Failed to run migrations")?;
166
167 database.close().await;
168 Ok(())
169 }
170
171 pub async fn begin_transaction(
176 &self,
177 ) -> Result<sqlx::Transaction<'static, sqlx::Postgres>, BridgeError> {
178 Ok(self.connection.begin().await?)
179 }
180}
181
#[cfg(test)]
mod tests {
    use crate::test::common::*;
    use crate::{config::BridgeConfig, database::Database};

    /// Connecting with the generated test configuration must succeed.
    #[tokio::test]
    async fn valid_database_connection() {
        let config = create_test_config_with_thread_name().await;

        let connected = Database::new(&config).await;
        connected.unwrap();
    }

    /// Connecting with made-up connection settings must fail, so the
    /// `unwrap` below is expected to panic.
    #[tokio::test]
    #[should_panic]
    async fn invalid_database_connection() {
        let mut config = BridgeConfig::new();
        config.db_port = 123;
        config.db_host = String::from("nonexistinghost");
        config.db_user = String::from("nonexistinguser").into();
        config.db_password = String::from("nonexistingpassword").into();
        // NOTE(review): this database name repeats the password value and
        // looks copy-pasted; the test only needs the connection to fail.
        config.db_name = String::from("nonexistingpassword");

        let attempt = Database::new(&config).await;
        attempt.unwrap();
    }
}