clementine_core/
metrics.rs

//! This module includes helper functions to get the blockchain synchronization status of the entity.
//! The entity tracks on-chain transactions for many purposes (TxSender,
//! FinalizedBlockFetcher, HCP) and takes action (header chain proving, payout,
//! disprove, L2 state sync, etc.).
//! [`SyncStatus`] tracks the latest processed block heights for each of these tasks.
6//!
7use std::{sync::LazyLock, time::Duration};
8
9use bitcoin::Amount;
10use bitcoincore_rpc::RpcApi;
11use eyre::Context;
12use metrics::Gauge;
13use tokio::time::error::Elapsed;
14use tonic::async_trait;
15
16use crate::{
17    database::Database,
18    extended_bitcoin_rpc::ExtendedBitcoinRpc,
19    utils::{timed_request_base, NamedEntity},
20};
21use clementine_errors::BridgeError;
22use metrics_derive::Metrics;
23
/// Upper bound on how long a single sub-request issued while collecting the
/// sync status may run before it is reported as timed out.
const L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT: Duration = Duration::from_secs(45);
25
26// Metric scope is named l1_sync_status for backward compatibility even if it includes l2 metrics.
27#[derive(Metrics)]
28#[metrics(scope = "l1_sync_status")]
29/// The sync status metrics for the currently running entity. (operator/verifier)
30pub struct SyncStatusMetrics {
31    #[metric(describe = "The current balance of the wallet in Bitcoin (BTC)")]
32    pub wallet_balance_btc: Gauge,
33    #[metric(describe = "The block height of the chain as seen by Bitcoin Core RPC")]
34    pub rpc_tip_height: Gauge,
35    #[metric(describe = "The block height of the Bitcoin Syncer")]
36    pub btc_syncer_synced_height: Gauge,
37    #[metric(describe = "The block height of the latest header chain proof")]
38    pub hcp_last_proven_height: Gauge,
39    #[metric(describe = "The block height processed by the Transaction Sender")]
40    pub tx_sender_synced_height: Gauge,
41    #[metric(describe = "The finalized block height as seen by the FinalizedBlockFetcher task")]
42    pub finalized_synced_height: Gauge,
43    #[metric(describe = "The next block height to process for the State Manager")]
44    pub state_manager_next_height: Gauge,
45    #[metric(describe = "The current Bitcoin fee rate in sat/vB")]
46    pub bitcoin_fee_rate_sat_vb: Gauge,
47    #[metric(describe = "The last processed Citrea Light Client Proof L1 height for the entity")]
48    pub lcp_synced_height: Gauge,
49    #[metric(describe = "The current Citrea L2 block height")]
50    pub citrea_l2_block_height: Gauge,
51}
52
53#[derive(Metrics)]
54#[metrics(dynamic = true)]
55/// The sync status metrics for an entity. This is used by the aggregator to
56/// publish external entity metrics.  The scope will be set to the EntityId +
57/// "_l1_sync_status", which will be displayed as
58/// `Operator(abcdef123...)_l1_sync_status` or
59/// `Verifier(abcdef123...)_l1_sync_status` where the XOnlyPublicKey's first 10
60/// characters are displayed, cf. [`crate::aggregator::OperatorId`] and
61/// [`crate::aggregator::VerifierId`].
62pub struct EntitySyncStatusMetrics {
63    #[metric(describe = "The current balance of the wallet of the entity in Bitcoin (BTC)")]
64    pub wallet_balance_btc: Gauge,
65    #[metric(
66        describe = "The block height of the chain as seen by Bitcoin Core RPC for the entity"
67    )]
68    pub rpc_tip_height: Gauge,
69    #[metric(describe = "The block height of the Bitcoin Syncer for the entity")]
70    pub btc_syncer_synced_height: Gauge,
71    #[metric(describe = "The block height of the latest header chain proof for the entity")]
72    pub hcp_last_proven_height: Gauge,
73    #[metric(describe = "The block height processed by the Transaction Sender for the entity")]
74    pub tx_sender_synced_height: Gauge,
75    #[metric(
76        describe = "The finalized block height as seen by the FinalizedBlockFetcher task for the entity"
77    )]
78    pub finalized_synced_height: Gauge,
79    #[metric(describe = "The next block height to process for the State Manager for the entity")]
80    pub state_manager_next_height: Gauge,
81
82    #[metric(describe = "The current Bitcoin fee rate in sat/vB for the entity")]
83    pub bitcoin_fee_rate_sat_vb: Gauge,
84    #[metric(describe = "The current Citrea L2 block height for the entity")]
85    pub citrea_l2_block_height: Gauge,
86
87    #[metric(describe = "The number of error responses from the entity status endpoint")]
88    pub entity_status_error_count: metrics::Counter,
89
90    #[metric(describe = "The number of stopped tasks for the entity")]
91    pub stopped_tasks_count: Gauge,
92
93    #[metric(describe = "The last processed Citrea Light Client Proof L1 height for the entity")]
94    pub lcp_synced_height: Gauge,
95}
96
97/// The sync status metrics static for the currently running entity. (operator/verifier)
98pub static ENTITY_SYNC_STATUS: LazyLock<SyncStatusMetrics> = LazyLock::new(|| {
99    SyncStatusMetrics::describe();
100    SyncStatusMetrics::default()
101});
102
103/// A struct containing the current sync status of the entity.
104#[derive(Debug, Clone, PartialEq, Eq)]
105pub struct SyncStatus {
106    pub wallet_balance: Option<Amount>,
107    pub rpc_tip_height: Option<u32>,
108    pub btc_syncer_synced_height: Option<u32>,
109    pub hcp_last_proven_height: Option<u32>,
110    pub tx_sender_synced_height: Option<u32>,
111    pub finalized_synced_height: Option<u32>,
112    pub state_manager_next_height: Option<u32>,
113    pub bitcoin_fee_rate_sat_vb: Option<u64>,
114    pub lcp_synced_height: Option<u32>,
115    pub citrea_l2_block_height: Option<u32>,
116}
117
118/// Get the current balance of the wallet.
119pub async fn get_wallet_balance(rpc: &ExtendedBitcoinRpc) -> Result<Amount, BridgeError> {
120    let balance = rpc
121        .get_balance(None, None)
122        .await
123        .wrap_err("Failed to get wallet balance")?;
124
125    Ok(balance)
126}
127
128/// Get the current height of the chain as seen by Bitcoin Core RPC.
129pub async fn get_rpc_tip_height(rpc: &ExtendedBitcoinRpc) -> Result<u32, BridgeError> {
130    let height = rpc.get_current_chain_height().await?;
131    Ok(height)
132}
133
134/// Get the last processed block height of the given consumer or None if no
135/// block was processed by the consumer.
136pub async fn get_btc_syncer_consumer_last_processed_block_height(
137    db: &Database,
138    consumer_handle: &str,
139) -> Result<Option<u32>, BridgeError> {
140    db.get_last_processed_event_block_height(None, consumer_handle)
141        .await
142}
143
144/// Get the last processed finalized block height of the given consumer or None if no
145/// block was processed by the consumer.
146pub async fn get_btc_syncer_consumer_last_processed_finalized_block_height(
147    db: &Database,
148    consumer_handle: &str,
149    finality_depth: u32,
150) -> Result<Option<u32>, BridgeError> {
151    get_btc_syncer_consumer_last_processed_block_height(db, consumer_handle)
152        .await
153        .map(|opt_height| opt_height.map(|h| h.saturating_sub(finality_depth - 1)))
154}
155
156/// Get the last processed block height of the Bitcoin Syncer or None if no
157/// block is present in the database.
158pub async fn get_btc_syncer_synced_height(db: &Database) -> Result<Option<u32>, BridgeError> {
159    let height = db.get_max_height(None).await?;
160    Ok(height)
161}
162
163/// Get the last proven block height of the HCP or None if no block has been proven.
164pub async fn get_hcp_last_proven_height(db: &Database) -> Result<Option<u32>, BridgeError> {
165    let latest_proven_block_height = db
166        .get_latest_proven_block_info(None)
167        .await?
168        .map(|(_, _, height)| height as u32);
169    Ok(latest_proven_block_height)
170}
171
172/// Get the synced height of the Transaction Sender or None if the
173/// tx_sender_sync_state table is empty or missing.
174pub async fn get_tx_sender_synced_height(db: &Database) -> Result<Option<u32>, BridgeError> {
175    let result: Option<i32> =
176        sqlx::query_scalar("SELECT synced_height FROM tx_sender_sync_state WHERE id = 1")
177            .fetch_optional(&db.get_pool())
178            .await
179            .map_err(BridgeError::DatabaseError)?;
180
181    Ok(result
182        .map(|h| u32::try_from(h).wrap_err("Failed to convert height from DB"))
183        .transpose()?)
184}
185
186/// Get the next height of the State Manager or None if the State Manager status
187/// for the owner is missing or the next_height_to_process is NULL.
188pub async fn get_state_manager_next_height(
189    db: &Database,
190    owner_type: &str,
191) -> Result<Option<u32>, BridgeError> {
192    #[cfg(feature = "automation")]
193    {
194        let next_height = db
195            .get_next_height_to_process(None, owner_type)
196            .await?
197            .map(|x| x as u32);
198        Ok(next_height)
199    }
200    #[cfg(not(feature = "automation"))]
201    {
202        Ok(None)
203    }
204}
205
206/// Get the current Bitcoin fee rate in sat/vB.
207pub async fn get_bitcoin_fee_rate(
208    rpc: &ExtendedBitcoinRpc,
209    config: &crate::config::BridgeConfig,
210) -> Result<u64, BridgeError> {
211    let fee_rate = rpc
212        .get_fee_rate_kvb(
213            config.protocol_paramset.network,
214            &config.mempool_api_host,
215            &config.mempool_api_endpoint,
216            config.tx_sender_limits.mempool_fee_rate_multiplier,
217            config.tx_sender_limits.mempool_fee_rate_offset_sat_kvb,
218            config.tx_sender_limits.fee_rate_hard_cap,
219        )
220        .await
221        .wrap_err("Failed to get fee rate")?;
222
223    // Convert from FeeRate to sat/vB
224    Ok(fee_rate.to_sat_per_vb_ceil())
225}
226
227/// Extension trait on named entities to retrieve their sync status (including both L1 and L2 data).  
228#[async_trait]
229pub trait SyncStatusProvider: NamedEntity {
230    async fn get_sync_status<C: crate::citrea::CitreaClientT>(
231        db: &Database,
232        rpc: &ExtendedBitcoinRpc,
233        config: &crate::config::BridgeConfig,
234        citrea_client: &C,
235    ) -> Result<SyncStatus, BridgeError>;
236}
237
238#[inline(always)]
239fn log_errs_and_ok<A, T: NamedEntity>(
240    result: Result<Result<A, BridgeError>, Elapsed>,
241    action: &str,
242) -> Option<A> {
243    result
244        .inspect_err(|_| {
245            tracing::error!(
246                "[L1SyncStatus({})] Timed out while {action}",
247                T::ENTITY_NAME
248            )
249        })
250        .ok()
251        .transpose()
252        .inspect_err(|e| {
253            tracing::error!("[L1SyncStatus({})] Error {action}: {:?}", T::ENTITY_NAME, e)
254        })
255        .ok()
256        .flatten()
257}
258
259#[async_trait]
260impl<T: NamedEntity> SyncStatusProvider for T {
261    async fn get_sync_status<C: crate::citrea::CitreaClientT>(
262        db: &Database,
263        rpc: &ExtendedBitcoinRpc,
264        config: &crate::config::BridgeConfig,
265        citrea_client: &C,
266    ) -> Result<SyncStatus, BridgeError> {
267        let wallet_balance = log_errs_and_ok::<_, T>(
268            timed_request_base(
269                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
270                "get_wallet_balance",
271                get_wallet_balance(rpc),
272            )
273            .await,
274            "getting wallet balance",
275        );
276
277        let rpc_tip_height = log_errs_and_ok::<_, T>(
278            timed_request_base(
279                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
280                "get_rpc_tip_height",
281                get_rpc_tip_height(rpc),
282            )
283            .await,
284            "getting rpc tip height",
285        );
286
287        #[cfg(feature = "automation")]
288        let finalized_synced_height = log_errs_and_ok::<_, T>(
289            timed_request_base(
290                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
291                "get_finalized_synced_height",
292                get_btc_syncer_consumer_last_processed_finalized_block_height(
293                    db,
294                    T::FINALIZED_BLOCK_CONSUMER_ID_AUTOMATION,
295                    config.protocol_paramset.finality_depth,
296                ),
297            )
298            .await,
299            "getting finalized synced height",
300        )
301        .flatten();
302
303        #[cfg(not(feature = "automation"))]
304        let finalized_synced_height = None;
305
306        let lcp_synced_height = log_errs_and_ok::<_, T>(
307            timed_request_base(
308                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
309                "get_lcp_synced_height",
310                get_btc_syncer_consumer_last_processed_finalized_block_height(
311                    db,
312                    T::LCP_SYNCER_CONSUMER_ID,
313                    config.protocol_paramset.finality_depth,
314                ),
315            )
316            .await,
317            "getting lcp synced height",
318        )
319        .flatten();
320
321        let btc_syncer_synced_height = log_errs_and_ok::<_, T>(
322            timed_request_base(
323                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
324                "get_btc_syncer_synced_height",
325                get_btc_syncer_synced_height(db),
326            )
327            .await,
328            "getting btc syncer synced height",
329        )
330        .flatten();
331
332        let hcp_last_proven_height = log_errs_and_ok::<_, T>(
333            timed_request_base(
334                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
335                "get_hcp_last_proven_height",
336                get_hcp_last_proven_height(db),
337            )
338            .await,
339            "getting hcp last proven height",
340        )
341        .flatten();
342
343        let tx_sender_synced_height = log_errs_and_ok::<_, T>(
344            timed_request_base(
345                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
346                "get_tx_sender_synced_height",
347                get_tx_sender_synced_height(db),
348            )
349            .await,
350            "getting tx sender synced height",
351        )
352        .flatten();
353
354        let state_manager_next_height = log_errs_and_ok::<_, T>(
355            timed_request_base(
356                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
357                "get_state_manager_next_height",
358                get_state_manager_next_height(db, T::ENTITY_NAME),
359            )
360            .await,
361            "getting state manager next height",
362        )
363        .flatten();
364
365        let bitcoin_fee_rate_sat_vb = log_errs_and_ok::<_, T>(
366            timed_request_base(
367                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
368                "get_bitcoin_fee_rate",
369                get_bitcoin_fee_rate(rpc, config),
370            )
371            .await,
372            "getting bitcoin fee rate",
373        );
374
375        let citrea_l2_block_height = log_errs_and_ok::<_, T>(
376            timed_request_base(
377                L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
378                "get_citrea_l2_block_height",
379                citrea_client.get_current_l2_block_height(),
380            )
381            .await,
382            "getting citrea L2 block height",
383        );
384
385        Ok(SyncStatus {
386            wallet_balance,
387            rpc_tip_height,
388            btc_syncer_synced_height,
389            hcp_last_proven_height,
390            tx_sender_synced_height,
391            finalized_synced_height,
392            state_manager_next_height,
393            bitcoin_fee_rate_sat_vb,
394            lcp_synced_height,
395            citrea_l2_block_height,
396        })
397    }
398}
399
#[cfg(test)]
mod tests {
    use bitcoincore_rpc::RpcApi;

