clementine_tx_sender/db/
mod.rs1use crate::config::TxSenderPostgresConfig;
4use clementine_errors::BridgeError;
5use secrecy::ExposeSecret;
6use sqlx::postgres::PgConnectOptions;
7use sqlx::ConnectOptions;
8use sqlx::{Pool, Postgres};
9use std::time::Duration;
10
/// Upper bound on the number of pooled Postgres connections that
/// [`TxSenderDb::connect`] configures for the pool it creates.
pub const DEFAULT_MAX_CONNECTIONS: u32 = 10;
12
13#[derive(Clone, Debug)]
15pub struct TxSenderDb {
16 pool: Pool<Postgres>,
17}
18
19pub type TxSenderTransaction = sqlx::Transaction<'static, Postgres>;
20pub type TxSenderDbTx<'a> = &'a mut TxSenderTransaction;
21
22#[cfg(feature = "citrea")]
23pub mod citrea;
24pub mod tx_sender;
25pub mod wrapper;
26
/// Runs `$query.$method(..)` against either an open transaction or the pool,
/// and awaits the result.
///
/// Arguments:
/// - `$pool`: executor handed to `$method` when `$tx` is `None`.
/// - `$tx`: an `Option`. When it is `Some`, the contained value is
///   dereferenced twice and reborrowed mutably to serve as the executor
///   (presumably an `Option<TxSenderDbTx>` — confirm against callers).
/// - `$query`: the query expression to execute.
/// - `$method`: name of the method to invoke on `$query`.
///
/// The expansion contains `.await`, so the macro can only be used inside an
/// async context.
#[macro_export]
macro_rules! txsender_execute_query_with_tx {
    ($pool:expr, $tx:expr, $query:expr, $method:ident) => {
        match $tx {
            // No transaction supplied: run directly on the pool.
            None => $query.$method($pool).await,
            // Transaction supplied: run on its underlying connection.
            Some(active_tx) => $query.$method(&mut **active_tx).await,
        }
    };
}
37
38impl TxSenderDb {
39 pub fn from_pool(pool: Pool<Postgres>) -> Self {
40 Self { pool }
41 }
42
43 pub fn pool(&self) -> &Pool<Postgres> {
44 &self.pool
45 }
46
47 pub async fn connect(cfg: &TxSenderPostgresConfig) -> Result<Self, BridgeError> {
48 let mut opt = PgConnectOptions::default();
49 opt = opt.host(cfg.host.as_str());
50 opt = opt.port(cfg.port);
51 opt = opt.username(cfg.user.expose_secret());
52 opt = opt.password(cfg.password.expose_secret());
53 opt = opt.database(cfg.dbname.as_str());
54 opt = opt.log_slow_statements(log::LevelFilter::Debug, Duration::from_secs(3));
55
56 let pool = sqlx::postgres::PgPoolOptions::new()
57 .acquire_slow_level(log::LevelFilter::Debug)
58 .max_connections(DEFAULT_MAX_CONNECTIONS)
59 .connect_with(opt)
60 .await
61 .map_err(BridgeError::DatabaseError)?;
62
63 Ok(Self { pool })
64 }
65
66 pub async fn begin_transaction(&self) -> Result<TxSenderTransaction, BridgeError> {
67 Ok(self.pool.begin().await?)
68 }
69
70 pub async fn commit_transaction(&self, tx: TxSenderTransaction) -> Result<(), BridgeError> {
71 tx.commit().await.map_err(Into::into)
72 }
73
74 pub async fn run_migrations(&self) -> Result<(), BridgeError> {
76 static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!();
77 MIGRATOR
78 .run(&self.pool)
79 .await
80 .map_err(|e| BridgeError::Eyre(e.into()))?;
81 Ok(())
82 }
83}