// clementine_core/metrics.rs
//! This module includes helper functions to get the blockchain synchronization status of the entity.
//! The entity tracks on-chain transactions for many purposes (TxSender,
//! FinalizedBlockFetcher, HCP) and takes action (header chain proving, payout,
//! disprove, L2 state sync, etc.).
//! [`L1SyncStatus`] tracks the latest processed block heights for each of these tasks.
use std::{sync::LazyLock, time::Duration};

use bitcoin::Amount;
use bitcoincore_rpc::RpcApi;
use eyre::Context;
use metrics::Gauge;
use tonic::async_trait;

use crate::{
    database::Database,
    errors::BridgeError,
    extended_bitcoin_rpc::ExtendedBitcoinRpc,
    utils::{timed_request_base, NamedEntity},
};
use metrics_derive::Metrics;

/// Duration (45 seconds) handed to `timed_request_base` for every individual
/// sub-request issued while collecting the L1 sync status in `get_l1_status`.
const L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT: Duration = Duration::from_secs(45);

#[derive(Metrics)]
#[metrics(scope = "l1_sync_status")]
/// The L1 sync status metrics for the currently running entity (operator/verifier).
///
/// The metrics are declared with the fixed scope `l1_sync_status`. Every field
/// is a [`Gauge`]; the human-readable description of each one is given by its
/// `#[metric(describe = ...)]` attribute. The field set mirrors the values
/// carried by [`L1SyncStatus`] (wallet balance plus one height per tracked task).
pub struct L1SyncStatusMetrics {
    #[metric(describe = "The current balance of the wallet in Bitcoin (BTC)")]
    pub wallet_balance_btc: Gauge,
    #[metric(describe = "The block height of the chain as seen by Bitcoin Core RPC")]
    pub rpc_tip_height: Gauge,
    #[metric(describe = "The block height of the Bitcoin Syncer")]
    pub btc_syncer_synced_height: Gauge,
    #[metric(describe = "The block height of the latest header chain proof")]
    pub hcp_last_proven_height: Gauge,
    #[metric(describe = "The block height processed by the Transaction Sender")]
    pub tx_sender_synced_height: Gauge,
    #[metric(describe = "The finalized block height as seen by the FinalizedBlockFetcher task")]
    pub finalized_synced_height: Gauge,
    #[metric(describe = "The next block height to process for the State Manager")]
    pub state_manager_next_height: Gauge,
}

#[derive(Metrics)]
#[metrics(dynamic = true)]
/// The L1 sync status metrics for an entity. This is used by the aggregator to
/// publish external entity metrics. The scope will be set to the EntityId +
/// "_l1_sync_status", which will be displayed as
/// `Operator(abcdef123...)_l1_sync_status` or
/// `Verifier(abcdef123...)_l1_sync_status` where the XOnlyPublicKey's first 10
/// characters are displayed, cf. [`crate::aggregator::OperatorId`] and
/// [`crate::aggregator::VerifierId`].
///
/// Compared to [`L1SyncStatusMetrics`], this struct is declared with
/// `dynamic = true` instead of a fixed scope and carries two extra metrics: a
/// counter of error responses from the entity status endpoint and a gauge with
/// the number of stopped tasks.
pub struct EntityL1SyncStatusMetrics {
    #[metric(describe = "The current balance of the wallet of the entity in Bitcoin (BTC)")]
    pub wallet_balance_btc: Gauge,
    #[metric(
        describe = "The block height of the chain as seen by Bitcoin Core RPC for the entity"
    )]
    pub rpc_tip_height: Gauge,
    #[metric(describe = "The block height of the Bitcoin Syncer for the entity")]
    pub btc_syncer_synced_height: Gauge,
    #[metric(describe = "The block height of the latest header chain proof for the entity")]
    pub hcp_last_proven_height: Gauge,
    #[metric(describe = "The block height processed by the Transaction Sender for the entity")]
    pub tx_sender_synced_height: Gauge,
    #[metric(
        describe = "The finalized block height as seen by the FinalizedBlockFetcher task for the entity"
    )]
    pub finalized_synced_height: Gauge,
    #[metric(describe = "The next block height to process for the State Manager for the entity")]
    pub state_manager_next_height: Gauge,

    // The two metrics below have no counterpart in `L1SyncStatusMetrics`.
    #[metric(describe = "The number of error responses from the entity status endpoint")]
    pub entity_status_error_count: metrics::Counter,

    #[metric(describe = "The number of stopped tasks for the entity")]
    pub stopped_tasks_count: Gauge,
}

