clementine_tx_sender/db/
mod.rs

1//! Postgres database layer for tx-sender.
2
3use crate::config::TxSenderPostgresConfig;
4use clementine_errors::BridgeError;
5use secrecy::ExposeSecret;
6use sqlx::postgres::PgConnectOptions;
7use sqlx::ConnectOptions;
8use sqlx::{Pool, Postgres};
9use std::time::Duration;
10
11pub const DEFAULT_MAX_CONNECTIONS: u32 = 10;
12
13/// A thin Postgres wrapper dedicated to tx-sender tables.
14#[derive(Clone, Debug)]
15pub struct TxSenderDb {
16    pool: Pool<Postgres>,
17}
18
19pub type TxSenderTransaction = sqlx::Transaction<'static, Postgres>;
20pub type TxSenderDbTx<'a> = &'a mut TxSenderTransaction;
21
22#[cfg(feature = "citrea")]
23pub mod citrea;
24pub mod tx_sender;
25pub mod wrapper;
26
27/// Executes a query with a transaction if it is provided.
28#[macro_export]
29macro_rules! txsender_execute_query_with_tx {
30    ($pool:expr, $tx:expr, $query:expr, $method:ident) => {
31        match $tx {
32            Some(tx) => $query.$method(&mut **tx).await,
33            None => $query.$method($pool).await,
34        }
35    };
36}
37
38impl TxSenderDb {
39    pub fn from_pool(pool: Pool<Postgres>) -> Self {
40        Self { pool }
41    }
42
43    pub fn pool(&self) -> &Pool<Postgres> {
44        &self.pool
45    }
46
47    pub async fn connect(cfg: &TxSenderPostgresConfig) -> Result<Self, BridgeError> {
48        let mut opt = PgConnectOptions::default();
49        opt = opt.host(cfg.host.as_str());
50        opt = opt.port(cfg.port);
51        opt = opt.username(cfg.user.expose_secret());
52        opt = opt.password(cfg.password.expose_secret());
53        opt = opt.database(cfg.dbname.as_str());
54        opt = opt.log_slow_statements(log::LevelFilter::Debug, Duration::from_secs(3));
55
56        let pool = sqlx::postgres::PgPoolOptions::new()
57            .acquire_slow_level(log::LevelFilter::Debug)
58            .max_connections(DEFAULT_MAX_CONNECTIONS)
59            .connect_with(opt)
60            .await
61            .map_err(BridgeError::DatabaseError)?;
62
63        Ok(Self { pool })
64    }
65
66    pub async fn begin_transaction(&self) -> Result<TxSenderTransaction, BridgeError> {
67        Ok(self.pool.begin().await?)
68    }
69
70    pub async fn commit_transaction(&self, tx: TxSenderTransaction) -> Result<(), BridgeError> {
71        tx.commit().await.map_err(Into::into)
72    }
73
74    /// Runs tx-sender schema initialization/migrations.
75    pub async fn run_migrations(&self) -> Result<(), BridgeError> {
76        static MIGRATOR: sqlx::migrate::Migrator = sqlx::migrate!();
77        MIGRATOR
78            .run(&self.pool)
79            .await
80            .map_err(|e| BridgeError::Eyre(e.into()))?;
81        Ok(())
82    }
83}