clementine_core/task/aggregator_metric_publisher.rs

1use std::str::FromStr;
2use std::{collections::HashMap, time::Duration};
3
4use tonic::async_trait;
5
6use crate::{
7    aggregator::{Aggregator, EntityId, OperatorId, VerifierId},
8    metrics::EntityL1SyncStatusMetrics,
9    rpc::clementine::EntityType,
10    task::{Task, TaskVariant},
11};
12use clementine_errors::BridgeError;
13
/// Poll delay for the aggregator metric publisher task: two minutes.
pub const AGGREGATOR_METRIC_PUBLISHER_POLL_DELAY: Duration = Duration::from_secs(2 * 60);
15
/// Publishes metrics for the aggregator, including the Entity Statuses of all registered entities.
///
/// Each run queries entity statuses through the owned [`Aggregator`] and records them in
/// per-entity [`EntityL1SyncStatusMetrics`].
#[derive(Debug)]
pub struct AggregatorMetricPublisher {
    /// Aggregator used to query the status of every registered entity.
    aggregator: Aggregator,
    /// Metric handles created on first sight of an entity, keyed by that entity's id.
    metrics: HashMap<EntityId, EntityL1SyncStatusMetrics>,
}
22
23impl AggregatorMetricPublisher {
24    pub async fn new(aggregator: Aggregator) -> Result<Self, BridgeError> {
25        Ok(Self {
26            aggregator: Aggregator::new(aggregator.config).await?,
27            metrics: HashMap::new(),
28        })
29    }
30
31    /// Convert protobuf EntityId to rust EntityId
32    fn convert_entity_id(
33        proto_entity_id: &crate::rpc::clementine::EntityId,
34    ) -> Result<EntityId, BridgeError> {
35        let entity_type = EntityType::try_from(proto_entity_id.kind)
36            .map_err(|_| BridgeError::ConfigError("Invalid entity type".into()))?;
37
38        match entity_type {
39            EntityType::Operator => {
40                let xonly_pk =
41                    bitcoin::XOnlyPublicKey::from_str(&proto_entity_id.id).map_err(|e| {
42                        BridgeError::ConfigError(format!("Invalid operator xonly public key: {e}"))
43                    })?;
44                Ok(EntityId::Operator(OperatorId(xonly_pk)))
45            }
46            EntityType::Verifier => {
47                let pk =
48                    bitcoin::secp256k1::PublicKey::from_str(&proto_entity_id.id).map_err(|e| {
49                        BridgeError::ConfigError(format!("Invalid verifier public key: {e}"))
50                    })?;
51                Ok(EntityId::Verifier(VerifierId(pk)))
52            }
53            EntityType::Aggregator => Ok(EntityId::Aggregator),
54            EntityType::EntityUnknown => {
55                Err(BridgeError::ConfigError("Unknown entity type".into()))
56            }
57        }
58    }
59
60    /// Create or get metrics for an entity
61    fn get_or_create_metrics(&mut self, entity_id: EntityId) -> &mut EntityL1SyncStatusMetrics {
62        self.metrics.entry(entity_id).or_insert_with(|| {
63            let scope = format!("{entity_id}_l1_sync_status");
64            EntityL1SyncStatusMetrics::describe(&scope);
65            EntityL1SyncStatusMetrics::new(&scope)
66        })
67    }
68}
69
70#[async_trait]
71impl Task for AggregatorMetricPublisher {
72    const VARIANT: TaskVariant = TaskVariant::MetricPublisher;
73    type Output = bool;
74
75    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
76        // Metrics are not published in tests
77        if cfg!(test) {
78            return Ok(false);
79        }
80        tracing::info!("Publishing metrics for aggregator");
81
82        let entity_statuses = self
83            .aggregator
84            .get_entity_statuses(false)
85            .await
86            .inspect_err(|e| {
87                tracing::error!("Error getting entities status: {:?}", e);
88            })?;
89
90        tracing::info!("Entities status: {:?}", entity_statuses);
91
92        // Process each entity status
93        for entity_status_with_id in entity_statuses {
94            let proto_entity_id =
95                entity_status_with_id
96                    .entity_id
97                    .unwrap_or(crate::rpc::clementine::EntityId {
98                        kind: EntityType::EntityUnknown as i32,
99                        id: "Unknown entity".to_string(),
100                    });
101
102            let entity_id = match Self::convert_entity_id(&proto_entity_id) {
103                Ok(id) => id,
104                Err(e) => {
105                    tracing::error!("Failed to convert entity_id ({proto_entity_id:?}) to internal EntityId. This can be normal if the entity is unreachable: {e}");
106                    continue;
107                }
108            };
109
110            let metrics = self.get_or_create_metrics(entity_id);
111
112            match entity_status_with_id.status_result {
113                Some(crate::rpc::clementine::entity_status_with_id::StatusResult::Status(
114                    status,
115                )) => {
116                    // Parse wallet balance from string (format is "X.XXX BTC")
117                    if let Some(balance) = status
118                        .wallet_balance
119                        .and_then(|s| s.strip_suffix(" BTC").and_then(|s| s.parse::<f64>().ok()))
120                    {
121                        metrics.wallet_balance_btc.set(balance);
122                    }
123
124                    if let Some(height) = status.rpc_tip_height {
125                        metrics.rpc_tip_height.set(height as f64);
126                    }
127                    if let Some(height) = status.bitcoin_syncer_synced_height {
128                        metrics.btc_syncer_synced_height.set(height as f64);
129                    }
130                    if let Some(height) = status.hcp_last_proven_height {
131                        metrics.hcp_last_proven_height.set(height as f64);
132                    }
133                    if let Some(height) = status.tx_sender_synced_height {
134                        metrics.tx_sender_synced_height.set(height as f64);
135                    }
136                    if let Some(height) = status.finalized_synced_height {
137                        metrics.finalized_synced_height.set(height as f64);
138                    }
139                    if let Some(height) = status.state_manager_next_height {
140                        metrics.state_manager_next_height.set(height as f64);
141                    }
142                    if let Some(tasks) = status.stopped_tasks {
143                        metrics
144                            .stopped_tasks_count
145                            .set(tasks.stopped_tasks.len() as f64);
146                    }
147                    if let Some(fee_rate) = status.btc_fee_rate_sat_vb {
148                        metrics.bitcoin_fee_rate_sat_vb.set(fee_rate as f64);
149                    }
150                }
151                Some(crate::rpc::clementine::entity_status_with_id::StatusResult::Err(error)) => {
152                    tracing::error!("Entity {} error: {}", entity_id, error.error);
153                    // Increment error counter
154                    metrics.entity_status_error_count.increment(1);
155                }
156                None => {
157                    tracing::warn!("Entity {} has no status", entity_id);
158                }
159            }
160        }
161
162        // Always delay by returning false (ie. no work done)
163        Ok(false)
164    }
165}