clementine_core/task/
aggregator_metric_publisher.rs

1use std::str::FromStr;
2use std::{collections::HashMap, time::Duration};
3
4use tonic::async_trait;
5
6use crate::{
7    aggregator::{Aggregator, EntityId, OperatorId, VerifierId},
8    errors::BridgeError,
9    metrics::EntityL1SyncStatusMetrics,
10    rpc::clementine::EntityType,
11    task::{Task, TaskVariant},
12};
13
/// Poll delay for the aggregator metric publisher: two minutes (120 seconds).
///
/// NOTE(review): the consumer of this constant is not in this file; presumably the task
/// scheduler waits this long between `run_once` calls — confirm at the call site.
pub const AGGREGATOR_METRIC_PUBLISHER_POLL_DELAY: Duration = Duration::from_secs(2 * 60);
15
/// Publishes metrics for the aggregator, including the Entity Statuses of all registered entities.
///
/// One [`EntityL1SyncStatusMetrics`] set is kept per entity, keyed by its [`EntityId`].
#[derive(Debug)]
pub struct AggregatorMetricPublisher {
    /// Aggregator instance queried for entity statuses on each run.
    aggregator: Aggregator,
    /// Metric handles per entity, created the first time an entity is seen.
    metrics: HashMap<EntityId, EntityL1SyncStatusMetrics>,
}
22
23impl AggregatorMetricPublisher {
24    pub async fn new(aggregator: Aggregator) -> Result<Self, BridgeError> {
25        Ok(Self {
26            aggregator: Aggregator::new(aggregator.config).await?,
27            metrics: HashMap::new(),
28        })
29    }
30
31    /// Convert protobuf EntityId to rust EntityId
32    fn convert_entity_id(
33        proto_entity_id: &crate::rpc::clementine::EntityId,
34    ) -> Result<EntityId, BridgeError> {
35        let entity_type = EntityType::try_from(proto_entity_id.kind)
36            .map_err(|_| BridgeError::ConfigError("Invalid entity type".into()))?;
37
38        match entity_type {
39            EntityType::Operator => {
40                let xonly_pk =
41                    bitcoin::XOnlyPublicKey::from_str(&proto_entity_id.id).map_err(|e| {
42                        BridgeError::ConfigError(format!("Invalid operator xonly public key: {e}"))
43                    })?;
44                Ok(EntityId::Operator(OperatorId(xonly_pk)))
45            }
46            EntityType::Verifier => {
47                let pk =
48                    bitcoin::secp256k1::PublicKey::from_str(&proto_entity_id.id).map_err(|e| {
49                        BridgeError::ConfigError(format!("Invalid verifier public key: {e}"))
50                    })?;
51                Ok(EntityId::Verifier(VerifierId(pk)))
52            }
53            EntityType::Aggregator => Ok(EntityId::Aggregator),
54            EntityType::EntityUnknown => {
55                Err(BridgeError::ConfigError("Unknown entity type".into()))
56            }
57        }
58    }
59
60    /// Create or get metrics for an entity
61    fn get_or_create_metrics(&mut self, entity_id: EntityId) -> &mut EntityL1SyncStatusMetrics {
62        self.metrics.entry(entity_id).or_insert_with(|| {
63            let scope = format!("{entity_id}_l1_sync_status");
64            EntityL1SyncStatusMetrics::describe(&scope);
65            EntityL1SyncStatusMetrics::new(&scope)
66        })
67    }
68}
69
70#[async_trait]
71impl Task for AggregatorMetricPublisher {
72    const VARIANT: TaskVariant = TaskVariant::MetricPublisher;
73    type Output = bool;
74
75    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
76        // Metrics are not published in tests
77        if cfg!(test) {
78            return Ok(false);
79        }
80        tracing::info!("Publishing metrics for aggregator");
81
82        let entity_statuses = self
83            .aggregator
84            .get_entity_statuses(false)
85            .await
86            .inspect_err(|e| {
87                tracing::error!("Error getting entities status: {:?}", e);
88            })?;
89
90        tracing::info!("Entities status: {:?}", entity_statuses);
91
92        // Process each entity status
93        for entity_status_with_id in entity_statuses {
94            let proto_entity_id = entity_status_with_id
95                .entity_id
96                .ok_or_else(|| BridgeError::ConfigError("Missing entity_id".into()))?;
97
98            let entity_id = match Self::convert_entity_id(&proto_entity_id) {
99                Ok(id) => id,
100                Err(e) => {
101                    tracing::error!("Failed to convert entity_id: {}", e);
102                    continue;
103                }
104            };
105
106            let metrics = self.get_or_create_metrics(entity_id);
107
108            match entity_status_with_id.status_result {
109                Some(crate::rpc::clementine::entity_status_with_id::StatusResult::Status(
110                    status,
111                )) => {
112                    // Parse wallet balance from string (format is "X.XXX BTC")
113                    if let Some(balance) = status
114                        .wallet_balance
115                        .and_then(|s| s.strip_suffix(" BTC").and_then(|s| s.parse::<f64>().ok()))
116                    {
117                        metrics.wallet_balance_btc.set(balance);
118                    }
119
120                    if let Some(height) = status.rpc_tip_height {
121                        metrics.rpc_tip_height.set(height as f64);
122                    }
123                    if let Some(height) = status.bitcoin_syncer_synced_height {
124                        metrics.btc_syncer_synced_height.set(height as f64);
125                    }
126                    if let Some(height) = status.hcp_last_proven_height {
127                        metrics.hcp_last_proven_height.set(height as f64);
128                    }
129                    if let Some(height) = status.tx_sender_synced_height {
130                        metrics.tx_sender_synced_height.set(height as f64);
131                    }
132                    if let Some(height) = status.finalized_synced_height {
133                        metrics.finalized_synced_height.set(height as f64);
134                    }
135                    if let Some(height) = status.state_manager_next_height {
136                        metrics.state_manager_next_height.set(height as f64);
137                    }
138                    if let Some(tasks) = status.stopped_tasks {
139                        metrics
140                            .stopped_tasks_count
141                            .set(tasks.stopped_tasks.len() as f64);
142                    }
143                    if let Some(fee_rate) = status.btc_fee_rate_sat_vb {
144                        metrics.bitcoin_fee_rate_sat_vb.set(fee_rate as f64);
145                    }
146                }
147                Some(crate::rpc::clementine::entity_status_with_id::StatusResult::Err(error)) => {
148                    tracing::error!("Entity {} error: {}", entity_id, error.error);
149                    // Increment error counter
150                    metrics.entity_status_error_count.increment(1);
151                }
152                None => {
153                    tracing::warn!("Entity {} has no status", entity_id);
154                }
155            }
156        }
157
158        // Always delay by returning false (ie. no work done)
159        Ok(false)
160    }
161}