clementine_core/task/
aggregator_metric_publisher.rs1use std::str::FromStr;
2use std::{collections::HashMap, time::Duration};
3
4use tonic::async_trait;
5
6use crate::{
7 aggregator::{Aggregator, EntityId, OperatorId, VerifierId},
8 errors::BridgeError,
9 metrics::EntityL1SyncStatusMetrics,
10 rpc::clementine::EntityType,
11 task::{Task, TaskVariant},
12};
13
14pub const AGGREGATOR_METRIC_PUBLISHER_POLL_DELAY: Duration = Duration::from_secs(120);
15
16#[derive(Debug)]
18pub struct AggregatorMetricPublisher {
19 aggregator: Aggregator,
20 metrics: HashMap<EntityId, EntityL1SyncStatusMetrics>,
21}
22
23impl AggregatorMetricPublisher {
24 pub async fn new(aggregator: Aggregator) -> Result<Self, BridgeError> {
25 Ok(Self {
26 aggregator: Aggregator::new(aggregator.config).await?,
27 metrics: HashMap::new(),
28 })
29 }
30
31 fn convert_entity_id(
33 proto_entity_id: &crate::rpc::clementine::EntityId,
34 ) -> Result<EntityId, BridgeError> {
35 let entity_type = EntityType::try_from(proto_entity_id.kind)
36 .map_err(|_| BridgeError::ConfigError("Invalid entity type".into()))?;
37
38 match entity_type {
39 EntityType::Operator => {
40 let xonly_pk =
41 bitcoin::XOnlyPublicKey::from_str(&proto_entity_id.id).map_err(|e| {
42 BridgeError::ConfigError(format!("Invalid operator xonly public key: {e}"))
43 })?;
44 Ok(EntityId::Operator(OperatorId(xonly_pk)))
45 }
46 EntityType::Verifier => {
47 let pk =
48 bitcoin::secp256k1::PublicKey::from_str(&proto_entity_id.id).map_err(|e| {
49 BridgeError::ConfigError(format!("Invalid verifier public key: {e}"))
50 })?;
51 Ok(EntityId::Verifier(VerifierId(pk)))
52 }
53 EntityType::Aggregator => Ok(EntityId::Aggregator),
54 EntityType::EntityUnknown => {
55 Err(BridgeError::ConfigError("Unknown entity type".into()))
56 }
57 }
58 }
59
60 fn get_or_create_metrics(&mut self, entity_id: EntityId) -> &mut EntityL1SyncStatusMetrics {
62 self.metrics.entry(entity_id).or_insert_with(|| {
63 let scope = format!("{entity_id}_l1_sync_status");
64 EntityL1SyncStatusMetrics::describe(&scope);
65 EntityL1SyncStatusMetrics::new(&scope)
66 })
67 }
68}
69
70#[async_trait]
71impl Task for AggregatorMetricPublisher {
72 const VARIANT: TaskVariant = TaskVariant::MetricPublisher;
73 type Output = bool;
74
75 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
76 if cfg!(test) {
78 return Ok(false);
79 }
80 tracing::info!("Publishing metrics for aggregator");
81
82 let entity_statuses = self
83 .aggregator
84 .get_entity_statuses(false)
85 .await
86 .inspect_err(|e| {
87 tracing::error!("Error getting entities status: {:?}", e);
88 })?;
89
90 tracing::info!("Entities status: {:?}", entity_statuses);
91
92 for entity_status_with_id in entity_statuses {
94 let proto_entity_id = entity_status_with_id
95 .entity_id
96 .ok_or_else(|| BridgeError::ConfigError("Missing entity_id".into()))?;
97
98 let entity_id = match Self::convert_entity_id(&proto_entity_id) {
99 Ok(id) => id,
100 Err(e) => {
101 tracing::error!("Failed to convert entity_id: {}", e);
102 continue;
103 }
104 };
105
106 let metrics = self.get_or_create_metrics(entity_id);
107
108 match entity_status_with_id.status_result {
109 Some(crate::rpc::clementine::entity_status_with_id::StatusResult::Status(
110 status,
111 )) => {
112 if let Some(balance) = status
114 .wallet_balance
115 .and_then(|s| s.strip_suffix(" BTC").and_then(|s| s.parse::<f64>().ok()))
116 {
117 metrics.wallet_balance_btc.set(balance);
118 }
119
120 if let Some(height) = status.rpc_tip_height {
121 metrics.rpc_tip_height.set(height as f64);
122 }
123 if let Some(height) = status.bitcoin_syncer_synced_height {
124 metrics.btc_syncer_synced_height.set(height as f64);
125 }
126 if let Some(height) = status.hcp_last_proven_height {
127 metrics.hcp_last_proven_height.set(height as f64);
128 }
129 if let Some(height) = status.tx_sender_synced_height {
130 metrics.tx_sender_synced_height.set(height as f64);
131 }
132 if let Some(height) = status.finalized_synced_height {
133 metrics.finalized_synced_height.set(height as f64);
134 }
135 if let Some(height) = status.state_manager_next_height {
136 metrics.state_manager_next_height.set(height as f64);
137 }
138 if let Some(tasks) = status.stopped_tasks {
139 metrics
140 .stopped_tasks_count
141 .set(tasks.stopped_tasks.len() as f64);
142 }
143 if let Some(fee_rate) = status.btc_fee_rate_sat_vb {
144 metrics.bitcoin_fee_rate_sat_vb.set(fee_rate as f64);
145 }
146 }
147 Some(crate::rpc::clementine::entity_status_with_id::StatusResult::Err(error)) => {
148 tracing::error!("Entity {} error: {}", entity_id, error.error);
149 metrics.entity_status_error_count.increment(1);
151 }
152 None => {
153 tracing::warn!("Entity {} has no status", entity_id);
154 }
155 }
156 }
157
158 Ok(false)
160 }
161}