clementine_core/task/
aggregator_metric_publisher.rs1use std::str::FromStr;
2use std::{collections::HashMap, time::Duration};
3
4use tonic::async_trait;
5
6use crate::{
7 aggregator::{Aggregator, EntityId, OperatorId, VerifierId},
8 metrics::EntityL1SyncStatusMetrics,
9 rpc::clementine::EntityType,
10 task::{Task, TaskVariant},
11};
12use clementine_errors::BridgeError;
13
/// Poll delay for the aggregator metric publisher: two minutes (120 seconds).
///
/// NOTE(review): presumably the pause between consecutive `run_once` calls —
/// confirm at the site where the task is scheduled.
pub const AGGREGATOR_METRIC_PUBLISHER_POLL_DELAY: Duration = Duration::from_secs(2 * 60);
15
/// Task state for publishing per-entity status metrics collected through an
/// [`Aggregator`].
#[derive(Debug)]
pub struct AggregatorMetricPublisher {
    /// Aggregator queried for entity statuses on each `run_once` call.
    aggregator: Aggregator,
    /// Metric handles created so far, keyed by the entity they belong to.
    /// Entries are inserted on first use and reused afterwards.
    metrics: HashMap<EntityId, EntityL1SyncStatusMetrics>,
}
22
23impl AggregatorMetricPublisher {
24 pub async fn new(aggregator: Aggregator) -> Result<Self, BridgeError> {
25 Ok(Self {
26 aggregator: Aggregator::new(aggregator.config).await?,
27 metrics: HashMap::new(),
28 })
29 }
30
31 fn convert_entity_id(
33 proto_entity_id: &crate::rpc::clementine::EntityId,
34 ) -> Result<EntityId, BridgeError> {
35 let entity_type = EntityType::try_from(proto_entity_id.kind)
36 .map_err(|_| BridgeError::ConfigError("Invalid entity type".into()))?;
37
38 match entity_type {
39 EntityType::Operator => {
40 let xonly_pk =
41 bitcoin::XOnlyPublicKey::from_str(&proto_entity_id.id).map_err(|e| {
42 BridgeError::ConfigError(format!("Invalid operator xonly public key: {e}"))
43 })?;
44 Ok(EntityId::Operator(OperatorId(xonly_pk)))
45 }
46 EntityType::Verifier => {
47 let pk =
48 bitcoin::secp256k1::PublicKey::from_str(&proto_entity_id.id).map_err(|e| {
49 BridgeError::ConfigError(format!("Invalid verifier public key: {e}"))
50 })?;
51 Ok(EntityId::Verifier(VerifierId(pk)))
52 }
53 EntityType::Aggregator => Ok(EntityId::Aggregator),
54 EntityType::EntityUnknown => {
55 Err(BridgeError::ConfigError("Unknown entity type".into()))
56 }
57 }
58 }
59
60 fn get_or_create_metrics(&mut self, entity_id: EntityId) -> &mut EntityL1SyncStatusMetrics {
62 self.metrics.entry(entity_id).or_insert_with(|| {
63 let scope = format!("{entity_id}_l1_sync_status");
64 EntityL1SyncStatusMetrics::describe(&scope);
65 EntityL1SyncStatusMetrics::new(&scope)
66 })
67 }
68}
69
70#[async_trait]
71impl Task for AggregatorMetricPublisher {
72 const VARIANT: TaskVariant = TaskVariant::MetricPublisher;
73 type Output = bool;
74
75 async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
76 if cfg!(test) {
78 return Ok(false);
79 }
80 tracing::info!("Publishing metrics for aggregator");
81
82 let entity_statuses = self
83 .aggregator
84 .get_entity_statuses(false)
85 .await
86 .inspect_err(|e| {
87 tracing::error!("Error getting entities status: {:?}", e);
88 })?;
89
90 tracing::info!("Entities status: {:?}", entity_statuses);
91
92 for entity_status_with_id in entity_statuses {
94 let proto_entity_id =
95 entity_status_with_id
96 .entity_id
97 .unwrap_or(crate::rpc::clementine::EntityId {
98 kind: EntityType::EntityUnknown as i32,
99 id: "Unknown entity".to_string(),
100 });
101
102 let entity_id = match Self::convert_entity_id(&proto_entity_id) {
103 Ok(id) => id,
104 Err(e) => {
105 tracing::error!("Failed to convert entity_id ({proto_entity_id:?}) to internal EntityId. This can be normal if the entity is unreachable: {e}");
106 continue;
107 }
108 };
109
110 let metrics = self.get_or_create_metrics(entity_id);
111
112 match entity_status_with_id.status_result {
113 Some(crate::rpc::clementine::entity_status_with_id::StatusResult::Status(
114 status,
115 )) => {
116 if let Some(balance) = status
118 .wallet_balance
119 .and_then(|s| s.strip_suffix(" BTC").and_then(|s| s.parse::<f64>().ok()))
120 {
121 metrics.wallet_balance_btc.set(balance);
122 }
123
124 if let Some(height) = status.rpc_tip_height {
125 metrics.rpc_tip_height.set(height as f64);
126 }
127 if let Some(height) = status.bitcoin_syncer_synced_height {
128 metrics.btc_syncer_synced_height.set(height as f64);
129 }
130 if let Some(height) = status.hcp_last_proven_height {
131 metrics.hcp_last_proven_height.set(height as f64);
132 }
133 if let Some(height) = status.tx_sender_synced_height {
134 metrics.tx_sender_synced_height.set(height as f64);
135 }
136 if let Some(height) = status.finalized_synced_height {
137 metrics.finalized_synced_height.set(height as f64);
138 }
139 if let Some(height) = status.state_manager_next_height {
140 metrics.state_manager_next_height.set(height as f64);
141 }
142 if let Some(tasks) = status.stopped_tasks {
143 metrics
144 .stopped_tasks_count
145 .set(tasks.stopped_tasks.len() as f64);
146 }
147 if let Some(fee_rate) = status.btc_fee_rate_sat_vb {
148 metrics.bitcoin_fee_rate_sat_vb.set(fee_rate as f64);
149 }
150 }
151 Some(crate::rpc::clementine::entity_status_with_id::StatusResult::Err(error)) => {
152 tracing::error!("Entity {} error: {}", entity_id, error.error);
153 metrics.entity_status_error_count.increment(1);
155 }
156 None => {
157 tracing::warn!("Entity {} has no status", entity_id);
158 }
159 }
160 }
161
162 Ok(false)
164 }
165}