    #[cfg(not(feature = "automation"))]
    use crate::rpc::clementine::EntityType;
    use crate::{
        rpc::clementine::{entity_status_with_id::StatusResult, Empty, GetEntityStatusesRequest},
        test::common::{
            citrea::MockCitreaClient, create_actors, create_regtest_rpc,
            create_test_config_with_thread_name,
        },
    };
    use std::time::Duration;

    /// Requesting the status must succeed even when the wallet backing the
    /// balance query is no longer loaded; the balance is then reported as
    /// `None`.
    #[tokio::test]
    async fn test_get_sync_status_should_not_fail() {
        let mut config = create_test_config_with_thread_name().await;
        let regtest = create_regtest_rpc(&mut config).await;
        config.bitcoin_rpc_url += "/wallet/test-wallet";
        // unload to avoid conflicts
        regtest.rpc().unload_wallet("admin".into()).await.unwrap();

        // create a test wallet
        regtest
            .rpc()
            .create_wallet("test-wallet", None, None, None, None)
            .await
            .unwrap();

        let addr = regtest.rpc().get_new_address(None, None).await.unwrap();
        regtest
            .rpc()
            .generate_to_address(201, addr.assume_checked_ref())
            .await
            .unwrap();

        let actors = create_actors::<MockCitreaClient>(&config).await;

        // lose the wallet that was previously loaded for some reason
        regtest
            .rpc()
            .unload_wallet(Some("test-wallet"))
            .await
            .unwrap();

        // try to get status which includes balance
        let res = actors
            .get_verifier_client_by_index(0)
            .get_current_status(Empty {})
            .await;

        // expect result to be Ok(_)
        assert!(res.is_ok(), "Expected Ok(_) but got {res:?}");

        // expect the balance to be None because the wallet was unloaded
        assert_eq!(res.unwrap().into_inner().wallet_balance, None);
    }