/// Lazily initialized, process-wide [`L1SyncStatusMetrics`] instance for the
/// currently running entity (operator/verifier).
///
/// The value is built on first access by [`init_l1_sync_status_metrics`].
pub static L1_SYNC_STATUS: LazyLock<L1SyncStatusMetrics> =
    LazyLock::new(init_l1_sync_status_metrics);

/// Builds the value stored in [`L1_SYNC_STATUS`].
///
/// Calls `L1SyncStatusMetrics::describe()` first (presumably registering the
/// metric descriptions — confirm against the `Metrics` derive) and then
/// constructs the metrics struct through `default()`.
fn init_l1_sync_status_metrics() -> L1SyncStatusMetrics {
    L1SyncStatusMetrics::describe();
    L1SyncStatusMetrics::default()
}

/// A struct containing the current sync status of the entity.
///
/// Instances are produced by [`L1SyncStatusProvider::get_l1_status`]. Every
/// field is optional: `None` means the value was not available, e.g. the
/// database holds no matching entry yet or the corresponding sub-request
/// failed at the `timed_request_base` level.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct L1SyncStatus {
    /// Wallet balance as returned by [`get_wallet_balance`].
    pub wallet_balance: Option<Amount>,
    /// Chain height as returned by [`get_rpc_tip_height`].
    pub rpc_tip_height: Option<u32>,
    /// Height as returned by [`get_btc_syncer_synced_height`].
    pub btc_syncer_synced_height: Option<u32>,
    /// Height as returned by [`get_hcp_last_proven_height`].
    pub hcp_last_proven_height: Option<u32>,
    /// Last block height processed by the entity's transaction-sender consumer.
    pub tx_sender_synced_height: Option<u32>,
    /// Last block height processed by the entity's finalized-block consumer.
    pub finalized_synced_height: Option<u32>,
    /// Height as returned by [`get_state_manager_next_height`].
    pub state_manager_next_height: Option<u32>,
}

/// Get the current balance of the wallet.
pub async fn get_wallet_balance(rpc: &ExtendedBitcoinRpc) -> Result<Amount, BridgeError> {
    let balance = rpc
        .get_balance(None, None)
        .await
        .wrap_err("Failed to get wallet balance")?;

    Ok(balance)
}

/// Fetch the current chain height as reported by the Bitcoin Core RPC client.
///
/// # Errors
///
/// Propagates any error returned by `get_current_chain_height`, converted into
/// a [`BridgeError`].
pub async fn get_rpc_tip_height(rpc: &ExtendedBitcoinRpc) -> Result<u32, BridgeError> {
    let tip_height: u32 = rpc.get_current_chain_height().await?;

    Ok(tip_height)
}

/// Look up the height of the last block processed by the consumer identified by
/// `consumer_handle`.
///
/// Returns `Ok(None)` when the consumer has not processed any block yet.
///
/// # Errors
///
/// Propagates the error of the underlying database query.
pub async fn get_btc_syncer_consumer_last_processed_block_height(
    db: &Database,
    consumer_handle: &str,
) -> Result<Option<u32>, BridgeError> {
    let last_processed: Option<u32> = db
        .get_last_processed_event_block_height(None, consumer_handle)
        .await?;

    Ok(last_processed)
}

/// Fetch the maximum block height stored by the Bitcoin Syncer.
///
/// Returns `Ok(None)` when the database does not contain any block.
///
/// # Errors
///
/// Propagates the error of the underlying `get_max_height` query.
pub async fn get_btc_syncer_synced_height(db: &Database) -> Result<Option<u32>, BridgeError> {
    let max_height: Option<u32> = db.get_max_height(None).await?;

    Ok(max_height)
}

/// Get the last proven block height of the HCP or None if no block has been proven.
pub async fn get_hcp_last_proven_height(db: &Database) -> Result<Option<u32>, BridgeError> {
    let latest_proven_block_height = db
        .get_latest_proven_block_info(None)
        .await?
        .map(|(_, _, height)| height as u32);
    Ok(latest_proven_block_height)
}

