1use crate::config::TelemetryConfig;
2use crate::rpc::clementine::VergenResponse;
3use clementine_errors::BridgeError;
4use eyre::Context as _;
5use futures::future::join_all;
6use http::HeaderValue;
7use metrics_exporter_prometheus::PrometheusBuilder;
8use std::fmt::{Debug, Display};
9use std::future::Future;
10use std::net::{Ipv4Addr, SocketAddr};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::time::Duration;
15use tokio::time::error::Elapsed;
16use tokio::time::timeout;
17use tonic::Status;
18use tower::{Layer, Service};
19use tracing::{debug_span, Instrument};
20
21pub use clementine_utils::{
23 tracing::initialize_logger, FeePayingType, Last20Bytes, NamedEntity, RbfSigningInfo,
24 RbfSigningSpendPath, ScriptBufExt, TryLast20Bytes, TxMetadata,
25};
26
27pub fn initialize_telemetry(config: &TelemetryConfig) -> Result<(), BridgeError> {
28 let telemetry_addr: SocketAddr = format!("{}:{}", config.host, config.port)
29 .parse()
30 .unwrap_or_else(|_| {
31 tracing::warn!(
32 "Invalid telemetry address: {}:{}, using default address: 127.0.0.1:8081",
33 config.host,
34 config.port
35 );
36 SocketAddr::from((Ipv4Addr::new(127, 0, 0, 1), 8081))
37 });
38
39 tracing::debug!("Initializing telemetry at {}", telemetry_addr);
40
41 let builder = PrometheusBuilder::new().with_http_listener(telemetry_addr);
42
43 builder
44 .install()
45 .map_err(|e| eyre::eyre!("Failed to initialize telemetry: {}", e))?;
46
47 Ok(())
48}
49
50pub fn get_vergen_response() -> VergenResponse {
51 let mut vergen_response = String::new();
52
53 if let Some(date) = option_env!("VERGEN_BUILD_DATE") {
55 vergen_response.push_str(&format!("Build Date: {date}\n"));
56 }
57 if let Some(timestamp) = option_env!("VERGEN_BUILD_TIMESTAMP") {
58 vergen_response.push_str(&format!("Build Timestamp: {timestamp}\n"));
59 }
60
61 if let Some(branch) = option_env!("VERGEN_GIT_BRANCH") {
63 vergen_response.push_str(&format!("git branch: {branch}\n"));
64 }
65 if let Some(commit) = option_env!("VERGEN_GIT_SHA") {
66 vergen_response.push_str(&format!("git commit: {commit}\n"));
67 }
68 if let Some(commit_date) = option_env!("VERGEN_GIT_COMMIT_DATE") {
69 vergen_response.push_str(&format!("git commit date: {commit_date}\n"));
70 }
71 if let Some(commit_timestamp) = option_env!("VERGEN_GIT_COMMIT_TIMESTAMP") {
72 vergen_response.push_str(&format!("git commit timestamp: {commit_timestamp}\n"));
73 }
74 if let Some(commit_author_name) = option_env!("VERGEN_GIT_COMMIT_AUTHOR_NAME") {
75 vergen_response.push_str(&format!("git commit author name: {commit_author_name}\n"));
76 }
77 if let Some(commit_author_email) = option_env!("VERGEN_GIT_COMMIT_AUTHOR_EMAIL") {
78 vergen_response.push_str(&format!("git commit author email: {commit_author_email}\n"));
79 }
80 if let Some(commit_count) = option_env!("VERGEN_GIT_COMMIT_COUNT") {
81 vergen_response.push_str(&format!("git commit count: {commit_count}\n"));
82 }
83 if let Some(commit_message) = option_env!("VERGEN_GIT_COMMIT_MESSAGE") {
84 vergen_response.push_str(&format!("git commit message: {commit_message}\n"));
85 }
86 if let Some(describe) = option_env!("VERGEN_GIT_DESCRIBE") {
87 vergen_response.push_str(&format!("git describe: {describe}\n"));
88 }
89 if let Some(dirty) = option_env!("VERGEN_GIT_DIRTY") {
90 vergen_response.push_str(&format!("git dirty: {dirty}\n"));
91 }
92
93 if let Some(debug) = option_env!("VERGEN_CARGO_DEBUG") {
95 vergen_response.push_str(&format!("cargo debug: {debug}\n"));
96 }
97 if let Some(opt_level) = option_env!("VERGEN_CARGO_OPT_LEVEL") {
98 vergen_response.push_str(&format!("cargo opt level: {opt_level}\n"));
99 }
100 if let Some(target_triple) = option_env!("VERGEN_CARGO_TARGET_TRIPLE") {
101 vergen_response.push_str(&format!("cargo target triple: {target_triple}\n"));
102 }
103 if let Some(features) = option_env!("VERGEN_CARGO_FEATURES") {
104 vergen_response.push_str(&format!("cargo features: {features}\n"));
105 }
106 if let Some(dependencies) = option_env!("VERGEN_CARGO_DEPENDENCIES") {
107 vergen_response.push_str(&format!("cargo dependencies: {dependencies}\n"));
108 }
109
110 if let Some(channel) = option_env!("VERGEN_RUSTC_CHANNEL") {
112 vergen_response.push_str(&format!("rustc channel: {channel}\n"));
113 }
114 if let Some(version) = option_env!("VERGEN_RUSTC_SEMVER") {
115 vergen_response.push_str(&format!("rustc version: {version}\n"));
116 }
117 if let Some(commit_hash) = option_env!("VERGEN_RUSTC_COMMIT_HASH") {
118 vergen_response.push_str(&format!("rustc commit hash: {commit_hash}\n"));
119 }
120 if let Some(commit_date) = option_env!("VERGEN_RUSTC_COMMIT_DATE") {
121 vergen_response.push_str(&format!("rustc commit date: {commit_date}\n"));
122 }
123 if let Some(host_triple) = option_env!("VERGEN_RUSTC_HOST_TRIPLE") {
124 vergen_response.push_str(&format!("rustc host triple: {host_triple}\n"));
125 }
126 if let Some(llvm_version) = option_env!("VERGEN_RUSTC_LLVM_VERSION") {
127 vergen_response.push_str(&format!("rustc LLVM version: {llvm_version}\n"));
128 }
129
130 if let Some(cpu_brand) = option_env!("VERGEN_SYSINFO_CPU_BRAND") {
132 vergen_response.push_str(&format!("cpu brand: {cpu_brand}\n"));
133 }
134 if let Some(cpu_name) = option_env!("VERGEN_SYSINFO_CPU_NAME") {
135 vergen_response.push_str(&format!("cpu name: {cpu_name}\n"));
136 }
137 if let Some(cpu_vendor) = option_env!("VERGEN_SYSINFO_CPU_VENDOR") {
138 vergen_response.push_str(&format!("cpu vendor: {cpu_vendor}\n"));
139 }
140 if let Some(cpu_core_count) = option_env!("VERGEN_SYSINFO_CPU_CORE_COUNT") {
141 vergen_response.push_str(&format!("cpu core count: {cpu_core_count}\n"));
142 }
143 if let Some(cpu_frequency) = option_env!("VERGEN_SYSINFO_CPU_FREQUENCY") {
144 vergen_response.push_str(&format!("cpu frequency: {cpu_frequency} MHz\n"));
145 }
146 if let Some(memory) = option_env!("VERGEN_SYSINFO_TOTAL_MEMORY") {
147 vergen_response.push_str(&format!("total memory: {memory}\n"));
148 }
149 if let Some(name) = option_env!("VERGEN_SYSINFO_NAME") {
150 vergen_response.push_str(&format!("system name: {name}\n"));
151 }
152 if let Some(os_version) = option_env!("VERGEN_SYSINFO_OS_VERSION") {
153 vergen_response.push_str(&format!("OS version: {os_version}\n"));
154 }
155 if let Some(user) = option_env!("VERGEN_SYSINFO_USER") {
156 vergen_response.push_str(&format!("build user: {user}\n"));
157 }
158
159 VergenResponse {
160 response: vergen_response,
161 }
162}
163
164pub fn monitor_standalone_task<
167 T: Send + 'static,
168 E: Debug + Send + 'static + From<BridgeError>,
169 C: Send + 'static,
170>(
171 task_handle: tokio::task::JoinHandle<Result<T, E>>,
172 task_name: &str,
173 monitor_err_sender: tokio::sync::mpsc::Sender<Result<C, E>>,
174) {
175 let task_name = task_name.to_string();
176
177 tokio::spawn(async move {
179 match task_handle.await {
180 Ok(Ok(_)) => {
181 tracing::debug!("Task {} completed successfully", task_name);
182 }
183 Ok(Err(e)) => {
184 tracing::error!("Task {} threw an error: {:?}", task_name, e);
185 let _ = monitor_err_sender.send(Err(e)).await.inspect_err(|e| {
186 tracing::error!("Failed to send error to monitoring channel: {:?}", e)
187 });
188 }
189 Err(e) => {
190 if e.is_cancelled() {
191 tracing::debug!("Task {} has been cancelled", task_name);
193 let _ = monitor_err_sender
194 .send(Err(Into::<BridgeError>::into(eyre::eyre!(
195 "Task was cancelled due to: {:?}",
196 e
197 ))
198 .into()))
199 .await
200 .inspect_err(|e| {
201 tracing::error!("Failed to send error to monitoring channel: {:?}", e)
202 });
203 return;
204 }
205 tracing::error!("Task {} has panicked: {:?}", task_name, e);
206 let _ = monitor_err_sender
207 .send(Err(Into::<BridgeError>::into(eyre::eyre!(
208 "Task has panicked due to: {:?}",
209 e
210 ))
211 .into()))
212 .await
213 .inspect_err(|e| {
214 tracing::error!("Failed to send error to monitoring channel: {:?}", e)
215 });
216 }
217 }
218 });
219}
220
221macro_rules! delayed_panic {
228 ($($arg:tt)*) => {
229 {
230 eprintln!($($arg)*);
231 eprintln!("Delaying exit for 15 seconds, to allow for logs to be flushed");
232 std::thread::sleep(std::time::Duration::from_secs(15));
233 panic!($($arg)*);
234 }
235 };
236}
237
238pub(crate) use delayed_panic;
239
240#[derive(Debug, Clone, Default)]
241pub struct AddMethodMiddlewareLayer;
242
243impl<S> Layer<S> for AddMethodMiddlewareLayer {
244 type Service = AddMethodMiddleware<S>;
245
246 fn layer(&self, service: S) -> Self::Service {
247 AddMethodMiddleware { inner: service }
248 }
249}
250
251#[derive(Debug, Clone)]
252pub struct AddMethodMiddleware<S> {
253 inner: S,
254}
255
256type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
257
258impl<S, ReqBody, ResBody> Service<http::Request<ReqBody>> for AddMethodMiddleware<S>
259where
260 S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>> + Clone + Send + 'static,
261 S::Future: Send + 'static,
262 ReqBody: Send + 'static,
263{
264 type Response = S::Response;
265 type Error = S::Error;
266 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
267
268 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
269 self.inner.poll_ready(cx)
270 }
271
272 fn call(&mut self, mut req: http::Request<ReqBody>) -> Self::Future {
273 let clone = self.inner.clone();
275 let mut inner = std::mem::replace(&mut self.inner, clone);
276
277 Box::pin(async move {
278 let path = req.uri().path();
279
280 let grpc_method =
281 if let &[_, _, method] = &path.split("/").collect::<Vec<&str>>().as_slice() {
282 Some(method.to_string())
283 } else {
284 None
285 };
286
287 if let Some(grpc_method) = grpc_method {
288 if let Ok(grpc_method) = HeaderValue::from_str(&grpc_method) {
289 req.headers_mut().insert("grpc-method", grpc_method);
290 }
291 }
292
293 let response = inner.call(req).await?;
295
296 Ok(response)
297 })
298 }
299}
300
301pub async fn timed_request<F, T>(
323 duration: Duration,
324 description: &str,
325 future: F,
326) -> Result<T, BridgeError>
327where
328 F: Future<Output = Result<T, BridgeError>>,
329{
330 timed_request_base(duration, description, future)
331 .await
332 .map_err(|_| {
333 Box::new(Status::deadline_exceeded(format!(
334 "{description} timed out"
335 )))
336 })?
337}
338
339pub async fn timed_request_base<F, T>(
352 duration: Duration,
353 description: &str,
354 future: F,
355) -> Result<Result<T, BridgeError>, Elapsed>
356where
357 F: Future<Output = Result<T, BridgeError>>,
358{
359 timeout(duration, future)
360 .instrument(debug_span!("timed_request", description = description))
361 .await
362}
363
364pub async fn timed_try_join_all<I, T, D>(
390 duration: Duration,
391 description: &str,
392 ids: Option<Vec<D>>,
393 iter: I,
394) -> Result<Vec<T>, BridgeError>
395where
396 D: Display,
397 I: IntoIterator,
398 I::Item: Future<Output = Result<T, BridgeError>>,
399{
400 let ids = Arc::new(ids);
401 let results = join_all(iter.into_iter().enumerate().map(|item| {
402 let ids = ids.clone();
403 async move {
404 let id = Option::as_ref(&ids).and_then(|ids| ids.get(item.0));
405
406 timeout(duration, item.1)
407 .await
408 .map_err(|_| {
409 Box::new(Status::deadline_exceeded(format!(
410 "{} (id: {}) timed out",
411 description,
412 id.map(|id| id.to_string())
413 .unwrap_or_else(|| "n/a".to_string())
414 )))
415 })?
416 .wrap_err_with(|| {
418 format!(
419 "Failed to join {}",
420 id.map(ToString::to_string).unwrap_or_else(|| "n/a".into())
421 )
422 })
423 }
424 }))
425 .instrument(debug_span!("timed_try_join_all", description = description))
426 .await;
427
428 combine_errors(results)
429}
430
431pub async fn try_join_all_combine_errors<F, T, E>(
438 futures: impl IntoIterator<Item = F>,
439) -> Result<Vec<T>, BridgeError>
440where
441 F: Future<Output = Result<T, E>>,
442 E: std::fmt::Display,
443{
444 let results = join_all(futures).await;
445 combine_errors(results)
446}
447
448pub async fn join_all_partition_results<F, T, E>(
464 futures: impl IntoIterator<Item = F>,
465) -> (Vec<T>, Option<String>)
466where
467 F: Future<Output = Result<T, E>>,
468 E: std::fmt::Display,
469{
470 let results = join_all(futures).await;
471 let mut errors = Vec::new();
472 let mut successful_results = Vec::new();
473 for result in results {
474 match result {
475 Ok(value) => successful_results.push(value),
476 Err(e) => errors.push(format!("{e:#}")),
477 }
478 }
479
480 (
481 successful_results,
482 match errors.is_empty() {
483 true => None,
484 false => Some(format!(
485 "Number of failed futures: {}: {}",
486 errors.len(),
487 errors.join("; ")
488 )),
489 },
490 )
491}
492
493pub fn combine_errors<I, EIn, T>(results: I) -> Result<Vec<T>, BridgeError>
503where
504 I: IntoIterator<Item = Result<T, EIn>>,
505 EIn: std::fmt::Display,
506{
507 let mut errors = Vec::new();
508 let mut successful_results = Vec::new();
509 for result in results {
510 match result {
511 Ok(value) => successful_results.push(value),
512 Err(e) => errors.push(format!("{e:#}")),
513 }
514 }
515 if !errors.is_empty() {
516 return Err(BridgeError::from(eyre::eyre!(
517 "Number of failed futures: {}: {}",
518 errors.len(),
519 errors.join("; ")
520 )));
521 }
522 Ok(successful_results)
523}
524
525pub fn flatten_join_named_results<T, E1, E2, S, R>(task_results: R) -> Result<(), BridgeError>
539where
540 R: IntoIterator<Item = (S, Result<Result<T, E1>, E2>)>,
541 S: AsRef<str>,
542 E1: std::fmt::Display,
543 E2: std::fmt::Display,
544{
545 let mut task_errors = Vec::new();
546
547 for (task_name, task_output) in task_results.into_iter() {
548 match task_output {
549 Ok(inner_result) => {
550 if let Err(e) = inner_result {
551 let err_msg = format!("{} failed with error: {:#}", task_name.as_ref(), e);
552 task_errors.push(err_msg);
553 }
554 }
555 Err(e) => {
556 let err_msg = format!(
557 "{} task thread failed with error: {:#}",
558 task_name.as_ref(),
559 e
560 );
561 task_errors.push(err_msg);
562 }
563 }
564 }
565
566 if !task_errors.is_empty() {
567 tracing::error!("Tasks failed with errors: {:#?}", task_errors);
568 return Err(eyre::eyre!("Tasks failed with errors: {:#?}", task_errors).into());
569 }
570
571 Ok(())
572}
573
574#[cfg(test)]
575mod tests {
576 use super::*;
577 use std::fs;
578 use std::io::Read;
579 use tempfile::NamedTempFile;
580 use tracing::level_filters::LevelFilter;
581
582 #[test]
583 #[ignore = "This test changes environment variables so it should not be run in CI since it might affect other tests."]
584 fn test_ci_logging_setup() {
585 let temp_file = NamedTempFile::new().expect("Failed to create temp file");
586 let temp_path = temp_file.path().to_string_lossy().to_string();
587
588 std::env::set_var("CI", "true");
589 std::env::set_var("INFO_LOG_FILE", &temp_path);
590
591 let result = initialize_logger(Some(LevelFilter::DEBUG));
592 assert!(result.is_ok(), "Logger initialization should succeed");
593
594 tracing::error!("Test error message");
595 tracing::warn!("Test warn message");
596 tracing::info!("Test info message");
597 tracing::debug!(target: "ci", "Test CI debug message");
598 tracing::debug!("Test debug message");
599
600 std::thread::sleep(std::time::Duration::from_millis(100));
601
602 let mut file_contents = String::new();
603 let mut file = fs::File::open(&temp_path).expect("Failed to open log file");
604 file.read_to_string(&mut file_contents)
605 .expect("Failed to read log file");
606
607 assert!(
608 file_contents.contains("Test error message"),
609 "Error message should be in file"
610 );
611 assert!(
612 file_contents.contains("Test warn message"),
613 "Warn message should be in file"
614 );
615 assert!(
616 file_contents.contains("Test info message"),
617 "Info message should be in file"
618 );
619
620 assert!(
621 file_contents.contains("Test CI debug message"),
622 "Debug message for CI should be in file"
623 );
624
625 assert!(
626 !file_contents.contains("Test debug message"),
627 "Debug message should not be in file"
628 );
629
630 std::env::remove_var("CI");
631 std::env::remove_var("INFO_LOG_FILE");
632 }
633}