    /// Every entity reported by the aggregator must expose a plausible status,
    /// with the set of populated fields depending on the `automation` feature.
    #[tokio::test]
    async fn test_get_sync_status() {
        let mut config = create_test_config_with_thread_name().await;
        let _regtest = create_regtest_rpc(&mut config).await;
        let actors = create_actors::<MockCitreaClient>(&config).await;
        let mut aggregator = actors.get_aggregator();
        // wait for entities to sync a bit, this might cause flakiness, if so increase sleep time or make it serial
        tokio::time::sleep(Duration::from_secs(40)).await;
        let entity_statuses = aggregator
            .get_entity_statuses(tonic::Request::new(GetEntityStatusesRequest {
                restart_tasks: false,
            }))
            .await
            .unwrap()
            .into_inner();

        for entity in entity_statuses.entity_statuses {
            let status = match entity.status_result.unwrap() {
                StatusResult::Status(status) => status,
                StatusResult::Err(error) => {
                    let error_msg = &error.error;
                    panic!("Couldn't get entity status: {error_msg}");
                }
            };
            tracing::info!("Status: {:#?}", status);

            // Expectations that hold regardless of the `automation` feature.
            assert!(status.rpc_tip_height.expect("rpc_tip_height is None") > 0);
            assert!(
                status
                    .bitcoin_syncer_synced_height
                    .expect("bitcoin_syncer_synced_height is None")
                    > 0
            );
            assert!(status.wallet_balance.is_some());
            assert!(
                status
                    .btc_fee_rate_sat_vb
                    .expect("btc_fee_rate_sat_vb is None")
                    > 0
            );
            assert!(status.citrea_l2_block_height.is_some());

            #[cfg(feature = "automation")]
            {
                assert!(status.automation);
                assert!(
                    status
                        .tx_sender_synced_height
                        .expect("tx_sender_synced_height is None")
                        > 0
                );
                assert!(
                    status
                        .finalized_synced_height
                        .expect("finalized_synced_height is None")
                        > 0
                );
                assert!(
                    status
                        .hcp_last_proven_height
                        .expect("hcp_last_proven_height is None")
                        > 0
                );
                assert!(
                    status
                        .state_manager_next_height
                        .expect("state_manager_next_height is None")
                        > 0
                );
            }
            #[cfg(not(feature = "automation"))]
            {
                let entity_type: EntityType = entity.entity_id.unwrap().kind.try_into().unwrap();
                // tx sender and hcp are not running in non-automation mode
                assert!(!status.automation);
                assert!(status.tx_sender_synced_height.is_none());
                if entity_type == EntityType::Verifier {
                    assert!(
                        status
                            .finalized_synced_height
                            .expect("finalized_synced_height is None")
                            > 0
                    );
                } else {
                    // operator doesn't run finalized block fetcher in non-automation mode
                    assert!(status.finalized_synced_height.is_none());
                }
                assert!(status.hcp_last_proven_height.is_none());
                assert!(status.state_manager_next_height.is_none());
            }
        }
    }
}