1use crate::config::TelemetryConfig;
2use crate::rpc::clementine::VergenResponse;
3use clementine_errors::BridgeError;
4use eyre::Context as _;
5use futures::future::join_all;
6use http::HeaderValue;
7use metrics_exporter_prometheus::PrometheusBuilder;
8use std::fmt::{Debug, Display};
9use std::fs::File;
10use std::future::Future;
11use std::net::{Ipv4Addr, SocketAddr};
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll};
15use std::time::Duration;
16use tokio::time::error::Elapsed;
17use tokio::time::timeout;
18use tonic::Status;
19use tower::{Layer, Service};
20use tracing::level_filters::LevelFilter;
21use tracing::{debug_span, Instrument, Subscriber};
22use tracing_subscriber::layer::SubscriberExt;
23use tracing_subscriber::{fmt, EnvFilter, Layer as TracingLayer, Registry};
24
25pub use clementine_utils::{
27 FeePayingType, Last20Bytes, NamedEntity, RbfSigningInfo, ScriptBufExt, TryLast20Bytes,
28 TxMetadata,
29};
30
31pub fn initialize_logger(default_level: Option<LevelFilter>) -> Result<(), BridgeError> {
64 let is_ci = std::env::var("CI")
65 .map(|v| v == "true" || v == "1")
66 .unwrap_or(false);
67
68 if cfg!(test) {
72 std::env::set_var("RUST_LIB_BACKTRACE", "full");
74 std::env::set_var("RUST_BACKTRACE", "full");
75 }
76
77 let _ = color_eyre::config::HookBuilder::default()
79 .add_frame_filter(Box::new(|frames| {
80 let filters = &[
82 "std::",
83 "test::",
84 "tokio::",
85 "core::",
86 "<core::",
87 "<alloc::",
88 "start_thread",
89 "<tonic::",
90 "<futures::",
91 "<tower::",
92 "<hyper",
93 "hyper",
94 "__rust_try",
95 "<axum::",
96 "<F as ",
97 "clone",
98 ];
99
100 frames.retain(|frame| {
101 !filters.iter().any(|f| {
102 let name = if let Some(name) = frame.name.as_ref() {
103 name.as_str()
104 } else {
105 return true;
106 };
107
108 name.starts_with(f)
109 })
110 });
111 }))
112 .install();
113
114 if is_ci {
115 let info_log_file = std::env::var("INFO_LOG_FILE").ok();
116 if let Some(file_path) = info_log_file {
117 try_set_global_subscriber(env_subscriber_with_file(&file_path)?);
118 tracing::trace!("Using file logging in CI, outputting to {}", file_path);
119 } else {
120 try_set_global_subscriber(env_subscriber_to_human(default_level));
121 tracing::trace!("Using console logging in CI");
122 tracing::warn!(
123 "CI is set but INFO_LOG_FILE is missing, only console logs will be used."
124 );
125 }
126 } else if is_json_logs() {
127 try_set_global_subscriber(env_subscriber_to_json(default_level));
128 tracing::trace!("Using JSON logging");
129 } else {
130 try_set_global_subscriber(env_subscriber_to_human(default_level));
131 tracing::trace!("Using human-readable logging");
132 }
133
134 tracing::info!("Tracing initialized successfully.");
135 Ok(())
136}
137
138pub fn initialize_telemetry(config: &TelemetryConfig) -> Result<(), BridgeError> {
139 let telemetry_addr: SocketAddr = format!("{}:{}", config.host, config.port)
140 .parse()
141 .unwrap_or_else(|_| {
142 tracing::warn!(
143 "Invalid telemetry address: {}:{}, using default address: 127.0.0.1:8081",
144 config.host,
145 config.port
146 );
147 SocketAddr::from((Ipv4Addr::new(127, 0, 0, 1), 8081))
148 });
149
150 tracing::debug!("Initializing telemetry at {}", telemetry_addr);
151
152 let builder = PrometheusBuilder::new().with_http_listener(telemetry_addr);
153
154 builder
155 .install()
156 .map_err(|e| eyre::eyre!("Failed to initialize telemetry: {}", e))?;
157
158 Ok(())
159}
160
161fn try_set_global_subscriber<S>(subscriber: S)
162where
163 S: Subscriber + Send + Sync + 'static,
164{
165 match tracing::subscriber::set_global_default(subscriber) {
166 Ok(_) => {}
167 Err(_) => {
169 #[cfg(test)]
170 tracing::trace!("Tracing is already initialized, skipping without errors...");
171 #[cfg(not(test))]
172 tracing::info!(
173 "Unexpected double initialization of tracing, skipping without errors..."
174 );
175 }
176 }
177}
178
179fn env_subscriber_with_file(path: &str) -> Result<Box<dyn Subscriber + Send + Sync>, BridgeError> {
180 if let Some(parent_dir) = std::path::Path::new(path).parent() {
181 std::fs::create_dir_all(parent_dir).map_err(|e| {
182 BridgeError::ConfigError(format!(
183 "Failed to create log directory '{}': {}",
184 parent_dir.display(),
185 e
186 ))
187 })?;
188 }
189
190 let file = File::create(path).map_err(|e| BridgeError::ConfigError(e.to_string()))?;
191
192 let file_filter = EnvFilter::from_default_env()
193 .add_directive("info".parse().expect("It should parse info level"))
194 .add_directive("ci=debug".parse().expect("It should parse ci debug level"));
195
196 let console_filter = EnvFilter::builder()
197 .with_default_directive(LevelFilter::WARN.into())
198 .from_env_lossy();
199
200 let file_layer = fmt::layer()
201 .with_writer(file)
202 .with_ansi(false)
203 .with_file(true)
204 .with_line_number(true)
205 .with_target(true)
206 .with_thread_ids(true)
207 .with_thread_names(true)
208 .with_filter(file_filter)
209 .boxed();
210
211 let console_layer = fmt::layer()
212 .with_test_writer()
213 .with_file(true)
214 .with_line_number(true)
215 .with_target(true)
216 .with_filter(console_filter)
217 .boxed();
218
219 Ok(Box::new(
220 Registry::default().with(file_layer).with(console_layer),
221 ))
222}
223
224fn env_subscriber_to_json(level: Option<LevelFilter>) -> Box<dyn Subscriber + Send + Sync> {
225 let filter = match level {
226 Some(lvl) => EnvFilter::builder()
227 .with_default_directive(lvl.into())
228 .from_env_lossy(),
229 None => EnvFilter::from_default_env(),
230 };
231
232 let json_layer = fmt::layer::<Registry>()
233 .with_test_writer()
234 .with_file(true)
236 .with_line_number(true)
237 .with_thread_ids(true)
238 .with_thread_names(true)
239 .with_target(true)
240 .json();
241 Box::new(tracing_subscriber::registry().with(json_layer).with(filter))
247}
248
249fn env_subscriber_to_human(level: Option<LevelFilter>) -> Box<dyn Subscriber + Send + Sync> {
250 let filter = match level {
251 Some(lvl) => EnvFilter::builder()
252 .with_default_directive(lvl.into())
253 .from_env_lossy(),
254 None => EnvFilter::from_default_env(),
255 };
256
257 let standard_layer = fmt::layer()
258 .with_test_writer()
259 .with_file(true)
261 .with_line_number(true)
262 .with_target(true);
265
266 Box::new(
267 tracing_subscriber::registry()
268 .with(standard_layer)
269 .with(filter),
270 )
271}
272
/// Returns `true` when the `LOG_FORMAT` environment variable is set to
/// `json` (compared ASCII case-insensitively); `false` when it is unset,
/// unreadable, or has any other value.
fn is_json_logs() -> bool {
    matches!(
        std::env::var("LOG_FORMAT"),
        Ok(format) if format.eq_ignore_ascii_case("json")
    )
}
278
279pub fn get_vergen_response() -> VergenResponse {
280 let mut vergen_response = String::new();
281
282 if let Some(date) = option_env!("VERGEN_BUILD_DATE") {
284 vergen_response.push_str(&format!("Build Date: {date}\n"));
285 }
286 if let Some(timestamp) = option_env!("VERGEN_BUILD_TIMESTAMP") {
287 vergen_response.push_str(&format!("Build Timestamp: {timestamp}\n"));
288 }
289
290 if let Some(branch) = option_env!("VERGEN_GIT_BRANCH") {
292 vergen_response.push_str(&format!("git branch: {branch}\n"));
293 }
294 if let Some(commit) = option_env!("VERGEN_GIT_SHA") {
295 vergen_response.push_str(&format!("git commit: {commit}\n"));
296 }
297 if let Some(commit_date) = option_env!("VERGEN_GIT_COMMIT_DATE") {
298 vergen_response.push_str(&format!("git commit date: {commit_date}\n"));
299 }
300 if let Some(commit_timestamp) = option_env!("VERGEN_GIT_COMMIT_TIMESTAMP") {
301 vergen_response.push_str(&format!("git commit timestamp: {commit_timestamp}\n"));
302 }
303 if let Some(commit_author_name) = option_env!("VERGEN_GIT_COMMIT_AUTHOR_NAME") {
304 vergen_response.push_str(&format!("git commit author name: {commit_author_name}\n"));
305 }
306 if let Some(commit_author_email) = option_env!("VERGEN_GIT_COMMIT_AUTHOR_EMAIL") {
307 vergen_response.push_str(&format!("git commit author email: {commit_author_email}\n"));
308 }
309 if let Some(commit_count) = option_env!("VERGEN_GIT_COMMIT_COUNT") {
310 vergen_response.push_str(&format!("git commit count: {commit_count}\n"));
311 }
312 if let Some(commit_message) = option_env!("VERGEN_GIT_COMMIT_MESSAGE") {
313 vergen_response.push_str(&format!("git commit message: {commit_message}\n"));
314 }
315 if let Some(describe) = option_env!("VERGEN_GIT_DESCRIBE") {
316 vergen_response.push_str(&format!("git describe: {describe}\n"));
317 }
318 if let Some(dirty) = option_env!("VERGEN_GIT_DIRTY") {
319 vergen_response.push_str(&format!("git dirty: {dirty}\n"));
320 }
321
322 if let Some(debug) = option_env!("VERGEN_CARGO_DEBUG") {
324 vergen_response.push_str(&format!("cargo debug: {debug}\n"));
325 }
326 if let Some(opt_level) = option_env!("VERGEN_CARGO_OPT_LEVEL") {
327 vergen_response.push_str(&format!("cargo opt level: {opt_level}\n"));
328 }
329 if let Some(target_triple) = option_env!("VERGEN_CARGO_TARGET_TRIPLE") {
330 vergen_response.push_str(&format!("cargo target triple: {target_triple}\n"));
331 }
332 if let Some(features) = option_env!("VERGEN_CARGO_FEATURES") {
333 vergen_response.push_str(&format!("cargo features: {features}\n"));
334 }
335 if let Some(dependencies) = option_env!("VERGEN_CARGO_DEPENDENCIES") {
336 vergen_response.push_str(&format!("cargo dependencies: {dependencies}\n"));
337 }
338
339 if let Some(channel) = option_env!("VERGEN_RUSTC_CHANNEL") {
341 vergen_response.push_str(&format!("rustc channel: {channel}\n"));
342 }
343 if let Some(version) = option_env!("VERGEN_RUSTC_SEMVER") {
344 vergen_response.push_str(&format!("rustc version: {version}\n"));
345 }
346 if let Some(commit_hash) = option_env!("VERGEN_RUSTC_COMMIT_HASH") {
347 vergen_response.push_str(&format!("rustc commit hash: {commit_hash}\n"));
348 }
349 if let Some(commit_date) = option_env!("VERGEN_RUSTC_COMMIT_DATE") {
350 vergen_response.push_str(&format!("rustc commit date: {commit_date}\n"));
351 }
352 if let Some(host_triple) = option_env!("VERGEN_RUSTC_HOST_TRIPLE") {
353 vergen_response.push_str(&format!("rustc host triple: {host_triple}\n"));
354 }
355 if let Some(llvm_version) = option_env!("VERGEN_RUSTC_LLVM_VERSION") {
356 vergen_response.push_str(&format!("rustc LLVM version: {llvm_version}\n"));
357 }
358
359 if let Some(cpu_brand) = option_env!("VERGEN_SYSINFO_CPU_BRAND") {
361 vergen_response.push_str(&format!("cpu brand: {cpu_brand}\n"));
362 }
363 if let Some(cpu_name) = option_env!("VERGEN_SYSINFO_CPU_NAME") {
364 vergen_response.push_str(&format!("cpu name: {cpu_name}\n"));
365 }
366 if let Some(cpu_vendor) = option_env!("VERGEN_SYSINFO_CPU_VENDOR") {
367 vergen_response.push_str(&format!("cpu vendor: {cpu_vendor}\n"));
368 }
369 if let Some(cpu_core_count) = option_env!("VERGEN_SYSINFO_CPU_CORE_COUNT") {
370 vergen_response.push_str(&format!("cpu core count: {cpu_core_count}\n"));
371 }
372 if let Some(cpu_frequency) = option_env!("VERGEN_SYSINFO_CPU_FREQUENCY") {
373 vergen_response.push_str(&format!("cpu frequency: {cpu_frequency} MHz\n"));
374 }
375 if let Some(memory) = option_env!("VERGEN_SYSINFO_TOTAL_MEMORY") {
376 vergen_response.push_str(&format!("total memory: {memory}\n"));
377 }
378 if let Some(name) = option_env!("VERGEN_SYSINFO_NAME") {
379 vergen_response.push_str(&format!("system name: {name}\n"));
380 }
381 if let Some(os_version) = option_env!("VERGEN_SYSINFO_OS_VERSION") {
382 vergen_response.push_str(&format!("OS version: {os_version}\n"));
383 }
384 if let Some(user) = option_env!("VERGEN_SYSINFO_USER") {
385 vergen_response.push_str(&format!("build user: {user}\n"));
386 }
387
388 VergenResponse {
389 response: vergen_response,
390 }
391}
392
393pub fn monitor_standalone_task<
396 T: Send + 'static,
397 E: Debug + Send + 'static + From<BridgeError>,
398 C: Send + 'static,
399>(
400 task_handle: tokio::task::JoinHandle<Result<T, E>>,
401 task_name: &str,
402 monitor_err_sender: tokio::sync::mpsc::Sender<Result<C, E>>,
403) {
404 let task_name = task_name.to_string();
405
406 tokio::spawn(async move {
408 match task_handle.await {
409 Ok(Ok(_)) => {
410 tracing::debug!("Task {} completed successfully", task_name);
411 }
412 Ok(Err(e)) => {
413 tracing::error!("Task {} threw an error: {:?}", task_name, e);
414 let _ = monitor_err_sender.send(Err(e)).await.inspect_err(|e| {
415 tracing::error!("Failed to send error to monitoring channel: {:?}", e)
416 });
417 }
418 Err(e) => {
419 if e.is_cancelled() {
420 tracing::debug!("Task {} has been cancelled", task_name);
422 let _ = monitor_err_sender
423 .send(Err(Into::<BridgeError>::into(eyre::eyre!(
424 "Task was cancelled due to: {:?}",
425 e
426 ))
427 .into()))
428 .await
429 .inspect_err(|e| {
430 tracing::error!("Failed to send error to monitoring channel: {:?}", e)
431 });
432 return;
433 }
434 tracing::error!("Task {} has panicked: {:?}", task_name, e);
435 let _ = monitor_err_sender
436 .send(Err(Into::<BridgeError>::into(eyre::eyre!(
437 "Task has panicked due to: {:?}",
438 e
439 ))
440 .into()))
441 .await
442 .inspect_err(|e| {
443 tracing::error!("Failed to send error to monitoring channel: {:?}", e)
444 });
445 }
446 }
447 });
448}
449
/// Panics with the given format arguments after a 15 second delay.
///
/// The message is first printed to stderr, followed by a notice about the
/// delay; the current thread then sleeps for 15 seconds (to allow logs to be
/// flushed, per the printed notice) before `panic!` is invoked with the same
/// arguments. Accepts the same arguments as `panic!`.
macro_rules! delayed_panic {
    ($($arg:tt)*) => {{
        eprintln!($($arg)*);
        eprintln!("Delaying exit for 15 seconds, to allow for logs to be flushed");
        std::thread::sleep(std::time::Duration::from_secs(15));
        panic!($($arg)*);
    }};
}
466
467pub(crate) use delayed_panic;
468
469#[derive(Debug, Clone, Default)]
470pub struct AddMethodMiddlewareLayer;
471
472impl<S> Layer<S> for AddMethodMiddlewareLayer {
473 type Service = AddMethodMiddleware<S>;
474
475 fn layer(&self, service: S) -> Self::Service {
476 AddMethodMiddleware { inner: service }
477 }
478}
479
480#[derive(Debug, Clone)]
481pub struct AddMethodMiddleware<S> {
482 inner: S,
483}
484
485type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
486
487impl<S, ReqBody, ResBody> Service<http::Request<ReqBody>> for AddMethodMiddleware<S>
488where
489 S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>> + Clone + Send + 'static,
490 S::Future: Send + 'static,
491 ReqBody: Send + 'static,
492{
493 type Response = S::Response;
494 type Error = S::Error;
495 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
496
497 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
498 self.inner.poll_ready(cx)
499 }
500
501 fn call(&mut self, mut req: http::Request<ReqBody>) -> Self::Future {
502 let clone = self.inner.clone();
504 let mut inner = std::mem::replace(&mut self.inner, clone);
505
506 Box::pin(async move {
507 let path = req.uri().path();
508
509 let grpc_method =
510 if let &[_, _, method] = &path.split("/").collect::<Vec<&str>>().as_slice() {
511 Some(method.to_string())
512 } else {
513 None
514 };
515
516 if let Some(grpc_method) = grpc_method {
517 if let Ok(grpc_method) = HeaderValue::from_str(&grpc_method) {
518 req.headers_mut().insert("grpc-method", grpc_method);
519 }
520 }
521
522 let response = inner.call(req).await?;
524
525 Ok(response)
526 })
527 }
528}
529
530pub async fn timed_request<F, T>(
552 duration: Duration,
553 description: &str,
554 future: F,
555) -> Result<T, BridgeError>
556where
557 F: Future<Output = Result<T, BridgeError>>,
558{
559 timed_request_base(duration, description, future)
560 .await
561 .map_err(|_| {
562 Box::new(Status::deadline_exceeded(format!(
563 "{description} timed out"
564 )))
565 })?
566}
567
568pub async fn timed_request_base<F, T>(
581 duration: Duration,
582 description: &str,
583 future: F,
584) -> Result<Result<T, BridgeError>, Elapsed>
585where
586 F: Future<Output = Result<T, BridgeError>>,
587{
588 timeout(duration, future)
589 .instrument(debug_span!("timed_request", description = description))
590 .await
591}
592
593pub async fn timed_try_join_all<I, T, D>(
619 duration: Duration,
620 description: &str,
621 ids: Option<Vec<D>>,
622 iter: I,
623) -> Result<Vec<T>, BridgeError>
624where
625 D: Display,
626 I: IntoIterator,
627 I::Item: Future<Output = Result<T, BridgeError>>,
628{
629 let ids = Arc::new(ids);
630 let results = join_all(iter.into_iter().enumerate().map(|item| {
631 let ids = ids.clone();
632 async move {
633 let id = Option::as_ref(&ids).and_then(|ids| ids.get(item.0));
634
635 timeout(duration, item.1)
636 .await
637 .map_err(|_| {
638 Box::new(Status::deadline_exceeded(format!(
639 "{} (id: {}) timed out",
640 description,
641 id.map(|id| id.to_string())
642 .unwrap_or_else(|| "n/a".to_string())
643 )))
644 })?
645 .wrap_err_with(|| {
647 format!(
648 "Failed to join {}",
649 id.map(ToString::to_string).unwrap_or_else(|| "n/a".into())
650 )
651 })
652 }
653 }))
654 .instrument(debug_span!("timed_try_join_all", description = description))
655 .await;
656
657 combine_errors(results)
658}
659
660pub async fn try_join_all_combine_errors<F, T, E>(
667 futures: impl IntoIterator<Item = F>,
668) -> Result<Vec<T>, BridgeError>
669where
670 F: Future<Output = Result<T, E>>,
671 E: std::fmt::Display,
672{
673 let results = join_all(futures).await;
674 combine_errors(results)
675}
676
677pub async fn join_all_partition_results<F, T, E>(
693 futures: impl IntoIterator<Item = F>,
694) -> (Vec<T>, Option<String>)
695where
696 F: Future<Output = Result<T, E>>,
697 E: std::fmt::Display,
698{
699 let results = join_all(futures).await;
700 let mut errors = Vec::new();
701 let mut successful_results = Vec::new();
702 for result in results {
703 match result {
704 Ok(value) => successful_results.push(value),
705 Err(e) => errors.push(format!("{e:#}")),
706 }
707 }
708
709 (
710 successful_results,
711 match errors.is_empty() {
712 true => None,
713 false => Some(format!(
714 "Number of failed futures: {}: {}",
715 errors.len(),
716 errors.join("; ")
717 )),
718 },
719 )
720}
721
722pub fn combine_errors<I, EIn, T>(results: I) -> Result<Vec<T>, BridgeError>
732where
733 I: IntoIterator<Item = Result<T, EIn>>,
734 EIn: std::fmt::Display,
735{
736 let mut errors = Vec::new();
737 let mut successful_results = Vec::new();
738 for result in results {
739 match result {
740 Ok(value) => successful_results.push(value),
741 Err(e) => errors.push(format!("{e:#}")),
742 }
743 }
744 if !errors.is_empty() {
745 return Err(BridgeError::from(eyre::eyre!(
746 "Number of failed futures: {}: {}",
747 errors.len(),
748 errors.join("; ")
749 )));
750 }
751 Ok(successful_results)
752}
753
754pub fn flatten_join_named_results<T, E1, E2, S, R>(task_results: R) -> Result<(), BridgeError>
768where
769 R: IntoIterator<Item = (S, Result<Result<T, E1>, E2>)>,
770 S: AsRef<str>,
771 E1: std::fmt::Display,
772 E2: std::fmt::Display,
773{
774 let mut task_errors = Vec::new();
775
776 for (task_name, task_output) in task_results.into_iter() {
777 match task_output {
778 Ok(inner_result) => {
779 if let Err(e) = inner_result {
780 let err_msg = format!("{} failed with error: {:#}", task_name.as_ref(), e);
781 task_errors.push(err_msg);
782 }
783 }
784 Err(e) => {
785 let err_msg = format!(
786 "{} task thread failed with error: {:#}",
787 task_name.as_ref(),
788 e
789 );
790 task_errors.push(err_msg);
791 }
792 }
793 }
794
795 if !task_errors.is_empty() {
796 tracing::error!("Tasks failed with errors: {:#?}", task_errors);
797 return Err(eyre::eyre!("Tasks failed with errors: {:#?}", task_errors).into());
798 }
799
800 Ok(())
801}
802
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::io::Read;
    use tempfile::NamedTempFile;
    use tracing::level_filters::LevelFilter;

    /// With `CI` and `INFO_LOG_FILE` set, error/warn/info events and debug
    /// events targeted at `ci` must reach the log file, while other debug
    /// events must not.
    #[test]
    #[ignore = "This test changes environment variables so it should not be run in CI since it might affect other tests."]
    fn test_ci_logging_setup() {
        let log_file = NamedTempFile::new().expect("Failed to create temp file");
        let log_path = log_file.path().to_string_lossy().to_string();

        // Select the CI file-logging branch of `initialize_logger`.
        std::env::set_var("CI", "true");
        std::env::set_var("INFO_LOG_FILE", &log_path);

        assert!(
            initialize_logger(Some(LevelFilter::DEBUG)).is_ok(),
            "Logger initialization should succeed"
        );

        tracing::error!("Test error message");
        tracing::warn!("Test warn message");
        tracing::info!("Test info message");
        tracing::debug!(target: "ci", "Test CI debug message");
        tracing::debug!("Test debug message");

        // Give the writer a moment before reading the file back.
        std::thread::sleep(std::time::Duration::from_millis(100));

        let mut logged = String::new();
        fs::File::open(&log_path)
            .expect("Failed to open log file")
            .read_to_string(&mut logged)
            .expect("Failed to read log file");

        let expected_in_file = [
            ("Test error message", "Error message should be in file"),
            ("Test warn message", "Warn message should be in file"),
            ("Test info message", "Info message should be in file"),
            (
                "Test CI debug message",
                "Debug message for CI should be in file",
            ),
        ];
        for (needle, reason) in expected_in_file {
            assert!(logged.contains(needle), "{}", reason);
        }

        assert!(
            !logged.contains("Test debug message"),
            "Debug message should not be in file"
        );

        std::env::remove_var("CI");
        std::env::remove_var("INFO_LOG_FILE");
    }
}