clementine_core/task/
aggregator_metric_publisher.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
use std::str::FromStr;
use std::{collections::HashMap, time::Duration};

use tonic::async_trait;

use crate::{
    aggregator::{Aggregator, EntityId, OperatorId, VerifierId},
    errors::BridgeError,
    metrics::EntityL1SyncStatusMetrics,
    rpc::clementine::EntityType,
    task::{Task, TaskVariant},
};

/// Two-minute (120 s) delay associated with the aggregator metric publisher.
///
/// NOTE(review): by its name this is the pause between publisher polls; the code that
/// consumes the constant is not in this file — confirm against the task runner.
pub const AGGREGATOR_METRIC_PUBLISHER_POLL_DELAY: Duration = Duration::from_secs(2 * 60);

/// Publishes metrics for the aggregator, including the Entity Statuses of all registered entities.
///
/// Each run asks the held [`Aggregator`] for entity statuses and writes the reported values
/// into one [`EntityL1SyncStatusMetrics`] set per entity.
#[derive(Debug)]
pub struct AggregatorMetricPublisher {
    /// Aggregator instance that is queried for entity statuses on every run.
    aggregator: Aggregator,
    /// Metric handles keyed by entity; an entry is created the first time an entity is seen
    /// and reused on later runs.
    metrics: HashMap<EntityId, EntityL1SyncStatusMetrics>,
}

impl AggregatorMetricPublisher {
    /// Builds a publisher backed by its own [`Aggregator`].
    ///
    /// Only the `config` of the passed-in `aggregator` is used: a new `Aggregator` is
    /// constructed from it and stored alongside an initially empty metrics map.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `Aggregator::new`.
    pub async fn new(aggregator: Aggregator) -> Result<Self, BridgeError> {
        let own_aggregator = Aggregator::new(aggregator.config).await?;
        let metrics = HashMap::new();

        Ok(Self {
            aggregator: own_aggregator,
            metrics,
        })
    }

    /// Translates a protobuf `EntityId` into the crate's [`EntityId`].
    ///
    /// The `kind` field selects how the `id` string is parsed: operators are parsed as an
    /// x-only public key, verifiers as a secp256k1 public key.
    ///
    /// # Errors
    ///
    /// Returns `BridgeError::ConfigError` when `kind` is not a valid `EntityType`, when it is
    /// `EntityUnknown`, or when `id` cannot be parsed as the key type required by `kind`.
    fn convert_entity_id(
        proto_entity_id: &crate::rpc::clementine::EntityId,
    ) -> Result<EntityId, BridgeError> {
        // Reject discriminants that do not map to any known entity type.
        let Ok(kind) = EntityType::try_from(proto_entity_id.kind) else {
            return Err(BridgeError::ConfigError("Invalid entity type".into()));
        };

        let raw_id = &proto_entity_id.id;
        match kind {
            EntityType::EntityUnknown => {
                Err(BridgeError::ConfigError("Unknown entity type".into()))
            }
            EntityType::Verifier => bitcoin::secp256k1::PublicKey::from_str(raw_id)
                .map(|pk| EntityId::Verifier(VerifierId(pk)))
                .map_err(|e| {
                    BridgeError::ConfigError(format!("Invalid verifier public key: {}", e))
                }),
            EntityType::Operator => bitcoin::XOnlyPublicKey::from_str(raw_id)
                .map(|xonly_pk| EntityId::Operator(OperatorId(xonly_pk)))
                .map_err(|e| {
                    BridgeError::ConfigError(format!(
                        "Invalid operator xonly public key: {}",
                        e
                    ))
                }),
        }
    }

    /// Returns the metric set for `entity_id`, registering it first if it is not yet known.
    ///
    /// A new metric set is described and created under the scope `"<entity_id>_l1_sync_status"`,
    /// where `<entity_id>` is the `Display` rendering of the id.
    fn get_or_create_metrics(&mut self, entity_id: EntityId) -> &mut EntityL1SyncStatusMetrics {
        match self.metrics.entry(entity_id) {
            // Already registered: hand back the existing handles.
            std::collections::hash_map::Entry::Occupied(slot) => slot.into_mut(),
            // First sighting: describe the metrics, then create and store them.
            std::collections::hash_map::Entry::Vacant(slot) => {
                let scope = format!("{}_l1_sync_status", entity_id);
                EntityL1SyncStatusMetrics::describe(&scope);
                slot.insert(EntityL1SyncStatusMetrics::new(&scope))
            }
        }
    }
}

#[async_trait]
impl Task for AggregatorMetricPublisher {
    const VARIANT: TaskVariant = TaskVariant::MetricPublisher;
    type Output = bool;

    async fn run_once(&mut self) -> Result<Self::Output, BridgeError> {
        // Metrics are not published in tests
        if cfg!(test) {
            return Ok(false);
        }
        tracing::info!("Publishing metrics for aggregator");

        let entity_statuses = self
            .aggregator
            .get_entity_statuses(false)
            .await
            .inspect_err(|e| {
                tracing::error!("Error getting entities status: {:?}", e);
            })?;

        tracing::info!("Entities status: {:?}", entity_statuses);

        // Process each entity status
        for entity_status_with_id in entity_statuses {
            let proto_entity_id = entity_status_with_id
                .entity_id
                .ok_or_else(|| BridgeError::ConfigError("Missing entity_id".into()))?;

            let entity_id = match Self::convert_entity_id(&proto_entity_id) {
                Ok(id) => id,
                Err(e) => {
                    tracing::error!("Failed to convert entity_id: {}", e);
                    continue;
                }
            };

            let metrics = self.get_or_create_metrics(entity_id);

            match entity_status_with_id.status_result {
                Some(crate::rpc::clementine::entity_status_with_id::StatusResult::Status(
                    status,
                )) => {
                    // Parse wallet balance from string (format is "X.XXX BTC")
                    if let Some(balance) = status
                        .wallet_balance
                        .and_then(|s| s.strip_suffix(" BTC").and_then(|s| s.parse::<f64>().ok()))
                    {
                        metrics.wallet_balance_btc.set(balance);
                    }

                    if let Some(height) = status.rpc_tip_height {
                        metrics.rpc_tip_height.set(height as f64);
                    }
                    if let Some(height) = status.bitcoin_syncer_synced_height {
                        metrics.btc_syncer_synced_height.set(height as f64);
                    }
                    if let Some(height) = status.hcp_last_proven_height {
                        metrics.hcp_last_proven_height.set(height as f64);
                    }
                    if let Some(height) = status.tx_sender_synced_height {
                        metrics.tx_sender_synced_height.set(height as f64);
                    }
                    if let Some(height) = status.finalized_synced_height {
                        metrics.finalized_synced_height.set(height as f64);
                    }
                    if let Some(height) = status.state_manager_next_height {
                        metrics.state_manager_next_height.set(height as f64);
                    }
                    if let Some(tasks) = status.stopped_tasks {
                        metrics
                            .stopped_tasks_count
                            .set(tasks.stopped_tasks.len() as f64);
                    }
                }
                Some(crate::rpc::clementine::entity_status_with_id::StatusResult::Err(error)) => {
                    tracing::error!("Entity {} error: {}", entity_id, error.error);
                    // Increment error counter
                    metrics.entity_status_error_count.increment(1);
                }
                None => {
                    tracing::warn!("Entity {} has no status", entity_id);
                }
            }
        }

        // Always delay by returning false (ie. no work done)
        Ok(false)
    }
}