use std::{sync::LazyLock, time::Duration};
use bitcoin::Amount;
use bitcoincore_rpc::RpcApi;
use eyre::Context;
use metrics::Gauge;
use tonic::async_trait;
use crate::{
database::Database,
errors::BridgeError,
extended_bitcoin_rpc::ExtendedBitcoinRpc,
utils::{timed_request_base, NamedEntity},
};
use metrics_derive::Metrics;
/// Duration (45 seconds) passed to `timed_request_base` for every individual
/// sub-request issued while collecting the L1 sync status.
const L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT: Duration = Duration::from_secs(45);
/// Gauges describing the L1 synchronization state, declared with the
/// `l1_sync_status` metrics scope.
///
/// The `describe` string on each field is the human-readable description of
/// that metric.
#[derive(Metrics)]
#[metrics(scope = "l1_sync_status")]
pub struct L1SyncStatusMetrics {
#[metric(describe = "The current balance of the wallet in Bitcoin (BTC)")]
pub wallet_balance_btc: Gauge,
#[metric(describe = "The block height of the chain as seen by Bitcoin Core RPC")]
pub rpc_tip_height: Gauge,
#[metric(describe = "The block height of the Bitcoin Syncer")]
pub btc_syncer_synced_height: Gauge,
#[metric(describe = "The block height of the latest header chain proof")]
pub hcp_last_proven_height: Gauge,
#[metric(describe = "The block height processed by the Transaction Sender")]
pub tx_sender_synced_height: Gauge,
#[metric(describe = "The finalized block height as seen by the FinalizedBlockFetcher task")]
pub finalized_synced_height: Gauge,
#[metric(describe = "The next block height to process for the State Manager")]
pub state_manager_next_height: Gauge,
}
/// Per-entity variant of the L1 sync status metrics.
///
/// Declared with `dynamic = true` rather than a fixed scope (presumably so the
/// scope can be supplied at runtime for each entity -- confirm against the
/// `metrics_derive` documentation). In addition to the gauges mirrored from
/// `L1SyncStatusMetrics`, it carries an error counter for the entity status
/// endpoint and a gauge for the number of stopped tasks.
#[derive(Metrics)]
#[metrics(dynamic = true)]
pub struct EntityL1SyncStatusMetrics {
#[metric(describe = "The current balance of the wallet of the entity in Bitcoin (BTC)")]
pub wallet_balance_btc: Gauge,
#[metric(
describe = "The block height of the chain as seen by Bitcoin Core RPC for the entity"
)]
pub rpc_tip_height: Gauge,
#[metric(describe = "The block height of the Bitcoin Syncer for the entity")]
pub btc_syncer_synced_height: Gauge,
#[metric(describe = "The block height of the latest header chain proof for the entity")]
pub hcp_last_proven_height: Gauge,
#[metric(describe = "The block height processed by the Transaction Sender for the entity")]
pub tx_sender_synced_height: Gauge,
#[metric(
describe = "The finalized block height as seen by the FinalizedBlockFetcher task for the entity"
)]
pub finalized_synced_height: Gauge,
#[metric(describe = "The next block height to process for the State Manager for the entity")]
pub state_manager_next_height: Gauge,
#[metric(describe = "The number of error responses from the entity status endpoint")]
pub entity_status_error_count: metrics::Counter,
#[metric(describe = "The number of stopped tasks for the entity")]
pub stopped_tasks_count: Gauge,
}
/// Lazily initialized, process-wide [`L1SyncStatusMetrics`] instance.
///
/// On first access the initializer calls `L1SyncStatusMetrics::describe()` and
/// then builds the value with `L1SyncStatusMetrics::default()`.
pub static L1_SYNC_STATUS: LazyLock<L1SyncStatusMetrics> = LazyLock::new(|| {
L1SyncStatusMetrics::describe();
L1SyncStatusMetrics::default()
});
/// Snapshot of L1 synchronization state, as assembled by
/// [`L1SyncStatusProvider::get_l1_status`].
///
/// Every field is optional. In `get_l1_status` a field ends up `None` when the
/// `timed_request_base` wrapper around its sub-request returns an outer `Err`
/// (presumably a timeout -- confirm against `timed_request_base`), or, for the
/// height fields read from the database, when the lookup itself yields `None`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct L1SyncStatus {
/// Wallet balance obtained via [`get_wallet_balance`].
pub wallet_balance: Option<Amount>,
/// Chain height obtained via [`get_rpc_tip_height`].
pub rpc_tip_height: Option<u32>,
/// Height obtained via [`get_btc_syncer_synced_height`].
pub btc_syncer_synced_height: Option<u32>,
/// Height obtained via [`get_hcp_last_proven_height`].
pub hcp_last_proven_height: Option<u32>,
/// Last processed block height of the `TX_SENDER_CONSUMER_ID` consumer.
pub tx_sender_synced_height: Option<u32>,
/// Last processed block height of the finalized-block consumer
/// (`FINALIZED_BLOCK_CONSUMER_ID_AUTOMATION` or
/// `FINALIZED_BLOCK_CONSUMER_ID_NO_AUTOMATION`, depending on the
/// `automation` feature).
pub finalized_synced_height: Option<u32>,
/// Height obtained via [`get_state_manager_next_height`].
pub state_manager_next_height: Option<u32>,
}
pub async fn get_wallet_balance(rpc: &ExtendedBitcoinRpc) -> Result<Amount, BridgeError> {
let balance = rpc
.get_balance(None, None)
.await
.wrap_err("Failed to get wallet balance")?;
Ok(balance)
}
/// Returns the current chain height as reported by
/// `ExtendedBitcoinRpc::get_current_chain_height`.
///
/// # Errors
///
/// Propagates any error from the RPC call, converted into [`BridgeError`].
pub async fn get_rpc_tip_height(rpc: &ExtendedBitcoinRpc) -> Result<u32, BridgeError> {
    Ok(rpc.get_current_chain_height().await?)
}
/// Looks up the last processed event block height recorded for the consumer
/// identified by `consumer_handle`.
///
/// Delegates to `Database::get_last_processed_event_block_height` without a
/// database transaction (`None`) and returns its result unchanged.
///
/// # Errors
///
/// Propagates the error returned by the database call.
pub async fn get_btc_syncer_consumer_last_processed_block_height(
    db: &Database,
    consumer_handle: &str,
) -> Result<Option<u32>, BridgeError> {
    let last_processed = db
        .get_last_processed_event_block_height(None, consumer_handle)
        .await?;
    Ok(last_processed)
}
/// Returns the value of `Database::get_max_height`, queried without a database
/// transaction (`None`).
///
/// # Errors
///
/// Propagates any error from the database call, converted into
/// [`BridgeError`].
pub async fn get_btc_syncer_synced_height(db: &Database) -> Result<Option<u32>, BridgeError> {
    let max_height: Option<u32> = db.get_max_height(None).await?;
    Ok(max_height)
}
pub async fn get_hcp_last_proven_height(db: &Database) -> Result<Option<u32>, BridgeError> {
let latest_proven_block_height = db
.get_latest_proven_block_info(None)
.await?
.map(|(_, _, height)| height as u32);
Ok(latest_proven_block_height)
}
/// Returns the next block height the state manager will process for
/// `owner_type`.
///
/// With the `automation` feature enabled, this reads
/// `Database::get_next_height_to_process` without a database transaction
/// (`None`). Without the feature it always returns `Ok(None)` and does not
/// touch the database.
///
/// # Errors
///
/// With `automation` enabled: propagates any error from the database call,
/// and returns an error when the stored height cannot be represented as a
/// `u32`. An `as` cast would silently wrap a negative or oversized value into
/// a bogus height.
pub async fn get_state_manager_next_height(
    db: &Database,
    owner_type: &str,
) -> Result<Option<u32>, BridgeError> {
    #[cfg(feature = "automation")]
    {
        // Checked conversion: fail loudly instead of wrapping on out-of-range values.
        let next_height = db
            .get_next_height_to_process(None, owner_type)
            .await?
            .map(u32::try_from)
            .transpose()
            .wrap_err("State manager next height does not fit in u32")?;
        Ok(next_height)
    }
    #[cfg(not(feature = "automation"))]
    {
        // The parameters are only needed when `automation` is enabled.
        let _ = (db, owner_type);
        Ok(None)
    }
}
/// Extension of [`NamedEntity`] that can report a combined [`L1SyncStatus`].
#[async_trait]
pub trait L1SyncStatusProvider: NamedEntity {
/// Collects an [`L1SyncStatus`] using the given database handle and Bitcoin
/// RPC client.
///
/// This is an associated function (no `self`); the implementing type only
/// contributes its `NamedEntity` associated constants.
async fn get_l1_status(
db: &Database,
rpc: &ExtendedBitcoinRpc,
) -> Result<L1SyncStatus, BridgeError>;
}
/// Blanket implementation: every [`NamedEntity`] that is
/// `Sync + Send + 'static` can report its L1 sync status.
#[async_trait]
impl<T: NamedEntity + Sync + Send + 'static> L1SyncStatusProvider for T {
    /// Collects all components of the [`L1SyncStatus`], one sub-request after
    /// another.
    ///
    /// Each sub-request is wrapped in `timed_request_base` with
    /// `L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT`. An outer `Err` from that
    /// wrapper is turned into `None` for the corresponding field.
    ///
    /// # Errors
    ///
    /// An error returned by a sub-request itself aborts the collection and is
    /// returned to the caller.
    async fn get_l1_status(
        db: &Database,
        rpc: &ExtendedBitcoinRpc,
    ) -> Result<L1SyncStatus, BridgeError> {
        let timeout = L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT;

        // Bitcoin RPC backed values.
        let balance = timed_request_base(timeout, "get_wallet_balance", get_wallet_balance(rpc))
            .await
            .ok()
            .transpose()?;

        let tip_height = timed_request_base(timeout, "get_rpc_tip_height", get_rpc_tip_height(rpc))
            .await
            .ok()
            .transpose()?;

        // Database backed values. These requests yield `Option<u32>`, hence
        // the extra `flatten()`.
        let tx_sender_height = timed_request_base(
            timeout,
            "get_tx_sender_synced_height",
            get_btc_syncer_consumer_last_processed_block_height(db, T::TX_SENDER_CONSUMER_ID),
        )
        .await
        .ok()
        .transpose()?
        .flatten();

        // Only the consumer id depends on the `automation` feature.
        #[cfg(feature = "automation")]
        let finalized_consumer_id = T::FINALIZED_BLOCK_CONSUMER_ID_AUTOMATION;
        #[cfg(not(feature = "automation"))]
        let finalized_consumer_id = T::FINALIZED_BLOCK_CONSUMER_ID_NO_AUTOMATION;

        let finalized_height = timed_request_base(
            timeout,
            "get_finalized_synced_height",
            get_btc_syncer_consumer_last_processed_block_height(db, finalized_consumer_id),
        )
        .await
        .ok()
        .transpose()?
        .flatten();

        let syncer_height = timed_request_base(
            timeout,
            "get_btc_syncer_synced_height",
            get_btc_syncer_synced_height(db),
        )
        .await
        .ok()
        .transpose()?
        .flatten();

        let proven_height = timed_request_base(
            timeout,
            "get_hcp_last_proven_height",
            get_hcp_last_proven_height(db),
        )
        .await
        .ok()
        .transpose()?
        .flatten();

        let next_height = timed_request_base(
            timeout,
            "get_state_manager_next_height",
            get_state_manager_next_height(db, T::ENTITY_NAME),
        )
        .await
        .ok()
        .transpose()?
        .flatten();

        Ok(L1SyncStatus {
            wallet_balance: balance,
            rpc_tip_height: tip_height,
            btc_syncer_synced_height: syncer_height,
            hcp_last_proven_height: proven_height,
            tx_sender_synced_height: tx_sender_height,
            finalized_synced_height: finalized_height,
            state_manager_next_height: next_height,
        })
    }
}
#[cfg(test)]
mod tests {
    #[cfg(not(feature = "automation"))]
    use crate::rpc::clementine::EntityType;
    use crate::{
        rpc::clementine::{entity_status_with_id::StatusResult, GetEntityStatusesRequest},
        test::common::{
            citrea::MockCitreaClient, create_actors, create_regtest_rpc,
            create_test_config_with_thread_name,
        },
    };
    use std::time::Duration;

