clementine_core/
utils.rs

1use crate::config::TelemetryConfig;
2use crate::rpc::clementine::VergenResponse;
3use clementine_errors::BridgeError;
4use eyre::Context as _;
5use futures::future::join_all;
6use http::HeaderValue;
7use metrics_exporter_prometheus::PrometheusBuilder;
8use std::fmt::{Debug, Display};
9use std::fs::File;
10use std::future::Future;
11use std::net::{Ipv4Addr, SocketAddr};
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll};
15use std::time::Duration;
16use tokio::time::error::Elapsed;
17use tokio::time::timeout;
18use tonic::Status;
19use tower::{Layer, Service};
20use tracing::level_filters::LevelFilter;
21use tracing::{debug_span, Instrument, Subscriber};
22use tracing_subscriber::layer::SubscriberExt;
23use tracing_subscriber::{fmt, EnvFilter, Layer as TracingLayer, Registry};
24
25// Re-export types from clementine-utils
26pub use clementine_utils::{
27    FeePayingType, Last20Bytes, NamedEntity, RbfSigningInfo, ScriptBufExt, TryLast20Bytes,
28    TxMetadata,
29};
30
31/// Initializes a [`tracing`] subscriber depending on the environment.
32/// [`EnvFilter`] is used with an optional default level. Sets up the
33/// [`color_eyre`] handler.
34///
35/// # Log Formats
36///
37/// - `json` **JSON** is used when `LOG_FORMAT=json`
38/// - `human` **Human-readable** direct logs are used when `LOG_FORMAT` is not
39///   set to `json`.
40///
41/// ## CI
42///
43/// In CI, logging is always in the human-readable format with output to the
44/// console. The `INFO_LOG_FILE` env var can be used to set an optional log file
45/// output. If not set, only console logging is used.
46///
47/// # Backtraces
48///
49/// Backtraces are enabled by default for tests. Error backtraces otherwise
50/// depend on the `RUST_LIB_BACKTRACE` env var. Please read [`color_eyre`]
51/// documentation for more details.
52///
53/// # Parameters
54///
55/// - `default_level`: Default level ranges from 0 to 5. This is overwritten through the
56///   `RUST_LOG` env var.
57///
58/// # Returns
59///
60/// Returns `Err` in CI if the file logging cannot be initialized.  Already
61/// initialized errors are ignored, so this function can be called multiple
62/// times safely.
63pub fn initialize_logger(default_level: Option<LevelFilter>) -> Result<(), BridgeError> {
64    let is_ci = std::env::var("CI")
65        .map(|v| v == "true" || v == "1")
66        .unwrap_or(false);
67
68    // UNCOMMENT TO DEBUG TOKIO TASKS
69    // console_subscriber::init();
70
71    if cfg!(test) {
72        // Enable full backtraces for tests
73        std::env::set_var("RUST_LIB_BACKTRACE", "full");
74        std::env::set_var("RUST_BACKTRACE", "full");
75    }
76
77    // Initialize color-eyre for better error handling and backtraces
78    let _ = color_eyre::config::HookBuilder::default()
79        .add_frame_filter(Box::new(|frames| {
80            // Frames with names starting with any of the str's below will be filtered out
81            let filters = &[
82                "std::",
83                "test::",
84                "tokio::",
85                "core::",
86                "<core::",
87                "<alloc::",
88                "start_thread",
89                "<tonic::",
90                "<futures::",
91                "<tower::",
92                "<hyper",
93                "hyper",
94                "__rust_try",
95                "<axum::",
96                "<F as ",
97                "clone",
98            ];
99
100            frames.retain(|frame| {
101                !filters.iter().any(|f| {
102                    let name = if let Some(name) = frame.name.as_ref() {
103                        name.as_str()
104                    } else {
105                        return true;
106                    };
107
108                    name.starts_with(f)
109                })
110            });
111        }))
112        .install();
113
114    if is_ci {
115        let info_log_file = std::env::var("INFO_LOG_FILE").ok();
116        if let Some(file_path) = info_log_file {
117            try_set_global_subscriber(env_subscriber_with_file(&file_path)?);
118            tracing::trace!("Using file logging in CI, outputting to {}", file_path);
119        } else {
120            try_set_global_subscriber(env_subscriber_to_human(default_level));
121            tracing::trace!("Using console logging in CI");
122            tracing::warn!(
123                "CI is set but INFO_LOG_FILE is missing, only console logs will be used."
124            );
125        }
126    } else if is_json_logs() {
127        try_set_global_subscriber(env_subscriber_to_json(default_level));
128        tracing::trace!("Using JSON logging");
129    } else {
130        try_set_global_subscriber(env_subscriber_to_human(default_level));
131        tracing::trace!("Using human-readable logging");
132    }
133
134    tracing::info!("Tracing initialized successfully.");
135    Ok(())
136}
137
138pub fn initialize_telemetry(config: &TelemetryConfig) -> Result<(), BridgeError> {
139    let telemetry_addr: SocketAddr = format!("{}:{}", config.host, config.port)
140        .parse()
141        .unwrap_or_else(|_| {
142            tracing::warn!(
143                "Invalid telemetry address: {}:{}, using default address: 127.0.0.1:8081",
144                config.host,
145                config.port
146            );
147            SocketAddr::from((Ipv4Addr::new(127, 0, 0, 1), 8081))
148        });
149
150    tracing::debug!("Initializing telemetry at {}", telemetry_addr);
151
152    let builder = PrometheusBuilder::new().with_http_listener(telemetry_addr);
153
154    builder
155        .install()
156        .map_err(|e| eyre::eyre!("Failed to initialize telemetry: {}", e))?;
157
158    Ok(())
159}
160
161fn try_set_global_subscriber<S>(subscriber: S)
162where
163    S: Subscriber + Send + Sync + 'static,
164{
165    match tracing::subscriber::set_global_default(subscriber) {
166        Ok(_) => {}
167        // Statically, the only error possible is "already initialized"
168        Err(_) => {
169            #[cfg(test)]
170            tracing::trace!("Tracing is already initialized, skipping without errors...");
171            #[cfg(not(test))]
172            tracing::info!(
173                "Unexpected double initialization of tracing, skipping without errors..."
174            );
175        }
176    }
177}
178
179fn env_subscriber_with_file(path: &str) -> Result<Box<dyn Subscriber + Send + Sync>, BridgeError> {
180    if let Some(parent_dir) = std::path::Path::new(path).parent() {
181        std::fs::create_dir_all(parent_dir).map_err(|e| {
182            BridgeError::ConfigError(format!(
183                "Failed to create log directory '{}': {}",
184                parent_dir.display(),
185                e
186            ))
187        })?;
188    }
189
190    let file = File::create(path).map_err(|e| BridgeError::ConfigError(e.to_string()))?;
191
192    let file_filter = EnvFilter::from_default_env()
193        .add_directive("info".parse().expect("It should parse info level"))
194        .add_directive("ci=debug".parse().expect("It should parse ci debug level"));
195
196    let console_filter = EnvFilter::builder()
197        .with_default_directive(LevelFilter::WARN.into())
198        .from_env_lossy();
199
200    let file_layer = fmt::layer()
201        .with_writer(file)
202        .with_ansi(false)
203        .with_file(true)
204        .with_line_number(true)
205        .with_target(true)
206        .with_thread_ids(true)
207        .with_thread_names(true)
208        .with_filter(file_filter)
209        .boxed();
210
211    let console_layer = fmt::layer()
212        .with_test_writer()
213        .with_file(true)
214        .with_line_number(true)
215        .with_target(true)
216        .with_filter(console_filter)
217        .boxed();
218
219    Ok(Box::new(
220        Registry::default().with(file_layer).with(console_layer),
221    ))
222}
223
224fn env_subscriber_to_json(level: Option<LevelFilter>) -> Box<dyn Subscriber + Send + Sync> {
225    let filter = match level {
226        Some(lvl) => EnvFilter::builder()
227            .with_default_directive(lvl.into())
228            .from_env_lossy(),
229        None => EnvFilter::from_default_env(),
230    };
231
232    let json_layer = fmt::layer::<Registry>()
233        .with_test_writer()
234        // .with_timer(time::UtcTime::rfc_3339())
235        .with_file(true)
236        .with_line_number(true)
237        .with_thread_ids(true)
238        .with_thread_names(true)
239        .with_target(true)
240        .json();
241    // .with_current_span(true)z
242    // .with_span_list(true)
243    // To see how long each span takes, uncomment this.
244    // .with_span_events(FmtSpan::CLOSE)
245
246    Box::new(tracing_subscriber::registry().with(json_layer).with(filter))
247}
248
249fn env_subscriber_to_human(level: Option<LevelFilter>) -> Box<dyn Subscriber + Send + Sync> {
250    let filter = match level {
251        Some(lvl) => EnvFilter::builder()
252            .with_default_directive(lvl.into())
253            .from_env_lossy(),
254        None => EnvFilter::from_default_env(),
255    };
256
257    let standard_layer = fmt::layer()
258        .with_test_writer()
259        // .with_timer(time::UtcTime::rfc_3339())
260        .with_file(true)
261        .with_line_number(true)
262        // To see how long each span takes, uncomment this.
263        // .with_span_events(FmtSpan::CLOSE)
264        .with_target(true);
265
266    Box::new(
267        tracing_subscriber::registry()
268            .with(standard_layer)
269            .with(filter),
270    )
271}
272
/// Returns `true` when the `LOG_FORMAT` env var is set to `json`
/// (compared case-insensitively); `false` when it is unset, unreadable, or
/// holds any other value.
fn is_json_logs() -> bool {
    matches!(
        std::env::var("LOG_FORMAT"),
        Ok(format) if format.eq_ignore_ascii_case("json")
    )
}
278
279pub fn get_vergen_response() -> VergenResponse {
280    let mut vergen_response = String::new();
281
282    // build info
283    if let Some(date) = option_env!("VERGEN_BUILD_DATE") {
284        vergen_response.push_str(&format!("Build Date: {date}\n"));
285    }
286    if let Some(timestamp) = option_env!("VERGEN_BUILD_TIMESTAMP") {
287        vergen_response.push_str(&format!("Build Timestamp: {timestamp}\n"));
288    }
289
290    // git info
291    if let Some(branch) = option_env!("VERGEN_GIT_BRANCH") {
292        vergen_response.push_str(&format!("git branch: {branch}\n"));
293    }
294    if let Some(commit) = option_env!("VERGEN_GIT_SHA") {
295        vergen_response.push_str(&format!("git commit: {commit}\n"));
296    }
297    if let Some(commit_date) = option_env!("VERGEN_GIT_COMMIT_DATE") {
298        vergen_response.push_str(&format!("git commit date: {commit_date}\n"));
299    }
300    if let Some(commit_timestamp) = option_env!("VERGEN_GIT_COMMIT_TIMESTAMP") {
301        vergen_response.push_str(&format!("git commit timestamp: {commit_timestamp}\n"));
302    }
303    if let Some(commit_author_name) = option_env!("VERGEN_GIT_COMMIT_AUTHOR_NAME") {
304        vergen_response.push_str(&format!("git commit author name: {commit_author_name}\n"));
305    }
306    if let Some(commit_author_email) = option_env!("VERGEN_GIT_COMMIT_AUTHOR_EMAIL") {
307        vergen_response.push_str(&format!("git commit author email: {commit_author_email}\n"));
308    }
309    if let Some(commit_count) = option_env!("VERGEN_GIT_COMMIT_COUNT") {
310        vergen_response.push_str(&format!("git commit count: {commit_count}\n"));
311    }
312    if let Some(commit_message) = option_env!("VERGEN_GIT_COMMIT_MESSAGE") {
313        vergen_response.push_str(&format!("git commit message: {commit_message}\n"));
314    }
315    if let Some(describe) = option_env!("VERGEN_GIT_DESCRIBE") {
316        vergen_response.push_str(&format!("git describe: {describe}\n"));
317    }
318    if let Some(dirty) = option_env!("VERGEN_GIT_DIRTY") {
319        vergen_response.push_str(&format!("git dirty: {dirty}\n"));
320    }
321
322    // cargo info
323    if let Some(debug) = option_env!("VERGEN_CARGO_DEBUG") {
324        vergen_response.push_str(&format!("cargo debug: {debug}\n"));
325    }
326    if let Some(opt_level) = option_env!("VERGEN_CARGO_OPT_LEVEL") {
327        vergen_response.push_str(&format!("cargo opt level: {opt_level}\n"));
328    }
329    if let Some(target_triple) = option_env!("VERGEN_CARGO_TARGET_TRIPLE") {
330        vergen_response.push_str(&format!("cargo target triple: {target_triple}\n"));
331    }
332    if let Some(features) = option_env!("VERGEN_CARGO_FEATURES") {
333        vergen_response.push_str(&format!("cargo features: {features}\n"));
334    }
335    if let Some(dependencies) = option_env!("VERGEN_CARGO_DEPENDENCIES") {
336        vergen_response.push_str(&format!("cargo dependencies: {dependencies}\n"));
337    }
338
339    // rustc info
340    if let Some(channel) = option_env!("VERGEN_RUSTC_CHANNEL") {
341        vergen_response.push_str(&format!("rustc channel: {channel}\n"));
342    }
343    if let Some(version) = option_env!("VERGEN_RUSTC_SEMVER") {
344        vergen_response.push_str(&format!("rustc version: {version}\n"));
345    }
346    if let Some(commit_hash) = option_env!("VERGEN_RUSTC_COMMIT_HASH") {
347        vergen_response.push_str(&format!("rustc commit hash: {commit_hash}\n"));
348    }
349    if let Some(commit_date) = option_env!("VERGEN_RUSTC_COMMIT_DATE") {
350        vergen_response.push_str(&format!("rustc commit date: {commit_date}\n"));
351    }
352    if let Some(host_triple) = option_env!("VERGEN_RUSTC_HOST_TRIPLE") {
353        vergen_response.push_str(&format!("rustc host triple: {host_triple}\n"));
354    }
355    if let Some(llvm_version) = option_env!("VERGEN_RUSTC_LLVM_VERSION") {
356        vergen_response.push_str(&format!("rustc LLVM version: {llvm_version}\n"));
357    }
358
359    // sysinfo
360    if let Some(cpu_brand) = option_env!("VERGEN_SYSINFO_CPU_BRAND") {
361        vergen_response.push_str(&format!("cpu brand: {cpu_brand}\n"));
362    }
363    if let Some(cpu_name) = option_env!("VERGEN_SYSINFO_CPU_NAME") {
364        vergen_response.push_str(&format!("cpu name: {cpu_name}\n"));
365    }
366    if let Some(cpu_vendor) = option_env!("VERGEN_SYSINFO_CPU_VENDOR") {
367        vergen_response.push_str(&format!("cpu vendor: {cpu_vendor}\n"));
368    }
369    if let Some(cpu_core_count) = option_env!("VERGEN_SYSINFO_CPU_CORE_COUNT") {
370        vergen_response.push_str(&format!("cpu core count: {cpu_core_count}\n"));
371    }
372    if let Some(cpu_frequency) = option_env!("VERGEN_SYSINFO_CPU_FREQUENCY") {
373        vergen_response.push_str(&format!("cpu frequency: {cpu_frequency} MHz\n"));
374    }
375    if let Some(memory) = option_env!("VERGEN_SYSINFO_TOTAL_MEMORY") {
376        vergen_response.push_str(&format!("total memory: {memory}\n"));
377    }
378    if let Some(name) = option_env!("VERGEN_SYSINFO_NAME") {
379        vergen_response.push_str(&format!("system name: {name}\n"));
380    }
381    if let Some(os_version) = option_env!("VERGEN_SYSINFO_OS_VERSION") {
382        vergen_response.push_str(&format!("OS version: {os_version}\n"));
383    }
384    if let Some(user) = option_env!("VERGEN_SYSINFO_USER") {
385        vergen_response.push_str(&format!("build user: {user}\n"));
386    }
387
388    VergenResponse {
389        response: vergen_response,
390    }
391}
392
393/// Monitors a [`tokio::task::JoinHandle`] in the background and logs it's end
394/// result.
395pub fn monitor_standalone_task<
396    T: Send + 'static,
397    E: Debug + Send + 'static + From<BridgeError>,
398    C: Send + 'static,
399>(
400    task_handle: tokio::task::JoinHandle<Result<T, E>>,
401    task_name: &str,
402    monitor_err_sender: tokio::sync::mpsc::Sender<Result<C, E>>,
403) {
404    let task_name = task_name.to_string();
405
406    // Move task_handle into the spawned task to make it Send
407    tokio::spawn(async move {
408        match task_handle.await {
409            Ok(Ok(_)) => {
410                tracing::debug!("Task {} completed successfully", task_name);
411            }
412            Ok(Err(e)) => {
413                tracing::error!("Task {} threw an error: {:?}", task_name, e);
414                let _ = monitor_err_sender.send(Err(e)).await.inspect_err(|e| {
415                    tracing::error!("Failed to send error to monitoring channel: {:?}", e)
416                });
417            }
418            Err(e) => {
419                if e.is_cancelled() {
420                    // Task was cancelled, which is expected during cleanup
421                    tracing::debug!("Task {} has been cancelled", task_name);
422                    let _ = monitor_err_sender
423                        .send(Err(Into::<BridgeError>::into(eyre::eyre!(
424                            "Task was cancelled due to: {:?}",
425                            e
426                        ))
427                        .into()))
428                        .await
429                        .inspect_err(|e| {
430                            tracing::error!("Failed to send error to monitoring channel: {:?}", e)
431                        });
432                    return;
433                }
434                tracing::error!("Task {} has panicked: {:?}", task_name, e);
435                let _ = monitor_err_sender
436                    .send(Err(Into::<BridgeError>::into(eyre::eyre!(
437                        "Task has panicked due to: {:?}",
438                        e
439                    ))
440                    .into()))
441                    .await
442                    .inspect_err(|e| {
443                        tracing::error!("Failed to send error to monitoring channel: {:?}", e)
444                    });
445            }
446        }
447    });
448}
449
/// Delays the exit of the program for 15 seconds, to allow for logs to be flushed.
/// Then panics with the given arguments.
///
/// The message is first written to stderr with `eprintln!`, followed by a
/// notice about the delay. The delay uses `std::thread::sleep`, so the calling
/// thread is blocked for its whole duration.
///
/// Note that the arguments are expanded twice (once for `eprintln!`, once for
/// `panic!`), so any expression passed as an argument is evaluated twice.
///
/// # Parameters
///
/// - `($($arg:tt)*)`: Arguments to pass to `panic!`, in the same manner as format! and println!
macro_rules! delayed_panic {
    ($($arg:tt)*) => {
        {
            // Print first so the message is visible even while sleeping.
            eprintln!($($arg)*);
            eprintln!("Delaying exit for 15 seconds, to allow for logs to be flushed");
            std::thread::sleep(std::time::Duration::from_secs(15));
            panic!($($arg)*);
        }
    };
}
466
467pub(crate) use delayed_panic;
468
469#[derive(Debug, Clone, Default)]
470pub struct AddMethodMiddlewareLayer;
471
472impl<S> Layer<S> for AddMethodMiddlewareLayer {
473    type Service = AddMethodMiddleware<S>;
474
475    fn layer(&self, service: S) -> Self::Service {
476        AddMethodMiddleware { inner: service }
477    }
478}
479
480#[derive(Debug, Clone)]
481pub struct AddMethodMiddleware<S> {
482    inner: S,
483}
484
485type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
486
487impl<S, ReqBody, ResBody> Service<http::Request<ReqBody>> for AddMethodMiddleware<S>
488where
489    S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>> + Clone + Send + 'static,
490    S::Future: Send + 'static,
491    ReqBody: Send + 'static,
492{
493    type Response = S::Response;
494    type Error = S::Error;
495    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
496
497    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
498        self.inner.poll_ready(cx)
499    }
500
501    fn call(&mut self, mut req: http::Request<ReqBody>) -> Self::Future {
502        // See: https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services
503        let clone = self.inner.clone();
504        let mut inner = std::mem::replace(&mut self.inner, clone);
505
506        Box::pin(async move {
507            let path = req.uri().path();
508
509            let grpc_method =
510                if let &[_, _, method] = &path.split("/").collect::<Vec<&str>>().as_slice() {
511                    Some(method.to_string())
512                } else {
513                    None
514                };
515
516            if let Some(grpc_method) = grpc_method {
517                if let Ok(grpc_method) = HeaderValue::from_str(&grpc_method) {
518                    req.headers_mut().insert("grpc-method", grpc_method);
519                }
520            }
521
522            // Do extra async work here...
523            let response = inner.call(req).await?;
524
525            Ok(response)
526        })
527    }
528}
529
530// NOTE: NamedEntity, TxMetadata, FeePayingType, RbfSigningInfo,
531// Last20Bytes, TryLast20Bytes, ScriptBufExt are now re-exported
532// from clementine-utils at the top of this file.
533
534/// Wraps a future with a timeout, returning a `Status::deadline_exceeded` gRPC error
535/// if the future does not complete within the specified duration.
536///
537/// This is useful for enforcing timeouts on individual asynchronous operations,
538/// especially those involving network requests, to prevent them from hanging indefinitely.
539///
540/// # Arguments
541///
542/// * `duration`: The maximum `Duration` to wait for the future to complete.
543/// * `description`: A string slice describing the operation, used in the timeout error message.
544/// * `future`: The `Future` to execute. The future should return a `Result<T, BridgeError>`.
545///
546/// # Returns
547///
548/// Returns `Ok(T)` if the future completes successfully within the time limit.
549/// Returns `Err(BridgeError)` if the future returns an error or if it times out.
550/// A timeout results in a `BridgeError` that wraps a `tonic::Status::deadline_exceeded`.
551pub async fn timed_request<F, T>(
552    duration: Duration,
553    description: &str,
554    future: F,
555) -> Result<T, BridgeError>
556where
557    F: Future<Output = Result<T, BridgeError>>,
558{
559    timed_request_base(duration, description, future)
560        .await
561        .map_err(|_| {
562            Box::new(Status::deadline_exceeded(format!(
563                "{description} timed out"
564            )))
565        })?
566}
567
568/// Wraps a future with a timeout and adds a debug span with the description.
569///
570/// # Arguments
571///
572/// * `duration`: The maximum `Duration` to wait for the future to complete.
573/// * `description`: A string slice describing the operation, used in the timeout error message.
574/// * `future`: The `Future` to execute. The future should return a `Result<T, BridgeError>`.
575///
576/// # Returns
577///
578/// Returns `Ok(Ok(T))` if the future completes successfully within the time limit, returns `Ok(Err(e))`
579/// if the future returns an error, returns `Err(Elapsed)` if the request times out.
580pub async fn timed_request_base<F, T>(
581    duration: Duration,
582    description: &str,
583    future: F,
584) -> Result<Result<T, BridgeError>, Elapsed>
585where
586    F: Future<Output = Result<T, BridgeError>>,
587{
588    timeout(duration, future)
589        .instrument(debug_span!("timed_request", description = description))
590        .await
591}
592
593/// Concurrently executes a collection of futures, applying a timeout to each one individually.
594/// If any future fails or times out, the entire operation is aborted and an error is returned.
595///
596/// This utility is an extension of `futures::future::try_join_all` with added per-future
597/// timeout logic and improved error reporting using optional IDs.
598///
599/// # Type Parameters
600///
601/// * `I`: An iterator that yields futures.
602/// * `T`: The success type of the futures.
603/// * `D`: A type that can be displayed, used for identifying futures in error messages.
604///
605/// # Arguments
606///
607/// * `duration`: The timeout `Duration` applied to each individual future in the iterator.
608/// * `description`: A string slice describing the collective operation, used in timeout error messages.
609/// * `ids`: An optional `Vec<D>` of identifiers corresponding to each future. If provided,
610///   these IDs are used in error messages to specify which future failed or timed out.
611/// * `iter`: An iterator producing the futures to be executed.
612///
613/// # Returns
614///
615/// Returns `Ok(Vec<T>)` containing the results of all futures if they all complete successfully.
616/// Returns `Err(BridgeError)` if any future returns an error or times out. The error will be a combined error of all errors.
617/// The error will be contextualized with the operation description and the specific future's ID if available.
618pub async fn timed_try_join_all<I, T, D>(
619    duration: Duration,
620    description: &str,
621    ids: Option<Vec<D>>,
622    iter: I,
623) -> Result<Vec<T>, BridgeError>
624where
625    D: Display,
626    I: IntoIterator,
627    I::Item: Future<Output = Result<T, BridgeError>>,
628{
629    let ids = Arc::new(ids);
630    let results = join_all(iter.into_iter().enumerate().map(|item| {
631        let ids = ids.clone();
632        async move {
633            let id = Option::as_ref(&ids).and_then(|ids| ids.get(item.0));
634
635            timeout(duration, item.1)
636                .await
637                .map_err(|_| {
638                    Box::new(Status::deadline_exceeded(format!(
639                        "{} (id: {}) timed out",
640                        description,
641                        id.map(|id| id.to_string())
642                            .unwrap_or_else(|| "n/a".to_string())
643                    )))
644                })?
645                // Add the id to the error chain for easier debugging for other errors.
646                .wrap_err_with(|| {
647                    format!(
648                        "Failed to join {}",
649                        id.map(ToString::to_string).unwrap_or_else(|| "n/a".into())
650                    )
651                })
652        }
653    }))
654    .instrument(debug_span!("timed_try_join_all", description = description))
655    .await;
656
657    combine_errors(results)
658}
659
660/// Executes a collection of fallible futures concurrently and aggregates failures into a single
661/// [`BridgeError`].
662///
663/// This runs all futures like [`futures::future::try_join_all`], but instead of stopping on the
664/// first error it waits for every future to finish by leveraging [`join_all`]. All errors are then
665/// combined via [`combine_errors`], ensuring the caller gets full visibility into every failure.
666pub async fn try_join_all_combine_errors<F, T, E>(
667    futures: impl IntoIterator<Item = F>,
668) -> Result<Vec<T>, BridgeError>
669where
670    F: Future<Output = Result<T, E>>,
671    E: std::fmt::Display,
672{
673    let results = join_all(futures).await;
674    combine_errors(results)
675}
676
677/// Executes a collection of fallible futures concurrently and partitions results into successes and errors.
678///
679/// Unlike [`try_join_all_combine_errors`], this function does not fail on errors. Instead, it runs all
680/// futures concurrently using [`futures::future::join_all`] and partitions the results:
681/// - Successful results are collected into a `Vec<T>`
682/// - Errors are collected and combined into an optional error string
683///
684/// This is useful when you want to collect partial successes even if some futures fail, allowing
685/// the caller to decide how to handle the mixed results.
686///
687/// # Returns
688///
689/// Returns a tuple `(Vec<T>, Option<String>)` where:
690/// - The first element contains all successful results
691/// - The second element is `Some(combined_error_string)` if any errors occurred, or `None` if all succeeded
692pub async fn join_all_partition_results<F, T, E>(
693    futures: impl IntoIterator<Item = F>,
694) -> (Vec<T>, Option<String>)
695where
696    F: Future<Output = Result<T, E>>,
697    E: std::fmt::Display,
698{
699    let results = join_all(futures).await;
700    let mut errors = Vec::new();
701    let mut successful_results = Vec::new();
702    for result in results {
703        match result {
704            Ok(value) => successful_results.push(value),
705            Err(e) => errors.push(format!("{e:#}")),
706        }
707    }
708
709    (
710        successful_results,
711        match errors.is_empty() {
712            true => None,
713            false => Some(format!(
714                "Number of failed futures: {}: {}",
715                errors.len(),
716                errors.join("; ")
717            )),
718        },
719    )
720}
721
722/// Collects errors from an iterator of results and returns a combined error if any failed.
723///
724/// # Parameters
725/// * `results`: Iterator of results (errors should contain identifying information in their Debug representation)
726/// * `prefix`: Prefix message for the combined error (e.g., "Operator key collection failures")
727///
728/// # Returns
729/// * `Ok(Vec<T>)` containing all successful results if all results are successful
730/// * `Err(BridgeError)` with a combined error message listing all failures
731pub fn combine_errors<I, EIn, T>(results: I) -> Result<Vec<T>, BridgeError>
732where
733    I: IntoIterator<Item = Result<T, EIn>>,
734    EIn: std::fmt::Display,
735{
736    let mut errors = Vec::new();
737    let mut successful_results = Vec::new();
738    for result in results {
739        match result {
740            Ok(value) => successful_results.push(value),
741            Err(e) => errors.push(format!("{e:#}")),
742        }
743    }
744    if !errors.is_empty() {
745        return Err(BridgeError::from(eyre::eyre!(
746            "Number of failed futures: {}: {}",
747            errors.len(),
748            errors.join("; ")
749        )));
750    }
751    Ok(successful_results)
752}
753
754/// Collects all errors (both outer and inner) from named task results and returns a combined error if any task failed.
755///
756/// This function is useful when you have multiple async tasks (e.g., from `tokio::spawn`) and want to
757/// see all errors if multiple tasks fail, rather than just the first error.
758///
759/// # Parameters
760/// * `task_results`: Iterator of tuples containing (task_name, Result<Result<T, E1>, E2>)
761///   - `task_name`: A string-like identifier for the task (used in error messages)
762///   - The nested Result represents: `Result<T, E1>` is the task's result, `E2` is typically a `JoinError`
763///
764/// # Returns
765/// * `Ok(())` if all tasks completed successfully
766/// * `Err(BridgeError)` with a combined error message listing all failures
767pub fn flatten_join_named_results<T, E1, E2, S, R>(task_results: R) -> Result<(), BridgeError>
768where
769    R: IntoIterator<Item = (S, Result<Result<T, E1>, E2>)>,
770    S: AsRef<str>,
771    E1: std::fmt::Display,
772    E2: std::fmt::Display,
773{
774    let mut task_errors = Vec::new();
775
776    for (task_name, task_output) in task_results.into_iter() {
777        match task_output {
778            Ok(inner_result) => {
779                if let Err(e) = inner_result {
780                    let err_msg = format!("{} failed with error: {:#}", task_name.as_ref(), e);
781                    task_errors.push(err_msg);
782                }
783            }
784            Err(e) => {
785                let err_msg = format!(
786                    "{} task thread failed with error: {:#}",
787                    task_name.as_ref(),
788                    e
789                );
790                task_errors.push(err_msg);
791            }
792        }
793    }
794
795    if !task_errors.is_empty() {
796        tracing::error!("Tasks failed with errors: {:#?}", task_errors);
797        return Err(eyre::eyre!("Tasks failed with errors: {:#?}", task_errors).into());
798    }
799
800    Ok(())
801}
802
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::io::Read;
    use tempfile::NamedTempFile;
    use tracing::level_filters::LevelFilter;

    /// Verifies the CI file-logging setup: with `CI` and `INFO_LOG_FILE` set,
    /// error/warn/info events and `ci`-targeted debug events end up in the log
    /// file, while ordinary debug events do not.
    #[test]
    #[ignore = "This test changes environment variables so it should not be run in CI since it might affect other tests."]
    fn test_ci_logging_setup() {
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let temp_path = temp_file.path().to_string_lossy().into_owned();

        std::env::set_var("CI", "true");
        std::env::set_var("INFO_LOG_FILE", &temp_path);

        let result = initialize_logger(Some(LevelFilter::DEBUG));
        assert!(result.is_ok(), "Logger initialization should succeed");

        tracing::error!("Test error message");
        tracing::warn!("Test warn message");
        tracing::info!("Test info message");
        tracing::debug!(target: "ci", "Test CI debug message");
        tracing::debug!("Test debug message");

        // Give the writer a moment before reading the file back.
        std::thread::sleep(std::time::Duration::from_millis(100));

        let mut file_contents = String::new();
        let mut file = fs::File::open(&temp_path).expect("Failed to open log file");
        file.read_to_string(&mut file_contents)
            .expect("Failed to read log file");

        // (logged text, assertion message if it is missing from the file)
        let expected_in_file = [
            ("Test error message", "Error message should be in file"),
            ("Test warn message", "Warn message should be in file"),
            ("Test info message", "Info message should be in file"),
            (
                "Test CI debug message",
                "Debug message for CI should be in file",
            ),
        ];
        for (needle, failure_message) in expected_in_file.iter() {
            assert!(file_contents.contains(*needle), "{}", failure_message);
        }

        assert!(
            !file_contents.contains("Test debug message"),
            "Debug message should not be in file"
        );

        std::env::remove_var("CI");
        std::env::remove_var("INFO_LOG_FILE");
    }
}