clementine_core/
utils.rs

1use crate::config::TelemetryConfig;
2use crate::rpc::clementine::VergenResponse;
3use clementine_errors::BridgeError;
4use eyre::Context as _;
5use futures::future::join_all;
6use http::HeaderValue;
7use metrics_exporter_prometheus::PrometheusBuilder;
8use std::fmt::{Debug, Display};
9use std::future::Future;
10use std::net::{Ipv4Addr, SocketAddr};
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll};
14use std::time::Duration;
15use tokio::time::error::Elapsed;
16use tokio::time::timeout;
17use tonic::Status;
18use tower::{Layer, Service};
19use tracing::{debug_span, Instrument};
20
21// Re-export types from clementine-utils
22pub use clementine_utils::{
23    tracing::initialize_logger, FeePayingType, Last20Bytes, NamedEntity, RbfSigningInfo,
24    RbfSigningSpendPath, ScriptBufExt, TryLast20Bytes, TxMetadata,
25};
26
27pub fn initialize_telemetry(config: &TelemetryConfig) -> Result<(), BridgeError> {
28    let telemetry_addr: SocketAddr = format!("{}:{}", config.host, config.port)
29        .parse()
30        .unwrap_or_else(|_| {
31            tracing::warn!(
32                "Invalid telemetry address: {}:{}, using default address: 127.0.0.1:8081",
33                config.host,
34                config.port
35            );
36            SocketAddr::from((Ipv4Addr::new(127, 0, 0, 1), 8081))
37        });
38
39    tracing::debug!("Initializing telemetry at {}", telemetry_addr);
40
41    let builder = PrometheusBuilder::new().with_http_listener(telemetry_addr);
42
43    builder
44        .install()
45        .map_err(|e| eyre::eyre!("Failed to initialize telemetry: {}", e))?;
46
47    Ok(())
48}
49
50pub fn get_vergen_response() -> VergenResponse {
51    let mut vergen_response = String::new();
52
53    // build info
54    if let Some(date) = option_env!("VERGEN_BUILD_DATE") {
55        vergen_response.push_str(&format!("Build Date: {date}\n"));
56    }
57    if let Some(timestamp) = option_env!("VERGEN_BUILD_TIMESTAMP") {
58        vergen_response.push_str(&format!("Build Timestamp: {timestamp}\n"));
59    }
60
61    // git info
62    if let Some(branch) = option_env!("VERGEN_GIT_BRANCH") {
63        vergen_response.push_str(&format!("git branch: {branch}\n"));
64    }
65    if let Some(commit) = option_env!("VERGEN_GIT_SHA") {
66        vergen_response.push_str(&format!("git commit: {commit}\n"));
67    }
68    if let Some(commit_date) = option_env!("VERGEN_GIT_COMMIT_DATE") {
69        vergen_response.push_str(&format!("git commit date: {commit_date}\n"));
70    }
71    if let Some(commit_timestamp) = option_env!("VERGEN_GIT_COMMIT_TIMESTAMP") {
72        vergen_response.push_str(&format!("git commit timestamp: {commit_timestamp}\n"));
73    }
74    if let Some(commit_author_name) = option_env!("VERGEN_GIT_COMMIT_AUTHOR_NAME") {
75        vergen_response.push_str(&format!("git commit author name: {commit_author_name}\n"));
76    }
77    if let Some(commit_author_email) = option_env!("VERGEN_GIT_COMMIT_AUTHOR_EMAIL") {
78        vergen_response.push_str(&format!("git commit author email: {commit_author_email}\n"));
79    }
80    if let Some(commit_count) = option_env!("VERGEN_GIT_COMMIT_COUNT") {
81        vergen_response.push_str(&format!("git commit count: {commit_count}\n"));
82    }
83    if let Some(commit_message) = option_env!("VERGEN_GIT_COMMIT_MESSAGE") {
84        vergen_response.push_str(&format!("git commit message: {commit_message}\n"));
85    }
86    if let Some(describe) = option_env!("VERGEN_GIT_DESCRIBE") {
87        vergen_response.push_str(&format!("git describe: {describe}\n"));
88    }
89    if let Some(dirty) = option_env!("VERGEN_GIT_DIRTY") {
90        vergen_response.push_str(&format!("git dirty: {dirty}\n"));
91    }
92
93    // cargo info
94    if let Some(debug) = option_env!("VERGEN_CARGO_DEBUG") {
95        vergen_response.push_str(&format!("cargo debug: {debug}\n"));
96    }
97    if let Some(opt_level) = option_env!("VERGEN_CARGO_OPT_LEVEL") {
98        vergen_response.push_str(&format!("cargo opt level: {opt_level}\n"));
99    }
100    if let Some(target_triple) = option_env!("VERGEN_CARGO_TARGET_TRIPLE") {
101        vergen_response.push_str(&format!("cargo target triple: {target_triple}\n"));
102    }
103    if let Some(features) = option_env!("VERGEN_CARGO_FEATURES") {
104        vergen_response.push_str(&format!("cargo features: {features}\n"));
105    }
106    if let Some(dependencies) = option_env!("VERGEN_CARGO_DEPENDENCIES") {
107        vergen_response.push_str(&format!("cargo dependencies: {dependencies}\n"));
108    }
109
110    // rustc info
111    if let Some(channel) = option_env!("VERGEN_RUSTC_CHANNEL") {
112        vergen_response.push_str(&format!("rustc channel: {channel}\n"));
113    }
114    if let Some(version) = option_env!("VERGEN_RUSTC_SEMVER") {
115        vergen_response.push_str(&format!("rustc version: {version}\n"));
116    }
117    if let Some(commit_hash) = option_env!("VERGEN_RUSTC_COMMIT_HASH") {
118        vergen_response.push_str(&format!("rustc commit hash: {commit_hash}\n"));
119    }
120    if let Some(commit_date) = option_env!("VERGEN_RUSTC_COMMIT_DATE") {
121        vergen_response.push_str(&format!("rustc commit date: {commit_date}\n"));
122    }
123    if let Some(host_triple) = option_env!("VERGEN_RUSTC_HOST_TRIPLE") {
124        vergen_response.push_str(&format!("rustc host triple: {host_triple}\n"));
125    }
126    if let Some(llvm_version) = option_env!("VERGEN_RUSTC_LLVM_VERSION") {
127        vergen_response.push_str(&format!("rustc LLVM version: {llvm_version}\n"));
128    }
129
130    // sysinfo
131    if let Some(cpu_brand) = option_env!("VERGEN_SYSINFO_CPU_BRAND") {
132        vergen_response.push_str(&format!("cpu brand: {cpu_brand}\n"));
133    }
134    if let Some(cpu_name) = option_env!("VERGEN_SYSINFO_CPU_NAME") {
135        vergen_response.push_str(&format!("cpu name: {cpu_name}\n"));
136    }
137    if let Some(cpu_vendor) = option_env!("VERGEN_SYSINFO_CPU_VENDOR") {
138        vergen_response.push_str(&format!("cpu vendor: {cpu_vendor}\n"));
139    }
140    if let Some(cpu_core_count) = option_env!("VERGEN_SYSINFO_CPU_CORE_COUNT") {
141        vergen_response.push_str(&format!("cpu core count: {cpu_core_count}\n"));
142    }
143    if let Some(cpu_frequency) = option_env!("VERGEN_SYSINFO_CPU_FREQUENCY") {
144        vergen_response.push_str(&format!("cpu frequency: {cpu_frequency} MHz\n"));
145    }
146    if let Some(memory) = option_env!("VERGEN_SYSINFO_TOTAL_MEMORY") {
147        vergen_response.push_str(&format!("total memory: {memory}\n"));
148    }
149    if let Some(name) = option_env!("VERGEN_SYSINFO_NAME") {
150        vergen_response.push_str(&format!("system name: {name}\n"));
151    }
152    if let Some(os_version) = option_env!("VERGEN_SYSINFO_OS_VERSION") {
153        vergen_response.push_str(&format!("OS version: {os_version}\n"));
154    }
155    if let Some(user) = option_env!("VERGEN_SYSINFO_USER") {
156        vergen_response.push_str(&format!("build user: {user}\n"));
157    }
158
159    VergenResponse {
160        response: vergen_response,
161    }
162}
163
164/// Monitors a [`tokio::task::JoinHandle`] in the background and logs it's end
165/// result.
166pub fn monitor_standalone_task<
167    T: Send + 'static,
168    E: Debug + Send + 'static + From<BridgeError>,
169    C: Send + 'static,
170>(
171    task_handle: tokio::task::JoinHandle<Result<T, E>>,
172    task_name: &str,
173    monitor_err_sender: tokio::sync::mpsc::Sender<Result<C, E>>,
174) {
175    let task_name = task_name.to_string();
176
177    // Move task_handle into the spawned task to make it Send
178    tokio::spawn(async move {
179        match task_handle.await {
180            Ok(Ok(_)) => {
181                tracing::debug!("Task {} completed successfully", task_name);
182            }
183            Ok(Err(e)) => {
184                tracing::error!("Task {} threw an error: {:?}", task_name, e);
185                let _ = monitor_err_sender.send(Err(e)).await.inspect_err(|e| {
186                    tracing::error!("Failed to send error to monitoring channel: {:?}", e)
187                });
188            }
189            Err(e) => {
190                if e.is_cancelled() {
191                    // Task was cancelled, which is expected during cleanup
192                    tracing::debug!("Task {} has been cancelled", task_name);
193                    let _ = monitor_err_sender
194                        .send(Err(Into::<BridgeError>::into(eyre::eyre!(
195                            "Task was cancelled due to: {:?}",
196                            e
197                        ))
198                        .into()))
199                        .await
200                        .inspect_err(|e| {
201                            tracing::error!("Failed to send error to monitoring channel: {:?}", e)
202                        });
203                    return;
204                }
205                tracing::error!("Task {} has panicked: {:?}", task_name, e);
206                let _ = monitor_err_sender
207                    .send(Err(Into::<BridgeError>::into(eyre::eyre!(
208                        "Task has panicked due to: {:?}",
209                        e
210                    ))
211                    .into()))
212                    .await
213                    .inspect_err(|e| {
214                        tracing::error!("Failed to send error to monitoring channel: {:?}", e)
215                    });
216            }
217        }
218    });
219}
220
/// Prints the given message to stderr, blocks the current thread for 15 seconds
/// so that logs have a chance to be flushed, and then panics with the same
/// message.
///
/// # Parameters
///
/// - `($($arg:tt)*)`: Arguments forwarded to both `eprintln!` and `panic!`, using
///   the same syntax as `format!` and `println!`.
macro_rules! delayed_panic {
    ($($arg:tt)*) => {{
        eprintln!($($arg)*);
        eprintln!("Delaying exit for 15 seconds, to allow for logs to be flushed");
        let flush_grace_period = ::std::time::Duration::from_secs(15);
        ::std::thread::sleep(flush_grace_period);
        panic!($($arg)*);
    }};
}
237
238pub(crate) use delayed_panic;
239
240#[derive(Debug, Clone, Default)]
241pub struct AddMethodMiddlewareLayer;
242
243impl<S> Layer<S> for AddMethodMiddlewareLayer {
244    type Service = AddMethodMiddleware<S>;
245
246    fn layer(&self, service: S) -> Self::Service {
247        AddMethodMiddleware { inner: service }
248    }
249}
250
251#[derive(Debug, Clone)]
252pub struct AddMethodMiddleware<S> {
253    inner: S,
254}
255
256type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
257
258impl<S, ReqBody, ResBody> Service<http::Request<ReqBody>> for AddMethodMiddleware<S>
259where
260    S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>> + Clone + Send + 'static,
261    S::Future: Send + 'static,
262    ReqBody: Send + 'static,
263{
264    type Response = S::Response;
265    type Error = S::Error;
266    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
267
268    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
269        self.inner.poll_ready(cx)
270    }
271
272    fn call(&mut self, mut req: http::Request<ReqBody>) -> Self::Future {
273        // See: https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services
274        let clone = self.inner.clone();
275        let mut inner = std::mem::replace(&mut self.inner, clone);
276
277        Box::pin(async move {
278            let path = req.uri().path();
279
280            let grpc_method =
281                if let &[_, _, method] = &path.split("/").collect::<Vec<&str>>().as_slice() {
282                    Some(method.to_string())
283                } else {
284                    None
285                };
286
287            if let Some(grpc_method) = grpc_method {
288                if let Ok(grpc_method) = HeaderValue::from_str(&grpc_method) {
289                    req.headers_mut().insert("grpc-method", grpc_method);
290                }
291            }
292
293            // Do extra async work here...
294            let response = inner.call(req).await?;
295
296            Ok(response)
297        })
298    }
299}
300
301// NOTE: NamedEntity, TxMetadata, FeePayingType, RbfSigningInfo,
302// Last20Bytes, TryLast20Bytes, ScriptBufExt are now re-exported
303// from clementine-utils at the top of this file.
304
305/// Wraps a future with a timeout, returning a `Status::deadline_exceeded` gRPC error
306/// if the future does not complete within the specified duration.
307///
308/// This is useful for enforcing timeouts on individual asynchronous operations,
309/// especially those involving network requests, to prevent them from hanging indefinitely.
310///
311/// # Arguments
312///
313/// * `duration`: The maximum `Duration` to wait for the future to complete.
314/// * `description`: A string slice describing the operation, used in the timeout error message.
315/// * `future`: The `Future` to execute. The future should return a `Result<T, BridgeError>`.
316///
317/// # Returns
318///
319/// Returns `Ok(T)` if the future completes successfully within the time limit.
320/// Returns `Err(BridgeError)` if the future returns an error or if it times out.
321/// A timeout results in a `BridgeError` that wraps a `tonic::Status::deadline_exceeded`.
322pub async fn timed_request<F, T>(
323    duration: Duration,
324    description: &str,
325    future: F,
326) -> Result<T, BridgeError>
327where
328    F: Future<Output = Result<T, BridgeError>>,
329{
330    timed_request_base(duration, description, future)
331        .await
332        .map_err(|_| {
333            Box::new(Status::deadline_exceeded(format!(
334                "{description} timed out"
335            )))
336        })?
337}
338
339/// Wraps a future with a timeout and adds a debug span with the description.
340///
341/// # Arguments
342///
343/// * `duration`: The maximum `Duration` to wait for the future to complete.
344/// * `description`: A string slice describing the operation, used in the timeout error message.
345/// * `future`: The `Future` to execute. The future should return a `Result<T, BridgeError>`.
346///
347/// # Returns
348///
349/// Returns `Ok(Ok(T))` if the future completes successfully within the time limit, returns `Ok(Err(e))`
350/// if the future returns an error, returns `Err(Elapsed)` if the request times out.
351pub async fn timed_request_base<F, T>(
352    duration: Duration,
353    description: &str,
354    future: F,
355) -> Result<Result<T, BridgeError>, Elapsed>
356where
357    F: Future<Output = Result<T, BridgeError>>,
358{
359    timeout(duration, future)
360        .instrument(debug_span!("timed_request", description = description))
361        .await
362}
363
364/// Concurrently executes a collection of futures, applying a timeout to each one individually.
365/// If any future fails or times out, the entire operation is aborted and an error is returned.
366///
367/// This utility is an extension of `futures::future::try_join_all` with added per-future
368/// timeout logic and improved error reporting using optional IDs.
369///
370/// # Type Parameters
371///
372/// * `I`: An iterator that yields futures.
373/// * `T`: The success type of the futures.
374/// * `D`: A type that can be displayed, used for identifying futures in error messages.
375///
376/// # Arguments
377///
378/// * `duration`: The timeout `Duration` applied to each individual future in the iterator.
379/// * `description`: A string slice describing the collective operation, used in timeout error messages.
380/// * `ids`: An optional `Vec<D>` of identifiers corresponding to each future. If provided,
381///   these IDs are used in error messages to specify which future failed or timed out.
382/// * `iter`: An iterator producing the futures to be executed.
383///
384/// # Returns
385///
386/// Returns `Ok(Vec<T>)` containing the results of all futures if they all complete successfully.
387/// Returns `Err(BridgeError)` if any future returns an error or times out. The error will be a combined error of all errors.
388/// The error will be contextualized with the operation description and the specific future's ID if available.
389pub async fn timed_try_join_all<I, T, D>(
390    duration: Duration,
391    description: &str,
392    ids: Option<Vec<D>>,
393    iter: I,
394) -> Result<Vec<T>, BridgeError>
395where
396    D: Display,
397    I: IntoIterator,
398    I::Item: Future<Output = Result<T, BridgeError>>,
399{
400    let ids = Arc::new(ids);
401    let results = join_all(iter.into_iter().enumerate().map(|item| {
402        let ids = ids.clone();
403        async move {
404            let id = Option::as_ref(&ids).and_then(|ids| ids.get(item.0));
405
406            timeout(duration, item.1)
407                .await
408                .map_err(|_| {
409                    Box::new(Status::deadline_exceeded(format!(
410                        "{} (id: {}) timed out",
411                        description,
412                        id.map(|id| id.to_string())
413                            .unwrap_or_else(|| "n/a".to_string())
414                    )))
415                })?
416                // Add the id to the error chain for easier debugging for other errors.
417                .wrap_err_with(|| {
418                    format!(
419                        "Failed to join {}",
420                        id.map(ToString::to_string).unwrap_or_else(|| "n/a".into())
421                    )
422                })
423        }
424    }))
425    .instrument(debug_span!("timed_try_join_all", description = description))
426    .await;
427
428    combine_errors(results)
429}
430
431/// Executes a collection of fallible futures concurrently and aggregates failures into a single
432/// [`BridgeError`].
433///
434/// This runs all futures like [`futures::future::try_join_all`], but instead of stopping on the
435/// first error it waits for every future to finish by leveraging [`join_all`]. All errors are then
436/// combined via [`combine_errors`], ensuring the caller gets full visibility into every failure.
437pub async fn try_join_all_combine_errors<F, T, E>(
438    futures: impl IntoIterator<Item = F>,
439) -> Result<Vec<T>, BridgeError>
440where
441    F: Future<Output = Result<T, E>>,
442    E: std::fmt::Display,
443{
444    let results = join_all(futures).await;
445    combine_errors(results)
446}
447
448/// Executes a collection of fallible futures concurrently and partitions results into successes and errors.
449///
450/// Unlike [`try_join_all_combine_errors`], this function does not fail on errors. Instead, it runs all
451/// futures concurrently using [`futures::future::join_all`] and partitions the results:
452/// - Successful results are collected into a `Vec<T>`
453/// - Errors are collected and combined into an optional error string
454///
455/// This is useful when you want to collect partial successes even if some futures fail, allowing
456/// the caller to decide how to handle the mixed results.
457///
458/// # Returns
459///
460/// Returns a tuple `(Vec<T>, Option<String>)` where:
461/// - The first element contains all successful results
462/// - The second element is `Some(combined_error_string)` if any errors occurred, or `None` if all succeeded
463pub async fn join_all_partition_results<F, T, E>(
464    futures: impl IntoIterator<Item = F>,
465) -> (Vec<T>, Option<String>)
466where
467    F: Future<Output = Result<T, E>>,
468    E: std::fmt::Display,
469{
470    let results = join_all(futures).await;
471    let mut errors = Vec::new();
472    let mut successful_results = Vec::new();
473    for result in results {
474        match result {
475            Ok(value) => successful_results.push(value),
476            Err(e) => errors.push(format!("{e:#}")),
477        }
478    }
479
480    (
481        successful_results,
482        match errors.is_empty() {
483            true => None,
484            false => Some(format!(
485                "Number of failed futures: {}: {}",
486                errors.len(),
487                errors.join("; ")
488            )),
489        },
490    )
491}
492
493/// Collects errors from an iterator of results and returns a combined error if any failed.
494///
495/// # Parameters
496/// * `results`: Iterator of results (errors should contain identifying information in their Debug representation)
497/// * `prefix`: Prefix message for the combined error (e.g., "Operator key collection failures")
498///
499/// # Returns
500/// * `Ok(Vec<T>)` containing all successful results if all results are successful
501/// * `Err(BridgeError)` with a combined error message listing all failures
502pub fn combine_errors<I, EIn, T>(results: I) -> Result<Vec<T>, BridgeError>
503where
504    I: IntoIterator<Item = Result<T, EIn>>,
505    EIn: std::fmt::Display,
506{
507    let mut errors = Vec::new();
508    let mut successful_results = Vec::new();
509    for result in results {
510        match result {
511            Ok(value) => successful_results.push(value),
512            Err(e) => errors.push(format!("{e:#}")),
513        }
514    }
515    if !errors.is_empty() {
516        return Err(BridgeError::from(eyre::eyre!(
517            "Number of failed futures: {}: {}",
518            errors.len(),
519            errors.join("; ")
520        )));
521    }
522    Ok(successful_results)
523}
524
525/// Collects all errors (both outer and inner) from named task results and returns a combined error if any task failed.
526///
527/// This function is useful when you have multiple async tasks (e.g., from `tokio::spawn`) and want to
528/// see all errors if multiple tasks fail, rather than just the first error.
529///
530/// # Parameters
531/// * `task_results`: Iterator of tuples containing (task_name, Result<Result<T, E1>, E2>)
532///   - `task_name`: A string-like identifier for the task (used in error messages)
533///   - The nested Result represents: `Result<T, E1>` is the task's result, `E2` is typically a `JoinError`
534///
535/// # Returns
536/// * `Ok(())` if all tasks completed successfully
537/// * `Err(BridgeError)` with a combined error message listing all failures
538pub fn flatten_join_named_results<T, E1, E2, S, R>(task_results: R) -> Result<(), BridgeError>
539where
540    R: IntoIterator<Item = (S, Result<Result<T, E1>, E2>)>,
541    S: AsRef<str>,
542    E1: std::fmt::Display,
543    E2: std::fmt::Display,
544{
545    let mut task_errors = Vec::new();
546
547    for (task_name, task_output) in task_results.into_iter() {
548        match task_output {
549            Ok(inner_result) => {
550                if let Err(e) = inner_result {
551                    let err_msg = format!("{} failed with error: {:#}", task_name.as_ref(), e);
552                    task_errors.push(err_msg);
553                }
554            }
555            Err(e) => {
556                let err_msg = format!(
557                    "{} task thread failed with error: {:#}",
558                    task_name.as_ref(),
559                    e
560                );
561                task_errors.push(err_msg);
562            }
563        }
564    }
565
566    if !task_errors.is_empty() {
567        tracing::error!("Tasks failed with errors: {:#?}", task_errors);
568        return Err(eyre::eyre!("Tasks failed with errors: {:#?}", task_errors).into());
569    }
570
571    Ok(())
572}
573
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::io::Read;
    use tempfile::NamedTempFile;
    use tracing::level_filters::LevelFilter;

    /// Checks which log lines end up in the file named by `INFO_LOG_FILE` when `CI` is set.
    #[test]
    #[ignore = "This test changes environment variables so it should not be run in CI since it might affect other tests."]
    fn test_ci_logging_setup() {
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let log_path = temp_file.path().to_string_lossy().to_string();

        std::env::set_var("CI", "true");
        std::env::set_var("INFO_LOG_FILE", &log_path);

        let init_result = initialize_logger(Some(LevelFilter::DEBUG));
        assert!(init_result.is_ok(), "Logger initialization should succeed");

        tracing::error!("Test error message");
        tracing::warn!("Test warn message");
        tracing::info!("Test info message");
        tracing::debug!(target: "ci", "Test CI debug message");
        tracing::debug!("Test debug message");

        // Give the logger a moment to write the events out.
        std::thread::sleep(std::time::Duration::from_millis(100));

        let mut file_contents = String::new();
        fs::File::open(&log_path)
            .expect("Failed to open log file")
            .read_to_string(&mut file_contents)
            .expect("Failed to read log file");

        // (text that must appear in the file, message shown if it does not)
        let expected_in_file = [
            ("Test error message", "Error message should be in file"),
            ("Test warn message", "Warn message should be in file"),
            ("Test info message", "Info message should be in file"),
            (
                "Test CI debug message",
                "Debug message for CI should be in file",
            ),
        ];
        for (needle, failure_message) in expected_in_file {
            assert!(file_contents.contains(needle), "{}", failure_message);
        }

        // A debug event without the "ci" target must not reach the file.
        assert!(
            !file_contents.contains("Test debug message"),
            "Debug message should not be in file"
        );

        std::env::remove_var("CI");
        std::env::remove_var("INFO_LOG_FILE");
    }
}