/// Get the next height of the State Manager or None if the State Manager status
/// for the owner is missing or the next_height_to_process is NULL.
///
/// Without the `automation` feature this always returns `Ok(None)` and does not
/// touch the database.
///
/// # Errors
///
/// With the `automation` feature enabled, returns a [`BridgeError`] if the
/// database query fails, or if the stored height cannot be represented as a
/// `u32` (for example a negative value).
pub async fn get_state_manager_next_height(
    db: &Database,
    owner_type: &str,
) -> Result<Option<u32>, BridgeError> {
    #[cfg(feature = "automation")]
    {
        let next_height = db
            .get_next_height_to_process(None, owner_type)
            .await?
            .map(|height| {
                // A plain `as u32` cast would silently wrap or truncate an
                // out-of-range value; report it as an error instead.
                u32::try_from(height).wrap_err("State manager next height does not fit into u32")
            })
            .transpose()?;
        Ok(next_height)
    }
    #[cfg(not(feature = "automation"))]
    {
        // The parameters are only needed with `automation`; mark them as used so
        // builds without the feature do not report them as unused.
        let _ = (db, owner_type);
        Ok(None)
    }
}

#[async_trait]
/// Extension trait on named entities who synchronize to the L1 data, to retrieve their L1 sync status.
///
/// A blanket implementation in this module covers every type that is
/// `NamedEntity + Sync + Send + 'static`.
pub trait L1SyncStatusProvider: NamedEntity {
    /// Collects the current [`L1SyncStatus`] of the entity type.
    ///
    /// This is an associated function (it takes no `self`); the entity-specific
    /// parts come from the `NamedEntity` implementation of the type.
    ///
    /// * `db` - database handle used for the height lookups.
    /// * `rpc` - Bitcoin RPC client used for the wallet balance and chain tip.
    async fn get_l1_status(
        db: &Database,
        rpc: &ExtendedBitcoinRpc,
    ) -> Result<L1SyncStatus, BridgeError>;
}

#[async_trait]
impl<T: NamedEntity + Sync + Send + 'static> L1SyncStatusProvider for T {
    /// Gathers every component of the [`L1SyncStatus`] for entity type `T`.
    ///
    /// The sub-requests run one after another, each wrapped in
    /// `timed_request_base` with `L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT`.
    /// If the wrapper itself reports a failure (presumably the timeout elapsing
    /// — confirm against `timed_request_base`), the field is left as `None`. An
    /// error returned by the wrapped request is propagated to the caller.
    async fn get_l1_status(
        db: &Database,
        rpc: &ExtendedBitcoinRpc,
    ) -> Result<L1SyncStatus, BridgeError> {
        let budget = L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT;

        // A different consumer id constant is used for the finalized block
        // lookup depending on whether the `automation` feature is enabled.
        #[cfg(feature = "automation")]
        let finalized_consumer_id = T::FINALIZED_BLOCK_CONSUMER_ID_AUTOMATION;
        #[cfg(not(feature = "automation"))]
        let finalized_consumer_id = T::FINALIZED_BLOCK_CONSUMER_ID_NO_AUTOMATION;

        // RPC-backed values: the wrapped request yields a plain value.
        let wallet_balance =
            timed_request_base(budget, "get_wallet_balance", get_wallet_balance(rpc))
                .await
                .ok()
                .transpose()?;

        let rpc_tip_height =
            timed_request_base(budget, "get_rpc_tip_height", get_rpc_tip_height(rpc))
                .await
                .ok()
                .transpose()?;

        // Database-backed values: the wrapped request already yields an
        // `Option`, so the nested option is flattened.
        let tx_sender_synced_height = timed_request_base(
            budget,
            "get_tx_sender_synced_height",
            get_btc_syncer_consumer_last_processed_block_height(db, T::TX_SENDER_CONSUMER_ID),
        )
        .await
        .ok()
        .transpose()?
        .flatten();

        let finalized_synced_height = timed_request_base(
            budget,
            "get_finalized_synced_height",
            get_btc_syncer_consumer_last_processed_block_height(db, finalized_consumer_id),
        )
        .await
        .ok()
        .transpose()?
        .flatten();

        let btc_syncer_synced_height = timed_request_base(
            budget,
            "get_btc_syncer_synced_height",
            get_btc_syncer_synced_height(db),
        )
        .await
        .ok()
        .transpose()?
        .flatten();

        let hcp_last_proven_height = timed_request_base(
            budget,
            "get_hcp_last_proven_height",
            get_hcp_last_proven_height(db),
        )
        .await
        .ok()
        .transpose()?
        .flatten();

        let state_manager_next_height = timed_request_base(
            budget,
            "get_state_manager_next_height",
            get_state_manager_next_height(db, T::ENTITY_NAME),
        )
        .await
        .ok()
        .transpose()?
        .flatten();

        let status = L1SyncStatus {
            wallet_balance,
            rpc_tip_height,
            btc_syncer_synced_height,
            hcp_last_proven_height,
            tx_sender_synced_height,
            finalized_synced_height,
            state_manager_next_height,
        };

        Ok(status)
    }
}

