1use crate::builder::transaction::TransactionType;
2use crate::config::TelemetryConfig;
3use crate::errors::BridgeError;
4use crate::operator::RoundIndex;
5use crate::rpc::clementine::VergenResponse;
6use bitcoin::{OutPoint, ScriptBuf, TapNodeHash, XOnlyPublicKey};
7use eyre::Context as _;
8use futures::future::join_all;
9use http::HeaderValue;
10use metrics_exporter_prometheus::PrometheusBuilder;
11use serde::{Deserialize, Serialize};
12use std::fmt::{Debug, Display};
13use std::fs::File;
14use std::future::Future;
15use std::net::{Ipv4Addr, SocketAddr};
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19use std::time::Duration;
20use tokio::time::error::Elapsed;
21use tokio::time::timeout;
22use tonic::Status;
23use tower::{Layer, Service};
24use tracing::level_filters::LevelFilter;
25use tracing::{debug_span, Instrument, Subscriber};
26use tracing_subscriber::layer::SubscriberExt;
27use tracing_subscriber::{fmt, EnvFilter, Layer as TracingLayer, Registry};
28
29pub fn initialize_logger(default_level: Option<LevelFilter>) -> Result<(), BridgeError> {
62 let is_ci = std::env::var("CI")
63 .map(|v| v == "true" || v == "1")
64 .unwrap_or(false);
65
66 if cfg!(test) {
70 std::env::set_var("RUST_LIB_BACKTRACE", "full");
72 std::env::set_var("RUST_BACKTRACE", "full");
73 }
74
75 let _ = color_eyre::config::HookBuilder::default()
77 .add_frame_filter(Box::new(|frames| {
78 let filters = &[
80 "std::",
81 "test::",
82 "tokio::",
83 "core::",
84 "<core::",
85 "<alloc::",
86 "start_thread",
87 "<tonic::",
88 "<futures::",
89 "<tower::",
90 "<hyper",
91 "hyper",
92 "__rust_try",
93 "<axum::",
94 "<F as ",
95 "clone",
96 ];
97
98 frames.retain(|frame| {
99 !filters.iter().any(|f| {
100 let name = if let Some(name) = frame.name.as_ref() {
101 name.as_str()
102 } else {
103 return true;
104 };
105
106 name.starts_with(f)
107 })
108 });
109 }))
110 .install();
111
112 if is_ci {
113 let info_log_file = std::env::var("INFO_LOG_FILE").ok();
114 if let Some(file_path) = info_log_file {
115 try_set_global_subscriber(env_subscriber_with_file(&file_path)?);
116 tracing::trace!("Using file logging in CI, outputting to {}", file_path);
117 } else {
118 try_set_global_subscriber(env_subscriber_to_human(default_level));
119 tracing::trace!("Using console logging in CI");
120 tracing::warn!(
121 "CI is set but INFO_LOG_FILE is missing, only console logs will be used."
122 );
123 }
124 } else if is_json_logs() {
125 try_set_global_subscriber(env_subscriber_to_json(default_level));
126 tracing::trace!("Using JSON logging");
127 } else {
128 try_set_global_subscriber(env_subscriber_to_human(default_level));
129 tracing::trace!("Using human-readable logging");
130 }
131
132 tracing::info!("Tracing initialized successfully.");
133 Ok(())
134}
135
136pub fn initialize_telemetry(config: &TelemetryConfig) -> Result<(), BridgeError> {
137 let telemetry_addr: SocketAddr = format!("{}:{}", config.host, config.port)
138 .parse()
139 .unwrap_or_else(|_| {
140 tracing::warn!(
141 "Invalid telemetry address: {}:{}, using default address: 127.0.0.1:8081",
142 config.host,
143 config.port
144 );
145 SocketAddr::from((Ipv4Addr::new(127, 0, 0, 1), 8081))
146 });
147
148 tracing::debug!("Initializing telemetry at {}", telemetry_addr);
149
150 let builder = PrometheusBuilder::new().with_http_listener(telemetry_addr);
151
152 builder
153 .install()
154 .map_err(|e| eyre::eyre!("Failed to initialize telemetry: {}", e))?;
155
156 Ok(())
157}
158
159fn try_set_global_subscriber<S>(subscriber: S)
160where
161 S: Subscriber + Send + Sync + 'static,
162{
163 match tracing::subscriber::set_global_default(subscriber) {
164 Ok(_) => {}
165 Err(_) => {
167 #[cfg(test)]
168 tracing::trace!("Tracing is already initialized, skipping without errors...");
169 #[cfg(not(test))]
170 tracing::info!(
171 "Unexpected double initialization of tracing, skipping without errors..."
172 );
173 }
174 }
175}
176
177fn env_subscriber_with_file(path: &str) -> Result<Box<dyn Subscriber + Send + Sync>, BridgeError> {
178 if let Some(parent_dir) = std::path::Path::new(path).parent() {
179 std::fs::create_dir_all(parent_dir).map_err(|e| {
180 BridgeError::ConfigError(format!(
181 "Failed to create log directory '{}': {}",
182 parent_dir.display(),
183 e
184 ))
185 })?;
186 }
187
188 let file = File::create(path).map_err(|e| BridgeError::ConfigError(e.to_string()))?;
189
190 let file_filter = EnvFilter::from_default_env()
191 .add_directive("info".parse().expect("It should parse info level"))
192 .add_directive("ci=debug".parse().expect("It should parse ci debug level"));
193
194 let console_filter = EnvFilter::builder()
195 .with_default_directive(LevelFilter::WARN.into())
196 .from_env_lossy();
197
198 let file_layer = fmt::layer()
199 .with_writer(file)
200 .with_ansi(false)
201 .with_file(true)
202 .with_line_number(true)
203 .with_target(true)
204 .with_thread_ids(true)
205 .with_thread_names(true)
206 .with_filter(file_filter)
207 .boxed();
208
209 let console_layer = fmt::layer()
210 .with_test_writer()
211 .with_file(true)
212 .with_line_number(true)
213 .with_target(true)
214 .with_filter(console_filter)
215 .boxed();
216
217 Ok(Box::new(
218 Registry::default().with(file_layer).with(console_layer),
219 ))
220}
221
222fn env_subscriber_to_json(level: Option<LevelFilter>) -> Box<dyn Subscriber + Send + Sync> {
223 let filter = match level {
224 Some(lvl) => EnvFilter::builder()
225 .with_default_directive(lvl.into())
226 .from_env_lossy(),
227 None => EnvFilter::from_default_env(),
228 };
229
230 let json_layer = fmt::layer::<Registry>()
231 .with_test_writer()
232 .with_file(true)
234 .with_line_number(true)
235 .with_thread_ids(true)
236 .with_thread_names(true)
237 .with_target(true)
238 .json();
239 Box::new(tracing_subscriber::registry().with(json_layer).with(filter))
245}
246
247fn env_subscriber_to_human(level: Option<LevelFilter>) -> Box<dyn Subscriber + Send + Sync> {
248 let filter = match level {
249 Some(lvl) => EnvFilter::builder()
250 .with_default_directive(lvl.into())
251 .from_env_lossy(),
252 None => EnvFilter::from_default_env(),
253 };
254
255 let standard_layer = fmt::layer()
256 .with_test_writer()
257 .with_file(true)
259 .with_line_number(true)
260 .with_target(true);
263
264 Box::new(
265 tracing_subscriber::registry()
266 .with(standard_layer)
267 .with(filter),
268 )
269}
270
/// Returns `true` when the `LOG_FORMAT` environment variable is set to `json`
/// (compared ASCII case-insensitively); `false` when it is unset, unreadable, or has any
/// other value.
fn is_json_logs() -> bool {
    matches!(
        std::env::var("LOG_FORMAT"),
        Ok(value) if value.eq_ignore_ascii_case("json")
    )
}
276
277pub fn get_vergen_response() -> VergenResponse {
278 let mut vergen_response = String::new();
279
280 if let Some(date) = option_env!("VERGEN_BUILD_DATE") {
282 vergen_response.push_str(&format!("Build Date: {date}\n"));
283 }
284 if let Some(timestamp) = option_env!("VERGEN_BUILD_TIMESTAMP") {
285 vergen_response.push_str(&format!("Build Timestamp: {timestamp}\n"));
286 }
287
288 if let Some(branch) = option_env!("VERGEN_GIT_BRANCH") {
290 vergen_response.push_str(&format!("git branch: {branch}\n"));
291 }
292 if let Some(commit) = option_env!("VERGEN_GIT_SHA") {
293 vergen_response.push_str(&format!("git commit: {commit}\n"));
294 }
295 if let Some(commit_date) = option_env!("VERGEN_GIT_COMMIT_DATE") {
296 vergen_response.push_str(&format!("git commit date: {commit_date}\n"));
297 }
298 if let Some(commit_timestamp) = option_env!("VERGEN_GIT_COMMIT_TIMESTAMP") {
299 vergen_response.push_str(&format!("git commit timestamp: {commit_timestamp}\n"));
300 }
301 if let Some(commit_author_name) = option_env!("VERGEN_GIT_COMMIT_AUTHOR_NAME") {
302 vergen_response.push_str(&format!("git commit author name: {commit_author_name}\n"));
303 }
304 if let Some(commit_author_email) = option_env!("VERGEN_GIT_COMMIT_AUTHOR_EMAIL") {
305 vergen_response.push_str(&format!("git commit author email: {commit_author_email}\n"));
306 }
307 if let Some(commit_count) = option_env!("VERGEN_GIT_COMMIT_COUNT") {
308 vergen_response.push_str(&format!("git commit count: {commit_count}\n"));
309 }
310 if let Some(commit_message) = option_env!("VERGEN_GIT_COMMIT_MESSAGE") {
311 vergen_response.push_str(&format!("git commit message: {commit_message}\n"));
312 }
313 if let Some(describe) = option_env!("VERGEN_GIT_DESCRIBE") {
314 vergen_response.push_str(&format!("git describe: {describe}\n"));
315 }
316 if let Some(dirty) = option_env!("VERGEN_GIT_DIRTY") {
317 vergen_response.push_str(&format!("git dirty: {dirty}\n"));
318 }
319
320 if let Some(debug) = option_env!("VERGEN_CARGO_DEBUG") {
322 vergen_response.push_str(&format!("cargo debug: {debug}\n"));
323 }
324 if let Some(opt_level) = option_env!("VERGEN_CARGO_OPT_LEVEL") {
325 vergen_response.push_str(&format!("cargo opt level: {opt_level}\n"));
326 }
327 if let Some(target_triple) = option_env!("VERGEN_CARGO_TARGET_TRIPLE") {
328 vergen_response.push_str(&format!("cargo target triple: {target_triple}\n"));
329 }
330 if let Some(features) = option_env!("VERGEN_CARGO_FEATURES") {
331 vergen_response.push_str(&format!("cargo features: {features}\n"));
332 }
333 if let Some(dependencies) = option_env!("VERGEN_CARGO_DEPENDENCIES") {
334 vergen_response.push_str(&format!("cargo dependencies: {dependencies}\n"));
335 }
336
337 if let Some(channel) = option_env!("VERGEN_RUSTC_CHANNEL") {
339 vergen_response.push_str(&format!("rustc channel: {channel}\n"));
340 }
341 if let Some(version) = option_env!("VERGEN_RUSTC_SEMVER") {
342 vergen_response.push_str(&format!("rustc version: {version}\n"));
343 }
344 if let Some(commit_hash) = option_env!("VERGEN_RUSTC_COMMIT_HASH") {
345 vergen_response.push_str(&format!("rustc commit hash: {commit_hash}\n"));
346 }
347 if let Some(commit_date) = option_env!("VERGEN_RUSTC_COMMIT_DATE") {
348 vergen_response.push_str(&format!("rustc commit date: {commit_date}\n"));
349 }
350 if let Some(host_triple) = option_env!("VERGEN_RUSTC_HOST_TRIPLE") {
351 vergen_response.push_str(&format!("rustc host triple: {host_triple}\n"));
352 }
353 if let Some(llvm_version) = option_env!("VERGEN_RUSTC_LLVM_VERSION") {
354 vergen_response.push_str(&format!("rustc LLVM version: {llvm_version}\n"));
355 }
356
357 if let Some(cpu_brand) = option_env!("VERGEN_SYSINFO_CPU_BRAND") {
359 vergen_response.push_str(&format!("cpu brand: {cpu_brand}\n"));
360 }
361 if let Some(cpu_name) = option_env!("VERGEN_SYSINFO_CPU_NAME") {
362 vergen_response.push_str(&format!("cpu name: {cpu_name}\n"));
363 }
364 if let Some(cpu_vendor) = option_env!("VERGEN_SYSINFO_CPU_VENDOR") {
365 vergen_response.push_str(&format!("cpu vendor: {cpu_vendor}\n"));
366 }
367 if let Some(cpu_core_count) = option_env!("VERGEN_SYSINFO_CPU_CORE_COUNT") {
368 vergen_response.push_str(&format!("cpu core count: {cpu_core_count}\n"));
369 }
370 if let Some(cpu_frequency) = option_env!("VERGEN_SYSINFO_CPU_FREQUENCY") {
371 vergen_response.push_str(&format!("cpu frequency: {cpu_frequency} MHz\n"));
372 }
373 if let Some(memory) = option_env!("VERGEN_SYSINFO_TOTAL_MEMORY") {
374 vergen_response.push_str(&format!("total memory: {memory}\n"));
375 }
376 if let Some(name) = option_env!("VERGEN_SYSINFO_NAME") {
377 vergen_response.push_str(&format!("system name: {name}\n"));
378 }
379 if let Some(os_version) = option_env!("VERGEN_SYSINFO_OS_VERSION") {
380 vergen_response.push_str(&format!("OS version: {os_version}\n"));
381 }
382 if let Some(user) = option_env!("VERGEN_SYSINFO_USER") {
383 vergen_response.push_str(&format!("build user: {user}\n"));
384 }
385
386 VergenResponse {
387 response: vergen_response,
388 }
389}
390
391pub fn monitor_standalone_task<
394 T: Send + 'static,
395 E: Debug + Send + 'static + From<BridgeError>,
396 C: Send + 'static,
397>(
398 task_handle: tokio::task::JoinHandle<Result<T, E>>,
399 task_name: &str,
400 monitor_err_sender: tokio::sync::mpsc::Sender<Result<C, E>>,
401) {
402 let task_name = task_name.to_string();
403
404 tokio::spawn(async move {
406 match task_handle.await {
407 Ok(Ok(_)) => {
408 tracing::debug!("Task {} completed successfully", task_name);
409 }
410 Ok(Err(e)) => {
411 tracing::error!("Task {} threw an error: {:?}", task_name, e);
412 let _ = monitor_err_sender.send(Err(e)).await.inspect_err(|e| {
413 tracing::error!("Failed to send error to monitoring channel: {:?}", e)
414 });
415 }
416 Err(e) => {
417 if e.is_cancelled() {
418 tracing::debug!("Task {} has been cancelled", task_name);
420 let _ = monitor_err_sender
421 .send(Err(Into::<BridgeError>::into(eyre::eyre!(
422 "Task was cancelled due to: {:?}",
423 e
424 ))
425 .into()))
426 .await
427 .inspect_err(|e| {
428 tracing::error!("Failed to send error to monitoring channel: {:?}", e)
429 });
430 return;
431 }
432 tracing::error!("Task {} has panicked: {:?}", task_name, e);
433 let _ = monitor_err_sender
434 .send(Err(Into::<BridgeError>::into(eyre::eyre!(
435 "Task has panicked due to: {:?}",
436 e
437 ))
438 .into()))
439 .await
440 .inspect_err(|e| {
441 tracing::error!("Failed to send error to monitoring channel: {:?}", e)
442 });
443 }
444 }
445 });
446}
447
/// Prints a message to stderr, blocks the current thread for 15 seconds so that
/// pending logs can be flushed, and then panics with the same message.
///
/// Accepts the same arguments as `format!`. The arguments are formatted exactly once,
/// so expressions with side effects are not evaluated twice and the text printed to
/// stderr always matches the panic message. Invoking it with no arguments prints an
/// empty line and panics with the default panic message.
///
/// Note: this uses `std::thread::sleep`, so it blocks the calling thread for the whole
/// delay.
macro_rules! delayed_panic {
    () => {
        {
            eprintln!();
            eprintln!("Delaying exit for 15 seconds, to allow for logs to be flushed");
            std::thread::sleep(std::time::Duration::from_secs(15));
            panic!();
        }
    };
    ($($arg:tt)+) => {
        {
            // Format once and reuse the text for both the stderr line and the panic.
            let message = format!($($arg)+);
            eprintln!("{}", message);
            eprintln!("Delaying exit for 15 seconds, to allow for logs to be flushed");
            std::thread::sleep(std::time::Duration::from_secs(15));
            panic!("{}", message);
        }
    };
}
464
465pub(crate) use delayed_panic;
466
467#[derive(Debug, Clone, Default)]
468pub struct AddMethodMiddlewareLayer;
469
470impl<S> Layer<S> for AddMethodMiddlewareLayer {
471 type Service = AddMethodMiddleware<S>;
472
473 fn layer(&self, service: S) -> Self::Service {
474 AddMethodMiddleware { inner: service }
475 }
476}
477
478#[derive(Debug, Clone)]
479pub struct AddMethodMiddleware<S> {
480 inner: S,
481}
482
483type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
484
485impl<S, ReqBody, ResBody> Service<http::Request<ReqBody>> for AddMethodMiddleware<S>
486where
487 S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>> + Clone + Send + 'static,
488 S::Future: Send + 'static,
489 ReqBody: Send + 'static,
490{
491 type Response = S::Response;
492 type Error = S::Error;
493 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
494
495 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
496 self.inner.poll_ready(cx)
497 }
498
499 fn call(&mut self, mut req: http::Request<ReqBody>) -> Self::Future {
500 let clone = self.inner.clone();
502 let mut inner = std::mem::replace(&mut self.inner, clone);
503
504 Box::pin(async move {
505 let path = req.uri().path();
506
507 let grpc_method =
508 if let &[_, _, method] = &path.split("/").collect::<Vec<&str>>().as_slice() {
509 Some(method.to_string())
510 } else {
511 None
512 };
513
514 if let Some(grpc_method) = grpc_method {
515 if let Ok(grpc_method) = HeaderValue::from_str(&grpc_method) {
516 req.headers_mut().insert("grpc-method", grpc_method);
517 }
518 }
519
520 let response = inner.call(req).await?;
522
523 Ok(response)
524 })
525 }
526}
527
/// Compile-time string identifiers attached to an entity type.
///
/// NOTE(review): the meanings below are inferred from the constant names only; confirm
/// against the implementors and the places where these constants are read.
pub trait NamedEntity: Sync + Send + 'static {
    /// Name of the entity (presumably used for logging/identification).
    const ENTITY_NAME: &'static str;

    /// Consumer id presumably used by this entity's transaction sender.
    const TX_SENDER_CONSUMER_ID: &'static str;

    /// Consumer id for finalized blocks, presumably used when automation is disabled.
    const FINALIZED_BLOCK_CONSUMER_ID_NO_AUTOMATION: &'static str;

    /// Consumer id for finalized blocks, presumably used when automation is enabled.
    const FINALIZED_BLOCK_CONSUMER_ID_AUTOMATION: &'static str;
}
548
549#[derive(Copy, Clone, Eq, Hash, PartialEq, PartialOrd, Ord, Serialize, Deserialize)]
550pub struct TxMetadata {
551 pub deposit_outpoint: Option<OutPoint>,
552 pub operator_xonly_pk: Option<XOnlyPublicKey>,
553 pub round_idx: Option<RoundIndex>,
554 pub kickoff_idx: Option<u32>,
555 pub tx_type: TransactionType,
556}
557
558impl std::fmt::Debug for TxMetadata {
559 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
560 let mut dbg_struct = f.debug_struct("TxMetadata");
561 if let Some(deposit_outpoint) = self.deposit_outpoint {
562 dbg_struct.field("deposit_outpoint", &deposit_outpoint);
563 }
564 if let Some(operator_xonly_pk) = self.operator_xonly_pk {
565 dbg_struct.field("operator_xonly_pk", &operator_xonly_pk);
566 }
567 if let Some(round_idx) = self.round_idx {
568 dbg_struct.field("round_idx", &round_idx);
569 }
570 if let Some(kickoff_idx) = self.kickoff_idx {
571 dbg_struct.field("kickoff_idx", &kickoff_idx);
572 }
573 dbg_struct.field("tx_type", &self.tx_type);
574 dbg_struct.finish()
575 }
576}
577
578#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord, sqlx::Type)]
580#[sqlx(type_name = "fee_paying_type", rename_all = "lowercase")]
581pub enum FeePayingType {
582 CPFP,
587 RBF,
591 NoFunding,
594}
595
596#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
604pub struct RbfSigningInfo {
605 pub vout: u32,
606 pub tweak_merkle_root: Option<TapNodeHash>,
607 #[cfg(test)]
608 pub annex: Option<Vec<u8>>,
609 #[cfg(test)]
610 pub additional_taproot_output_count: Option<u32>,
611}
/// Infallible extraction of the trailing 20 bytes of a value.
pub trait Last20Bytes {
    /// Returns the last 20 bytes of `self`.
    fn last_20_bytes(&self) -> [u8; 20];
}
615
616pub trait TryLast20Bytes {
617 fn try_last_20_bytes(self) -> Result<[u8; 20], BridgeError>;
618}
619
620impl Last20Bytes for [u8; 32] {
621 fn last_20_bytes(&self) -> [u8; 20] {
622 self.try_last_20_bytes().expect("will not happen")
623 }
624}
625
626pub trait ScriptBufExt {
627 fn try_get_taproot_pk(&self) -> Result<XOnlyPublicKey, BridgeError>;
628}
629
630impl ScriptBufExt for ScriptBuf {
631 fn try_get_taproot_pk(&self) -> Result<XOnlyPublicKey, BridgeError> {
632 if !self.is_p2tr() {
633 return Err(eyre::eyre!("Script is not a valid P2TR script (not 34 bytes)").into());
634 }
635
636 Ok(XOnlyPublicKey::from_slice(&self.as_bytes()[2..34])
637 .wrap_err("Failed to parse XOnlyPublicKey from script")?)
638 }
639}
640
641impl TryLast20Bytes for &[u8] {
642 fn try_last_20_bytes(self) -> Result<[u8; 20], BridgeError> {
643 if self.len() < 20 {
644 return Err(eyre::eyre!("Input is too short to contain 20 bytes").into());
645 }
646 let mut result = [0u8; 20];
647
648 result.copy_from_slice(&self[self.len() - 20..]);
649 Ok(result)
650 }
651}
652
653pub async fn timed_request<F, T>(
671 duration: Duration,
672 description: &str,
673 future: F,
674) -> Result<T, BridgeError>
675where
676 F: Future<Output = Result<T, BridgeError>>,
677{
678 timed_request_base(duration, description, future)
679 .await
680 .map_err(|_| {
681 Box::new(Status::deadline_exceeded(format!(
682 "{description} timed out"
683 )))
684 })?
685}
686
687pub async fn timed_request_base<F, T>(
700 duration: Duration,
701 description: &str,
702 future: F,
703) -> Result<Result<T, BridgeError>, Elapsed>
704where
705 F: Future<Output = Result<T, BridgeError>>,
706{
707 timeout(duration, future)
708 .instrument(debug_span!("timed_request", description = description))
709 .await
710}
711
712pub async fn timed_try_join_all<I, T, D>(
738 duration: Duration,
739 description: &str,
740 ids: Option<Vec<D>>,
741 iter: I,
742) -> Result<Vec<T>, BridgeError>
743where
744 D: Display,
745 I: IntoIterator,
746 I::Item: Future<Output = Result<T, BridgeError>>,
747{
748 let ids = Arc::new(ids);
749 let results = join_all(iter.into_iter().enumerate().map(|item| {
750 let ids = ids.clone();
751 async move {
752 let id = Option::as_ref(&ids).and_then(|ids| ids.get(item.0));
753
754 timeout(duration, item.1)
755 .await
756 .map_err(|_| {
757 Box::new(Status::deadline_exceeded(format!(
758 "{} (id: {}) timed out",
759 description,
760 id.map(|id| id.to_string())
761 .unwrap_or_else(|| "n/a".to_string())
762 )))
763 })?
764 .wrap_err_with(|| {
766 format!(
767 "Failed to join {}",
768 id.map(ToString::to_string).unwrap_or_else(|| "n/a".into())
769 )
770 })
771 }
772 }))
773 .instrument(debug_span!("timed_try_join_all", description = description))
774 .await;
775
776 combine_errors(results)
777}
778
779pub async fn join_all_combine_errors<F, T, E>(
786 futures: impl IntoIterator<Item = F>,
787) -> Result<Vec<T>, BridgeError>
788where
789 F: Future<Output = Result<T, E>>,
790 E: std::fmt::Display,
791{
792 let results = join_all(futures).await;
793 combine_errors(results)
794}
795
796pub fn combine_errors<I, EIn, T>(results: I) -> Result<Vec<T>, BridgeError>
806where
807 I: IntoIterator<Item = Result<T, EIn>>,
808 EIn: std::fmt::Display,
809{
810 let mut errors = Vec::new();
811 let mut successful_results = Vec::new();
812 for result in results {
813 match result {
814 Ok(value) => successful_results.push(value),
815 Err(e) => errors.push(format!("{e:#}")),
816 }
817 }
818 if !errors.is_empty() {
819 return Err(BridgeError::from(eyre::eyre!(
820 "Number of failed futures: {}: {}",
821 errors.len(),
822 errors.join("; ")
823 )));
824 }
825 Ok(successful_results)
826}
827
828pub fn flatten_join_named_results<T, E1, E2, S, R>(task_results: R) -> Result<(), BridgeError>
842where
843 R: IntoIterator<Item = (S, Result<Result<T, E1>, E2>)>,
844 S: AsRef<str>,
845 E1: std::fmt::Display,
846 E2: std::fmt::Display,
847{
848 let mut task_errors = Vec::new();
849
850 for (task_name, task_output) in task_results.into_iter() {
851 match task_output {
852 Ok(inner_result) => {
853 if let Err(e) = inner_result {
854 let err_msg = format!("{} failed with error: {:#}", task_name.as_ref(), e);
855 task_errors.push(err_msg);
856 }
857 }
858 Err(e) => {
859 let err_msg = format!(
860 "{} task thread failed with error: {:#}",
861 task_name.as_ref(),
862 e
863 );
864 task_errors.push(err_msg);
865 }
866 }
867 }
868
869 if !task_errors.is_empty() {
870 tracing::error!("Tasks failed with errors: {:#?}", task_errors);
871 return Err(eyre::eyre!("Tasks failed with errors: {:#?}", task_errors).into());
872 }
873
874 Ok(())
875}
876
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::io::Read;
    use tempfile::NamedTempFile;
    use tracing::level_filters::LevelFilter;

    /// With `CI` and `INFO_LOG_FILE` set, the log file must receive error, warn and
    /// info events plus debug events for the `ci` target, but not other debug events.
    #[test]
    #[ignore = "This test changes environment variables so it should not be run in CI since it might affect other tests."]
    fn test_ci_logging_setup() {
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let temp_path = temp_file.path().to_string_lossy().to_string();

        std::env::set_var("CI", "true");
        std::env::set_var("INFO_LOG_FILE", &temp_path);

        let result = initialize_logger(Some(LevelFilter::DEBUG));
        assert!(result.is_ok(), "Logger initialization should succeed");

        tracing::error!("Test error message");
        tracing::warn!("Test warn message");
        tracing::info!("Test info message");
        tracing::debug!(target: "ci", "Test CI debug message");
        tracing::debug!("Test debug message");

        // Give the writer a moment before reading the file back.
        std::thread::sleep(std::time::Duration::from_millis(100));

        let mut file_contents = String::new();
        fs::File::open(&temp_path)
            .expect("Failed to open log file")
            .read_to_string(&mut file_contents)
            .expect("Failed to read log file");

        let must_be_present = [
            ("Test error message", "Error message should be in file"),
            ("Test warn message", "Warn message should be in file"),
            ("Test info message", "Info message should be in file"),
            (
                "Test CI debug message",
                "Debug message for CI should be in file",
            ),
        ];
        for (needle, reason) in must_be_present {
            assert!(file_contents.contains(needle), "{}", reason);
        }

        assert!(
            !file_contents.contains("Test debug message"),
            "Debug message should not be in file"
        );

        std::env::remove_var("CI");
        std::env::remove_var("INFO_LOG_FILE");
    }
}