1use std::{sync::LazyLock, time::Duration};
8
9use bitcoin::Amount;
10use bitcoincore_rpc::RpcApi;
11use eyre::Context;
12use metrics::Gauge;
13use tokio::time::error::Elapsed;
14use tonic::async_trait;
15
16use crate::{
17 database::Database,
18 extended_bitcoin_rpc::ExtendedBitcoinRpc,
19 utils::{timed_request_base, NamedEntity},
20};
21use clementine_errors::BridgeError;
22use metrics_derive::Metrics;
23
/// Time budget granted to each individual sub-request made while assembling a
/// [`SyncStatus`]; it is the timeout passed to every `timed_request_base` call
/// in this module.
const L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT: Duration = Duration::from_secs(45);
25
26#[derive(Metrics)]
28#[metrics(scope = "l1_sync_status")]
29pub struct SyncStatusMetrics {
31 #[metric(describe = "The current balance of the wallet in Bitcoin (BTC)")]
32 pub wallet_balance_btc: Gauge,
33 #[metric(describe = "The block height of the chain as seen by Bitcoin Core RPC")]
34 pub rpc_tip_height: Gauge,
35 #[metric(describe = "The block height of the Bitcoin Syncer")]
36 pub btc_syncer_synced_height: Gauge,
37 #[metric(describe = "The block height of the latest header chain proof")]
38 pub hcp_last_proven_height: Gauge,
39 #[metric(describe = "The block height processed by the Transaction Sender")]
40 pub tx_sender_synced_height: Gauge,
41 #[metric(describe = "The finalized block height as seen by the FinalizedBlockFetcher task")]
42 pub finalized_synced_height: Gauge,
43 #[metric(describe = "The next block height to process for the State Manager")]
44 pub state_manager_next_height: Gauge,
45 #[metric(describe = "The current Bitcoin fee rate in sat/vB")]
46 pub bitcoin_fee_rate_sat_vb: Gauge,
47 #[metric(describe = "The last processed Citrea Light Client Proof L1 height for the entity")]
48 pub lcp_synced_height: Gauge,
49 #[metric(describe = "The current Citrea L2 block height")]
50 pub citrea_l2_block_height: Gauge,
51}
52
53#[derive(Metrics)]
54#[metrics(dynamic = true)]
55pub struct EntitySyncStatusMetrics {
63 #[metric(describe = "The current balance of the wallet of the entity in Bitcoin (BTC)")]
64 pub wallet_balance_btc: Gauge,
65 #[metric(
66 describe = "The block height of the chain as seen by Bitcoin Core RPC for the entity"
67 )]
68 pub rpc_tip_height: Gauge,
69 #[metric(describe = "The block height of the Bitcoin Syncer for the entity")]
70 pub btc_syncer_synced_height: Gauge,
71 #[metric(describe = "The block height of the latest header chain proof for the entity")]
72 pub hcp_last_proven_height: Gauge,
73 #[metric(describe = "The block height processed by the Transaction Sender for the entity")]
74 pub tx_sender_synced_height: Gauge,
75 #[metric(
76 describe = "The finalized block height as seen by the FinalizedBlockFetcher task for the entity"
77 )]
78 pub finalized_synced_height: Gauge,
79 #[metric(describe = "The next block height to process for the State Manager for the entity")]
80 pub state_manager_next_height: Gauge,
81
82 #[metric(describe = "The current Bitcoin fee rate in sat/vB for the entity")]
83 pub bitcoin_fee_rate_sat_vb: Gauge,
84 #[metric(describe = "The current Citrea L2 block height for the entity")]
85 pub citrea_l2_block_height: Gauge,
86
87 #[metric(describe = "The number of error responses from the entity status endpoint")]
88 pub entity_status_error_count: metrics::Counter,
89
90 #[metric(describe = "The number of stopped tasks for the entity")]
91 pub stopped_tasks_count: Gauge,
92
93 #[metric(describe = "The last processed Citrea Light Client Proof L1 height for the entity")]
94 pub lcp_synced_height: Gauge,
95}
96
97pub static ENTITY_SYNC_STATUS: LazyLock<SyncStatusMetrics> = LazyLock::new(|| {
99 SyncStatusMetrics::describe();
100 SyncStatusMetrics::default()
101});
102
103#[derive(Debug, Clone, PartialEq, Eq)]
105pub struct SyncStatus {
106 pub wallet_balance: Option<Amount>,
107 pub rpc_tip_height: Option<u32>,
108 pub btc_syncer_synced_height: Option<u32>,
109 pub hcp_last_proven_height: Option<u32>,
110 pub tx_sender_synced_height: Option<u32>,
111 pub finalized_synced_height: Option<u32>,
112 pub state_manager_next_height: Option<u32>,
113 pub bitcoin_fee_rate_sat_vb: Option<u64>,
114 pub lcp_synced_height: Option<u32>,
115 pub citrea_l2_block_height: Option<u32>,
116}
117
118pub async fn get_wallet_balance(rpc: &ExtendedBitcoinRpc) -> Result<Amount, BridgeError> {
120 let balance = rpc
121 .get_balance(None, None)
122 .await
123 .wrap_err("Failed to get wallet balance")?;
124
125 Ok(balance)
126}
127
128pub async fn get_rpc_tip_height(rpc: &ExtendedBitcoinRpc) -> Result<u32, BridgeError> {
130 let height = rpc.get_current_chain_height().await?;
131 Ok(height)
132}
133
134pub async fn get_btc_syncer_consumer_last_processed_block_height(
137 db: &Database,
138 consumer_handle: &str,
139) -> Result<Option<u32>, BridgeError> {
140 db.get_last_processed_event_block_height(None, consumer_handle)
141 .await
142}
143
144pub async fn get_btc_syncer_consumer_last_processed_finalized_block_height(
147 db: &Database,
148 consumer_handle: &str,
149 finality_depth: u32,
150) -> Result<Option<u32>, BridgeError> {
151 get_btc_syncer_consumer_last_processed_block_height(db, consumer_handle)
152 .await
153 .map(|opt_height| opt_height.map(|h| h.saturating_sub(finality_depth - 1)))
154}
155
156pub async fn get_btc_syncer_synced_height(db: &Database) -> Result<Option<u32>, BridgeError> {
159 let height = db.get_max_height(None).await?;
160 Ok(height)
161}
162
163pub async fn get_hcp_last_proven_height(db: &Database) -> Result<Option<u32>, BridgeError> {
165 let latest_proven_block_height = db
166 .get_latest_proven_block_info(None)
167 .await?
168 .map(|(_, _, height)| height as u32);
169 Ok(latest_proven_block_height)
170}
171
172pub async fn get_tx_sender_synced_height(db: &Database) -> Result<Option<u32>, BridgeError> {
175 let result: Option<i32> =
176 sqlx::query_scalar("SELECT synced_height FROM tx_sender_sync_state WHERE id = 1")
177 .fetch_optional(&db.get_pool())
178 .await
179 .map_err(BridgeError::DatabaseError)?;
180
181 Ok(result
182 .map(|h| u32::try_from(h).wrap_err("Failed to convert height from DB"))
183 .transpose()?)
184}
185
186pub async fn get_state_manager_next_height(
189 db: &Database,
190 owner_type: &str,
191) -> Result<Option<u32>, BridgeError> {
192 #[cfg(feature = "automation")]
193 {
194 let next_height = db
195 .get_next_height_to_process(None, owner_type)
196 .await?
197 .map(|x| x as u32);
198 Ok(next_height)
199 }
200 #[cfg(not(feature = "automation"))]
201 {
202 Ok(None)
203 }
204}
205
206pub async fn get_bitcoin_fee_rate(
208 rpc: &ExtendedBitcoinRpc,
209 config: &crate::config::BridgeConfig,
210) -> Result<u64, BridgeError> {
211 let fee_rate = rpc
212 .get_fee_rate_kvb(
213 config.protocol_paramset.network,
214 &config.mempool_api_host,
215 &config.mempool_api_endpoint,
216 config.tx_sender_limits.mempool_fee_rate_multiplier,
217 config.tx_sender_limits.mempool_fee_rate_offset_sat_kvb,
218 config.tx_sender_limits.fee_rate_hard_cap,
219 )
220 .await
221 .wrap_err("Failed to get fee rate")?;
222
223 Ok(fee_rate.to_sat_per_vb_ceil())
225}
226
227#[async_trait]
229pub trait SyncStatusProvider: NamedEntity {
230 async fn get_sync_status<C: crate::citrea::CitreaClientT>(
231 db: &Database,
232 rpc: &ExtendedBitcoinRpc,
233 config: &crate::config::BridgeConfig,
234 citrea_client: &C,
235 ) -> Result<SyncStatus, BridgeError>;
236}
237
238#[inline(always)]
239fn log_errs_and_ok<A, T: NamedEntity>(
240 result: Result<Result<A, BridgeError>, Elapsed>,
241 action: &str,
242) -> Option<A> {
243 result
244 .inspect_err(|_| {
245 tracing::error!(
246 "[L1SyncStatus({})] Timed out while {action}",
247 T::ENTITY_NAME
248 )
249 })
250 .ok()
251 .transpose()
252 .inspect_err(|e| {
253 tracing::error!("[L1SyncStatus({})] Error {action}: {:?}", T::ENTITY_NAME, e)
254 })
255 .ok()
256 .flatten()
257}
258
259#[async_trait]
260impl<T: NamedEntity> SyncStatusProvider for T {
261 async fn get_sync_status<C: crate::citrea::CitreaClientT>(
262 db: &Database,
263 rpc: &ExtendedBitcoinRpc,
264 config: &crate::config::BridgeConfig,
265 citrea_client: &C,
266 ) -> Result<SyncStatus, BridgeError> {
267 let wallet_balance = log_errs_and_ok::<_, T>(
268 timed_request_base(
269 L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
270 "get_wallet_balance",
271 get_wallet_balance(rpc),
272 )
273 .await,
274 "getting wallet balance",
275 );
276
277 let rpc_tip_height = log_errs_and_ok::<_, T>(
278 timed_request_base(
279 L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
280 "get_rpc_tip_height",
281 get_rpc_tip_height(rpc),
282 )
283 .await,
284 "getting rpc tip height",
285 );
286
287 #[cfg(feature = "automation")]
288 let finalized_synced_height = log_errs_and_ok::<_, T>(
289 timed_request_base(
290 L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
291 "get_finalized_synced_height",
292 get_btc_syncer_consumer_last_processed_finalized_block_height(
293 db,
294 T::FINALIZED_BLOCK_CONSUMER_ID_AUTOMATION,
295 config.protocol_paramset.finality_depth,
296 ),
297 )
298 .await,
299 "getting finalized synced height",
300 )
301 .flatten();
302
303 #[cfg(not(feature = "automation"))]
304 let finalized_synced_height = None;
305
306 let lcp_synced_height = log_errs_and_ok::<_, T>(
307 timed_request_base(
308 L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
309 "get_lcp_synced_height",
310 get_btc_syncer_consumer_last_processed_finalized_block_height(
311 db,
312 T::LCP_SYNCER_CONSUMER_ID,
313 config.protocol_paramset.finality_depth,
314 ),
315 )
316 .await,
317 "getting lcp synced height",
318 )
319 .flatten();
320
321 let btc_syncer_synced_height = log_errs_and_ok::<_, T>(
322 timed_request_base(
323 L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
324 "get_btc_syncer_synced_height",
325 get_btc_syncer_synced_height(db),
326 )
327 .await,
328 "getting btc syncer synced height",
329 )
330 .flatten();
331
332 let hcp_last_proven_height = log_errs_and_ok::<_, T>(
333 timed_request_base(
334 L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
335 "get_hcp_last_proven_height",
336 get_hcp_last_proven_height(db),
337 )
338 .await,
339 "getting hcp last proven height",
340 )
341 .flatten();
342
343 let tx_sender_synced_height = log_errs_and_ok::<_, T>(
344 timed_request_base(
345 L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
346 "get_tx_sender_synced_height",
347 get_tx_sender_synced_height(db),
348 )
349 .await,
350 "getting tx sender synced height",
351 )
352 .flatten();
353
354 let state_manager_next_height = log_errs_and_ok::<_, T>(
355 timed_request_base(
356 L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
357 "get_state_manager_next_height",
358 get_state_manager_next_height(db, T::ENTITY_NAME),
359 )
360 .await,
361 "getting state manager next height",
362 )
363 .flatten();
364
365 let bitcoin_fee_rate_sat_vb = log_errs_and_ok::<_, T>(
366 timed_request_base(
367 L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
368 "get_bitcoin_fee_rate",
369 get_bitcoin_fee_rate(rpc, config),
370 )
371 .await,
372 "getting bitcoin fee rate",
373 );
374
375 let citrea_l2_block_height = log_errs_and_ok::<_, T>(
376 timed_request_base(
377 L1_SYNC_STATUS_SUB_REQUEST_METRICS_TIMEOUT,
378 "get_citrea_l2_block_height",
379 citrea_client.get_current_l2_block_height(),
380 )
381 .await,
382 "getting citrea L2 block height",
383 );
384
385 Ok(SyncStatus {
386 wallet_balance,
387 rpc_tip_height,
388 btc_syncer_synced_height,
389 hcp_last_proven_height,
390 tx_sender_synced_height,
391 finalized_synced_height,
392 state_manager_next_height,
393 bitcoin_fee_rate_sat_vb,
394 lcp_synced_height,
395 citrea_l2_block_height,
396 })
397 }
398}
399
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use bitcoincore_rpc::RpcApi;

    #[cfg(not(feature = "automation"))]
    use crate::rpc::clementine::EntityType;
    use crate::{
        rpc::clementine::{entity_status_with_id::StatusResult, Empty, GetEntityStatusesRequest},
        test::common::{
            citrea::MockCitreaClient, create_actors, create_regtest_rpc,
            create_test_config_with_thread_name,
        },
    };

    /// The status request must still succeed when the wallet the actors were
    /// configured with has been unloaded; only the balance is reported as
    /// missing.
    #[tokio::test]
    async fn test_get_sync_status_should_not_fail() {
        let mut config = create_test_config_with_thread_name().await;
        let regtest = create_regtest_rpc(&mut config).await;
        let rpc = regtest.rpc();

        // Point the actors at a dedicated wallet and swap the loaded wallet.
        config.bitcoin_rpc_url += "/wallet/test-wallet";
        rpc.unload_wallet(Some("admin")).await.unwrap();
        rpc.create_wallet("test-wallet", None, None, None, None)
            .await
            .unwrap();

        // Mine blocks to an address of the new wallet.
        let addr = rpc.get_new_address(None, None).await.unwrap();
        rpc.generate_to_address(201, addr.assume_checked_ref())
            .await
            .unwrap();

        let actors = create_actors::<MockCitreaClient>(&config).await;

        // Unload the wallet before asking for the status.
        rpc.unload_wallet(Some("test-wallet")).await.unwrap();

        let mut verifier = actors.get_verifier_client_by_index(0);
        let res = verifier.get_current_status(Empty {}).await;

        assert!(res.is_ok(), "Expected Ok(_) but got {res:?}");

        assert_eq!(res.unwrap().into_inner().wallet_balance, None);
    }

    /// Every entity must report a status whose fields match the expectations
    /// for the enabled feature set.
    #[tokio::test]
    async fn test_get_sync_status() {
        let mut config = create_test_config_with_thread_name().await;
        let _regtest = create_regtest_rpc(&mut config).await;
        let actors = create_actors::<MockCitreaClient>(&config).await;
        let mut aggregator = actors.get_aggregator();

        // Presumably gives the actors' background tasks time to make progress
        // before the statuses are queried.
        tokio::time::sleep(Duration::from_secs(40)).await;

        let request = tonic::Request::new(GetEntityStatusesRequest {
            restart_tasks: false,
        });
        let entity_statuses = aggregator
            .get_entity_statuses(request)
            .await
            .unwrap()
            .into_inner();

        for entity in entity_statuses.entity_statuses {
            let status = match entity.status_result.unwrap() {
                StatusResult::Status(status) => status,
                StatusResult::Err(error) => {
                    let error_msg = &error.error;
                    panic!("Couldn't get entity status: {error_msg}");
                }
            };
            tracing::info!("Status: {:#?}", status);

            #[cfg(feature = "automation")]
            {
                assert!(status.automation);
                assert!(
                    status
                        .tx_sender_synced_height
                        .expect("tx_sender_synced_height is None")
                        > 0
                );
                assert!(
                    status
                        .finalized_synced_height
                        .expect("finalized_synced_height is None")
                        > 0
                );
                assert!(
                    status
                        .hcp_last_proven_height
                        .expect("hcp_last_proven_height is None")
                        > 0
                );
                assert!(status.rpc_tip_height.expect("rpc_tip_height is None") > 0);
                assert!(
                    status
                        .bitcoin_syncer_synced_height
                        .expect("bitcoin_syncer_synced_height is None")
                        > 0
                );
                assert!(
                    status
                        .state_manager_next_height
                        .expect("state_manager_next_height is None")
                        > 0
                );
                assert!(status.wallet_balance.is_some());
                assert!(
                    status
                        .btc_fee_rate_sat_vb
                        .expect("btc_fee_rate_sat_vb is None")
                        > 0
                );
                assert!(status.citrea_l2_block_height.is_some());
            }

            #[cfg(not(feature = "automation"))]
            {
                let entity_type: EntityType = entity.entity_id.unwrap().kind.try_into().unwrap();
                assert!(!status.automation);
                assert!(status.tx_sender_synced_height.is_none());
                // Without automation only verifiers report a finalized height.
                if entity_type == EntityType::Verifier {
                    assert!(
                        status
                            .finalized_synced_height
                            .expect("finalized_synced_height is None")
                            > 0
                    );
                } else {
                    assert!(status.finalized_synced_height.is_none());
                }
                assert!(status.hcp_last_proven_height.is_none());
                assert!(status.rpc_tip_height.expect("rpc_tip_height is None") > 0);
                assert!(
                    status
                        .bitcoin_syncer_synced_height
                        .expect("bitcoin_syncer_synced_height is None")
                        > 0
                );
                assert!(status.state_manager_next_height.is_none());
                assert!(status.wallet_balance.is_some());
                assert!(
                    status
                        .btc_fee_rate_sat_vb
                        .expect("bitcoin_fee_rate_sat_vb is None")
                        > 0
                );
                assert!(status.citrea_l2_block_height.is_some());
            }
        }
    }
}