    /// Creates a regtest RPC and the actors, waits, then asks the aggregator
    /// for every entity's status and checks the reported sync heights.
    ///
    /// The expectations depend on whether the `automation` feature is enabled.
    #[tokio::test]
    async fn test_get_sync_status() {
        let mut config = create_test_config_with_thread_name().await;
        let _regtest = create_regtest_rpc(&mut config).await;
        let actors = create_actors::<MockCitreaClient>(&config).await;
        let mut aggregator = actors.get_aggregator();

        // Presumably gives the background tasks time to make progress before
        // the statuses are queried -- confirm the required duration.
        tokio::time::sleep(Duration::from_secs(40)).await;

        let request = tonic::Request::new(GetEntityStatusesRequest {
            restart_tasks: false,
        });
        let response = aggregator
            .get_entity_statuses(request)
            .await
            .unwrap()
            .into_inner();

        for entity in response.entity_statuses {
            // Any entity that reports an error fails the test immediately.
            let status = match entity.status_result.unwrap() {
                StatusResult::Status(status) => status,
                StatusResult::Err(error) => {
                    panic!("Couldn't get entity status: {}", error.error);
                }
            };
            tracing::info!("Status: {:#?}", status);

            #[cfg(feature = "automation")]
            {
                assert!(status.automation);
                assert!(
                    status
                        .tx_sender_synced_height
                        .expect("tx_sender_synced_height is None")
                        > 0
                );
                assert!(
                    status
                        .finalized_synced_height
                        .expect("finalized_synced_height is None")
                        > 0
                );
                assert!(
                    status
                        .hcp_last_proven_height
                        .expect("hcp_last_proven_height is None")
                        > 0
                );
                assert!(status.rpc_tip_height.expect("rpc_tip_height is None") > 0);
                assert!(
                    status
                        .bitcoin_syncer_synced_height
                        .expect("bitcoin_syncer_synced_height is None")
                        > 0
                );
                assert!(
                    status
                        .state_manager_next_height
                        .expect("state_manager_next_height is None")
                        > 0
                );
            }

            #[cfg(not(feature = "automation"))]
            {
                let entity_type: EntityType = entity.entity_id.unwrap().kind.try_into().unwrap();
                assert!(!status.automation);
                assert!(status.tx_sender_synced_height.is_none());
                // Only verifiers are expected to report a finalized height here.
                if entity_type == EntityType::Verifier {
                    assert!(
                        status
                            .finalized_synced_height
                            .expect("finalized_synced_height is None")
                            > 0
                    );
                } else {
                    assert!(status.finalized_synced_height.is_none());
                }
                assert!(status.hcp_last_proven_height.is_none());
                assert!(status.rpc_tip_height.expect("rpc_tip_height is None") > 0);
                assert!(
                    status
                        .bitcoin_syncer_synced_height
                        .expect("bitcoin_syncer_synced_height is None")
                        > 0
                );
                assert!(status.state_manager_next_height.is_none());
            }
        }
    }
}