// Integration test for the entity status reporting built on top of this module.
#[cfg(test)]
mod tests {
    // `EntityType` is only needed by the non-automation assertions below.
    #[cfg(not(feature = "automation"))]
    use crate::rpc::clementine::EntityType;
    use crate::{
        rpc::clementine::GetEntityStatusesRequest,
        test::common::{
            citrea::MockCitreaClient, create_actors, create_regtest_rpc,
            create_test_config_with_thread_name,
        },
    };
    use std::time::Duration;

    // Creates a regtest RPC and a set of actors, waits for them to sync, then
    // asks the aggregator for the status of every entity and checks which
    // fields are populated. The expectations differ depending on whether the
    // `automation` feature is enabled.
    #[tokio::test]
    async fn test_get_sync_status() {
        let mut config = create_test_config_with_thread_name().await;
        // Bound to a named variable so the value lives until the end of the test.
        let _regtest = create_regtest_rpc(&mut config).await;
        let actors = create_actors::<MockCitreaClient>(&config).await;
        let mut aggregator = actors.get_aggregator();
        // wait for entities to sync a bit, this might cause flakiness, if so increase sleep time or make it serial
        tokio::time::sleep(Duration::from_secs(40)).await;
        let entity_statuses = aggregator
            .get_entity_statuses(tonic::Request::new(GetEntityStatusesRequest {
                restart_tasks: false,
            }))
            .await
            .unwrap()
            .into_inner();

        for entity in entity_statuses.entity_statuses {
            let status = entity.status_result.unwrap();
            match status {
                crate::rpc::clementine::entity_status_with_id::StatusResult::Status(status) => {
                    tracing::info!("Status: {:#?}", status);
                    // With automation every tracked height must be present and positive.
                    #[cfg(feature = "automation")]
                    {
                        assert!(status.automation);
                        assert!(
                            status
                                .tx_sender_synced_height
                                .expect("tx_sender_synced_height is None")
                                > 0
                        );
                        assert!(
                            status
                                .finalized_synced_height
                                .expect("finalized_synced_height is None")
                                > 0
                        );
                        assert!(
                            status
                                .hcp_last_proven_height
                                .expect("hcp_last_proven_height is None")
                                > 0
                        );
                        assert!(status.rpc_tip_height.expect("rpc_tip_height is None") > 0);
                        assert!(
                            status
                                .bitcoin_syncer_synced_height
                                .expect("bitcoin_syncer_synced_height is None")
                                > 0
                        );
                        assert!(
                            status
                                .state_manager_next_height
                                .expect("state_manager_next_height is None")
                                > 0
                        );
                    }
                    // Without automation only the RPC tip, the Bitcoin Syncer height and
                    // (for verifiers) the finalized height are expected to be present.
                    #[cfg(not(feature = "automation"))]
                    {
                        let entity_type: EntityType =
                            entity.entity_id.unwrap().kind.try_into().unwrap();
                        // tx sender and hcp are not running in non-automation mode
                        assert!(!status.automation);
                        assert!(status.tx_sender_synced_height.is_none());
                        if entity_type == EntityType::Verifier {
                            assert!(
                                status
                                    .finalized_synced_height
                                    .expect("finalized_synced_height is None")
                                    > 0
                            );
                        } else {
                            // operator doesn't run finalized block fetcher in non-automation mode
                            assert!(status.finalized_synced_height.is_none());
                        }
                        assert!(status.hcp_last_proven_height.is_none());
                        assert!(status.rpc_tip_height.expect("rpc_tip_height is None") > 0);
                        assert!(
                            status
                                .bitcoin_syncer_synced_height
                                .expect("bitcoin_syncer_synced_height is None")
                                > 0
                        );
                        assert!(status.state_manager_next_height.is_none());
                    }
                }
                // Any entity that reports an error instead of a status fails the test.
                crate::rpc::clementine::entity_status_with_id::StatusResult::Err(error) => {
                    panic!("Couldn't get entity status: {}", error.error);
                }
            }
        }
    }
}