clementine_core/
metrics.rs

1//! This module includes helper functions to get the blockchain synchronization status of the entity.
2//! The entity tracks on-chain transactions for many purposes (TxSender,
3//! FinalizedBlockFetcher, HCP) and takes action (header chain proving, payout,
4//! disprove, L2 state sync, etc.)
5//! SyncStatus tracks the latest processed block heights for each of these tasks.
6//!
7use std::{sync::LazyLock, time::Duration};
8
9use bitcoin::Amount;
10use bitcoincore_rpc::RpcApi;
11use eyre::Context;
12use metrics::Gauge;
13use tokio::time::error::Elapsed;
14use tonic::async_trait;
15
16use crate::{
17    database::Database,
18    errors::BridgeError,
19    extended_bitcoin_rpc::ExtendedBitcoinRpc,
20    utils::{timed_request_base, NamedEntity},
21};
22use metrics_derive::Metrics;
23
24const L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT: Duration = Duration::from_secs(45);
25
26#[derive(Metrics)]
27#[metrics(scope = "l1_sync_status")]
28/// The L1 sync status metrics for the currently running entity. (operator/verifier)
29pub struct L1SyncStatusMetrics {
30    #[metric(describe = "The current balance of the wallet in Bitcoin (BTC)")]
31    pub wallet_balance_btc: Gauge,
32    #[metric(describe = "The block height of the chain as seen by Bitcoin Core RPC")]
33    pub rpc_tip_height: Gauge,
34    #[metric(describe = "The block height of the Bitcoin Syncer")]
35    pub btc_syncer_synced_height: Gauge,
36    #[metric(describe = "The block height of the latest header chain proof")]
37    pub hcp_last_proven_height: Gauge,
38    #[metric(describe = "The block height processed by the Transaction Sender")]
39    pub tx_sender_synced_height: Gauge,
40    #[metric(describe = "The finalized block height as seen by the FinalizedBlockFetcher task")]
41    pub finalized_synced_height: Gauge,
42    #[metric(describe = "The next block height to process for the State Manager")]
43    pub state_manager_next_height: Gauge,
44    #[metric(describe = "The current Bitcoin fee rate in sat/vB")]
45    pub bitcoin_fee_rate_sat_vb: Gauge,
46}
47
48#[derive(Metrics)]
49#[metrics(dynamic = true)]
50/// The L1 sync status metrics for an entity. This is used by the aggregator to
51/// publish external entity metrics.  The scope will be set to the EntityId +
52/// "_l1_sync_status", which will be displayed as
53/// `Operator(abcdef123...)_l1_sync_status` or
54/// `Verifier(abcdef123...)_l1_sync_status` where the XOnlyPublicKey's first 10
55/// characters are displayed, cf. [`crate::aggregator::OperatorId`] and
56/// [`crate::aggregator::VerifierId`].
57pub struct EntityL1SyncStatusMetrics {
58    #[metric(describe = "The current balance of the wallet of the entity in Bitcoin (BTC)")]
59    pub wallet_balance_btc: Gauge,
60    #[metric(
61        describe = "The block height of the chain as seen by Bitcoin Core RPC for the entity"
62    )]
63    pub rpc_tip_height: Gauge,
64    #[metric(describe = "The block height of the Bitcoin Syncer for the entity")]
65    pub btc_syncer_synced_height: Gauge,
66    #[metric(describe = "The block height of the latest header chain proof for the entity")]
67    pub hcp_last_proven_height: Gauge,
68    #[metric(describe = "The block height processed by the Transaction Sender for the entity")]
69    pub tx_sender_synced_height: Gauge,
70    #[metric(
71        describe = "The finalized block height as seen by the FinalizedBlockFetcher task for the entity"
72    )]
73    pub finalized_synced_height: Gauge,
74    #[metric(describe = "The next block height to process for the State Manager for the entity")]
75    pub state_manager_next_height: Gauge,
76
77    #[metric(describe = "The current Bitcoin fee rate in sat/vB for the entity")]
78    pub bitcoin_fee_rate_sat_vb: Gauge,
79
80    #[metric(describe = "The number of error responses from the entity status endpoint")]
81    pub entity_status_error_count: metrics::Counter,
82
83    #[metric(describe = "The number of stopped tasks for the entity")]
84    pub stopped_tasks_count: Gauge,
85}
86
87/// The L1 sync status metrics static for the currently running entity. (operator/verifier)
88pub static L1_SYNC_STATUS: LazyLock<L1SyncStatusMetrics> = LazyLock::new(|| {
89    L1SyncStatusMetrics::describe();
90    L1SyncStatusMetrics::default()
91});
92
93/// A struct containing the current sync status of the entity.
94#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct L1SyncStatus {
96    pub wallet_balance: Option<Amount>,
97    pub rpc_tip_height: Option<u32>,
98    pub btc_syncer_synced_height: Option<u32>,
99    pub hcp_last_proven_height: Option<u32>,
100    pub tx_sender_synced_height: Option<u32>,
101    pub finalized_synced_height: Option<u32>,
102    pub state_manager_next_height: Option<u32>,
103    pub bitcoin_fee_rate_sat_vb: Option<u64>,
104}
105
106/// Get the current balance of the wallet.
107pub async fn get_wallet_balance(rpc: &ExtendedBitcoinRpc) -> Result<Amount, BridgeError> {
108    let balance = rpc
109        .get_balance(None, None)
110        .await
111        .wrap_err("Failed to get wallet balance")?;
112
113    Ok(balance)
114}
115
116/// Get the current height of the chain as seen by Bitcoin Core RPC.
117pub async fn get_rpc_tip_height(rpc: &ExtendedBitcoinRpc) -> Result<u32, BridgeError> {
118    let height = rpc.get_current_chain_height().await?;
119    Ok(height)
120}
121
122/// Get the last processed block height of the given consumer or None if no
123/// block was processed by the consumer.
124pub async fn get_btc_syncer_consumer_last_processed_block_height(
125    db: &Database,
126    consumer_handle: &str,
127) -> Result<Option<u32>, BridgeError> {
128    db.get_last_processed_event_block_height(None, consumer_handle)
129        .await
130}
131
132/// Get the last processed block height of the Bitcoin Syncer or None if no
133/// block is present in the database.
134pub async fn get_btc_syncer_synced_height(db: &Database) -> Result<Option<u32>, BridgeError> {
135    let height = db.get_max_height(None).await?;
136    Ok(height)
137}
138
139/// Get the last proven block height of the HCP or None if no block has been proven.
140pub async fn get_hcp_last_proven_height(db: &Database) -> Result<Option<u32>, BridgeError> {
141    let latest_proven_block_height = db
142        .get_latest_proven_block_info(None)
143        .await?
144        .map(|(_, _, height)| height as u32);
145    Ok(latest_proven_block_height)
146}
147
148/// Get the next height of the State Manager or None if the State Manager status
149/// for the owner is missing or the next_height_to_process is NULL.
150pub async fn get_state_manager_next_height(
151    db: &Database,
152    owner_type: &str,
153) -> Result<Option<u32>, BridgeError> {
154    #[cfg(feature = "automation")]
155    {
156        let next_height = db
157            .get_next_height_to_process(None, owner_type)
158            .await?
159            .map(|x| x as u32);
160        Ok(next_height)
161    }
162    #[cfg(not(feature = "automation"))]
163    {
164        Ok(None)
165    }
166}
167
168/// Get the current Bitcoin fee rate in sat/vB.
169pub async fn get_bitcoin_fee_rate(
170    rpc: &ExtendedBitcoinRpc,
171    config: &crate::config::BridgeConfig,
172) -> Result<u64, BridgeError> {
173    let fee_rate = rpc
174        .get_fee_rate(
175            config.protocol_paramset.network,
176            &config.mempool_api_host,
177            &config.mempool_api_endpoint,
178            config.tx_sender_limits.mempool_fee_rate_multiplier,
179            config.tx_sender_limits.mempool_fee_rate_offset_sat_kvb,
180            config.tx_sender_limits.fee_rate_hard_cap,
181        )
182        .await
183        .wrap_err("Failed to get fee rate")?;
184
185    // Convert from FeeRate to sat/vB
186    Ok(fee_rate.to_sat_per_vb_ceil())
187}
188
189#[async_trait]
190/// Extension trait on named entities who synchronize to the L1 data, to retrieve their L1 sync status.
191pub trait L1SyncStatusProvider: NamedEntity {
192    async fn get_l1_status(
193        db: &Database,
194        rpc: &ExtendedBitcoinRpc,
195        config: &crate::config::BridgeConfig,
196    ) -> Result<L1SyncStatus, BridgeError>;
197}
198
199#[inline(always)]
200fn log_errs_and_ok<A, T: NamedEntity>(
201    result: Result<Result<A, BridgeError>, Elapsed>,
202    action: &str,
203) -> Option<A> {
204    result
205        .inspect_err(|_| {
206            tracing::error!(
207                "[L1SyncStatus({})] Timed out while {action}",
208                T::ENTITY_NAME
209            )
210        })
211        .ok()
212        .transpose()
213        .inspect_err(|e| {
214            tracing::error!("[L1SyncStatus({})] Error {action}: {:?}", T::ENTITY_NAME, e)
215        })
216        .ok()
217        .flatten()
218}
219
220#[async_trait]
221impl<T: NamedEntity> L1SyncStatusProvider for T {
222    async fn get_l1_status(
223        db: &Database,
224        rpc: &ExtendedBitcoinRpc,
225        config: &crate::config::BridgeConfig,
226    ) -> Result<L1SyncStatus, BridgeError> {
227        let wallet_balance = log_errs_and_ok::<_, T>(
228            timed_request_base(
229                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
230                "get_wallet_balance",
231                get_wallet_balance(rpc),
232            )
233            .await,
234            "getting wallet balance",
235        );
236
237        let rpc_tip_height = log_errs_and_ok::<_, T>(
238            timed_request_base(
239                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
240                "get_rpc_tip_height",
241                get_rpc_tip_height(rpc),
242            )
243            .await,
244            "getting rpc tip height",
245        );
246
247        let tx_sender_synced_height = log_errs_and_ok::<_, T>(
248            timed_request_base(
249                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
250                "get_tx_sender_synced_height",
251                get_btc_syncer_consumer_last_processed_block_height(db, T::TX_SENDER_CONSUMER_ID),
252            )
253            .await,
254            "getting tx sender synced height",
255        )
256        .flatten();
257
258        #[cfg(feature = "automation")]
259        let finalized_synced_height = log_errs_and_ok::<_, T>(
260            timed_request_base(
261                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
262                "get_finalized_synced_height",
263                get_btc_syncer_consumer_last_processed_block_height(
264                    db,
265                    T::FINALIZED_BLOCK_CONSUMER_ID_AUTOMATION,
266                ),
267            )
268            .await,
269            "getting finalized synced height",
270        )
271        .flatten();
272
273        #[cfg(not(feature = "automation"))]
274        let finalized_synced_height = log_errs_and_ok::<_, T>(
275            timed_request_base(
276                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
277                "get_finalized_synced_height",
278                get_btc_syncer_consumer_last_processed_block_height(
279                    db,
280                    T::FINALIZED_BLOCK_CONSUMER_ID_NO_AUTOMATION,
281                ),
282            )
283            .await,
284            "getting finalized synced height",
285        )
286        .flatten();
287
288        let btc_syncer_synced_height = log_errs_and_ok::<_, T>(
289            timed_request_base(
290                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
291                "get_btc_syncer_synced_height",
292                get_btc_syncer_synced_height(db),
293            )
294            .await,
295            "getting btc syncer synced height",
296        )
297        .flatten();
298
299        let hcp_last_proven_height = log_errs_and_ok::<_, T>(
300            timed_request_base(
301                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
302                "get_hcp_last_proven_height",
303                get_hcp_last_proven_height(db),
304            )
305            .await,
306            "getting hcp last proven height",
307        )
308        .flatten();
309        let state_manager_next_height = log_errs_and_ok::<_, T>(
310            timed_request_base(
311                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
312                "get_state_manager_next_height",
313                get_state_manager_next_height(db, T::ENTITY_NAME),
314            )
315            .await,
316            "getting state manager next height",
317        )
318        .flatten();
319
320        let bitcoin_fee_rate_sat_vb = log_errs_and_ok::<_, T>(
321            timed_request_base(
322                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
323                "get_bitcoin_fee_rate",
324                get_bitcoin_fee_rate(rpc, config),
325            )
326            .await,
327            "getting bitcoin fee rate",
328        );
329
330        Ok(L1SyncStatus {
331            wallet_balance,
332            rpc_tip_height,
333            btc_syncer_synced_height,
334            hcp_last_proven_height,
335            tx_sender_synced_height,
336            finalized_synced_height,
337            state_manager_next_height,
338            bitcoin_fee_rate_sat_vb,
339        })
340    }
341}
342
343#[cfg(test)]
344mod tests {
345    use bitcoincore_rpc::RpcApi;
346
347    #[cfg(not(feature = "automation"))]
348    use crate::rpc::clementine::EntityType;
349    use crate::{
350        rpc::clementine::{Empty, GetEntityStatusesRequest},
351        test::common::{
352            citrea::MockCitreaClient, create_actors, create_regtest_rpc,
353            create_test_config_with_thread_name,
354        },
355    };
356    use std::time::Duration;
357
358    #[tokio::test]
359    async fn test_get_sync_status_should_not_fail() {
360        let mut config = create_test_config_with_thread_name().await;
361        let regtest = create_regtest_rpc(&mut config).await;
362        config.bitcoin_rpc_url += "/wallet/test-wallet";
363        // unload to avoid conflicts
364        regtest.rpc().unload_wallet("admin".into()).await.unwrap();
365
366        // create a test wallet
367        regtest
368            .rpc()
369            .create_wallet("test-wallet", None, None, None, None)
370            .await
371            .unwrap();
372
373        let addr = regtest.rpc().get_new_address(None, None).await.unwrap();
374        regtest
375            .rpc()
376            .generate_to_address(201, addr.assume_checked_ref())
377            .await
378            .unwrap();
379
380        let actors = create_actors::<MockCitreaClient>(&config).await;
381
382        // lose the wallet that was previously loaded for some reason
383        regtest
384            .rpc()
385            .unload_wallet(Some("test-wallet"))
386            .await
387            .unwrap();
388
389        // try to get status which includes balance
390        let res = actors
391            .get_verifier_client_by_index(0)
392            .get_current_status(Empty {})
393            .await;
394
395        // expect result to be Ok(_)
396        assert!(res.is_ok(), "Expected Ok(_) but got {res:?}");
397
398        // expect the balance to be None because the wallet was unloaded
399        assert_eq!(res.unwrap().into_inner().wallet_balance, None);
400    }
401
402    #[tokio::test]
403    async fn test_get_sync_status() {
404        let mut config = create_test_config_with_thread_name().await;
405        let _regtest = create_regtest_rpc(&mut config).await;
406        let actors = create_actors::<MockCitreaClient>(&config).await;
407        let mut aggregator = actors.get_aggregator();
408        // wait for entities to sync a bit, this might cause flakiness, if so increase sleep time or make it serial
409        tokio::time::sleep(Duration::from_secs(40)).await;
410        let entity_statuses = aggregator
411            .get_entity_statuses(tonic::Request::new(GetEntityStatusesRequest {
412                restart_tasks: false,
413            }))
414            .await
415            .unwrap()
416            .into_inner();
417
418        for entity in entity_statuses.entity_statuses {
419            let status = entity.status_result.unwrap();
420            match status {
421                crate::rpc::clementine::entity_status_with_id::StatusResult::Status(status) => {
422                    tracing::info!("Status: {:#?}", status);
423                    #[cfg(feature = "automation")]
424                    {
425                        assert!(status.automation);
426                        assert!(
427                            status
428                                .tx_sender_synced_height
429                                .expect("tx_sender_synced_height is None")
430                                > 0
431                        );
432                        assert!(
433                            status
434                                .finalized_synced_height
435                                .expect("finalized_synced_height is None")
436                                > 0
437                        );
438                        assert!(
439                            status
440                                .hcp_last_proven_height
441                                .expect("hcp_last_proven_height is None")
442                                > 0
443                        );
444                        assert!(status.rpc_tip_height.expect("rpc_tip_height is None") > 0);
445                        assert!(
446                            status
447                                .bitcoin_syncer_synced_height
448                                .expect("bitcoin_syncer_synced_height is None")
449                                > 0
450                        );
451                        assert!(
452                            status
453                                .state_manager_next_height
454                                .expect("state_manager_next_height is None")
455                                > 0
456                        );
457                        assert!(status.wallet_balance.is_some());
458                        assert!(
459                            status
460                                .btc_fee_rate_sat_vb
461                                .expect("btc_fee_rate_sat_vb is None")
462                                > 0
463                        );
464                    }
465                    #[cfg(not(feature = "automation"))]
466                    {
467                        let entity_type: EntityType =
468                            entity.entity_id.unwrap().kind.try_into().unwrap();
469                        // tx sender and hcp are not running in non-automation mode
470                        assert!(!status.automation);
471                        assert!(status.tx_sender_synced_height.is_none());
472                        if entity_type == EntityType::Verifier {
473                            assert!(
474                                status
475                                    .finalized_synced_height
476                                    .expect("finalized_synced_height is None")
477                                    > 0
478                            );
479                        } else {
480                            // operator doesn't run finalized block fetcher in non-automation mode
481                            assert!(status.finalized_synced_height.is_none());
482                        }
483                        assert!(status.hcp_last_proven_height.is_none());
484                        assert!(status.rpc_tip_height.expect("rpc_tip_height is None") > 0);
485                        assert!(
486                            status
487                                .bitcoin_syncer_synced_height
488                                .expect("bitcoin_syncer_synced_height is None")
489                                > 0
490                        );
491                        assert!(status.state_manager_next_height.is_none());
492                        assert!(status.wallet_balance.is_some());
493                        assert!(
494                            status
495                                .btc_fee_rate_sat_vb
496                                .expect("bitcoin_fee_rate_sat_vb is None")
497                                > 0
498                        );
499                    }
500                }
501                crate::rpc::clementine::entity_status_with_id::StatusResult::Err(error) => {
502                    let error_msg = &error.error;
503                    panic!("Couldn't get entity status: {error_msg}");
504                }
505            }
506        }
507    }
508}