// clementine_core/utils.rs

1use crate::builder::transaction::TransactionType;
2use crate::config::TelemetryConfig;
3use crate::errors::BridgeError;
4use crate::operator::RoundIndex;
5use crate::rpc::clementine::VergenResponse;
6use bitcoin::{OutPoint, ScriptBuf, TapNodeHash, XOnlyPublicKey};
7use eyre::Context as _;
8use futures::future::join_all;
9use http::HeaderValue;
10use metrics_exporter_prometheus::PrometheusBuilder;
11use serde::{Deserialize, Serialize};
12use std::fmt::{Debug, Display};
13use std::fs::File;
14use std::future::Future;
15use std::net::{Ipv4Addr, SocketAddr};
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19use std::time::Duration;
20use tokio::time::error::Elapsed;
21use tokio::time::timeout;
22use tonic::Status;
23use tower::{Layer, Service};
24use tracing::level_filters::LevelFilter;
25use tracing::{debug_span, Instrument, Subscriber};
26use tracing_subscriber::layer::SubscriberExt;
27use tracing_subscriber::{fmt, EnvFilter, Layer as TracingLayer, Registry};
28
29/// Initializes a [`tracing`] subscriber depending on the environment.
30/// [`EnvFilter`] is used with an optional default level. Sets up the
31/// [`color_eyre`] handler.
32///
33/// # Log Formats
34///
35/// - `json` **JSON** is used when `LOG_FORMAT=json`
36/// - `human` **Human-readable** direct logs are used when `LOG_FORMAT` is not
37///   set to `json`.
38///
39/// ## CI
40///
41/// In CI, logging is always in the human-readable format with output to the
42/// console. The `INFO_LOG_FILE` env var can be used to set an optional log file
43/// output. If not set, only console logging is used.
44///
45/// # Backtraces
46///
47/// Backtraces are enabled by default for tests. Error backtraces otherwise
48/// depend on the `RUST_LIB_BACKTRACE` env var. Please read [`color_eyre`]
49/// documentation for more details.
50///
51/// # Parameters
52///
53/// - `default_level`: Default level ranges from 0 to 5. This is overwritten through the
54///   `RUST_LOG` env var.
55///
56/// # Returns
57///
58/// Returns `Err` in CI if the file logging cannot be initialized.  Already
59/// initialized errors are ignored, so this function can be called multiple
60/// times safely.
61pub fn initialize_logger(default_level: Option<LevelFilter>) -> Result<(), BridgeError> {
62    let is_ci = std::env::var("CI")
63        .map(|v| v == "true" || v == "1")
64        .unwrap_or(false);
65
66    // UNCOMMENT TO DEBUG TOKIO TASKS
67    // console_subscriber::init();
68
69    if cfg!(test) {
70        // Enable full backtraces for tests
71        std::env::set_var("RUST_LIB_BACKTRACE", "full");
72        std::env::set_var("RUST_BACKTRACE", "full");
73    }
74
75    // Initialize color-eyre for better error handling and backtraces
76    let _ = color_eyre::config::HookBuilder::default()
77        .add_frame_filter(Box::new(|frames| {
78            // Frames with names starting with any of the str's below will be filtered out
79            let filters = &[
80                "std::",
81                "test::",
82                "tokio::",
83                "core::",
84                "<core::",
85                "<alloc::",
86                "start_thread",
87                "<tonic::",
88                "<futures::",
89                "<tower::",
90                "<hyper",
91                "hyper",
92                "__rust_try",
93                "<axum::",
94                "<F as ",
95                "clone",
96            ];
97
98            frames.retain(|frame| {
99                !filters.iter().any(|f| {
100                    let name = if let Some(name) = frame.name.as_ref() {
101                        name.as_str()
102                    } else {
103                        return true;
104                    };
105
106                    name.starts_with(f)
107                })
108            });
109        }))
110        .install();
111
112    if is_ci {
113        let info_log_file = std::env::var("INFO_LOG_FILE").ok();
114        if let Some(file_path) = info_log_file {
115            try_set_global_subscriber(env_subscriber_with_file(&file_path)?);
116            tracing::trace!("Using file logging in CI, outputting to {}", file_path);
117        } else {
118            try_set_global_subscriber(env_subscriber_to_human(default_level));
119            tracing::trace!("Using console logging in CI");
120            tracing::warn!(
121                "CI is set but INFO_LOG_FILE is missing, only console logs will be used."
122            );
123        }
124    } else if is_json_logs() {
125        try_set_global_subscriber(env_subscriber_to_json(default_level));
126        tracing::trace!("Using JSON logging");
127    } else {
128        try_set_global_subscriber(env_subscriber_to_human(default_level));
129        tracing::trace!("Using human-readable logging");
130    }
131
132    tracing::info!("Tracing initialized successfully.");
133    Ok(())
134}
135
136pub fn initialize_telemetry(config: &TelemetryConfig) -> Result<(), BridgeError> {
137    let telemetry_addr: SocketAddr = format!("{}:{}", config.host, config.port)
138        .parse()
139        .unwrap_or_else(|_| {
140            tracing::warn!(
141                "Invalid telemetry address: {}:{}, using default address: 127.0.0.1:8081",
142                config.host,
143                config.port
144            );
145            SocketAddr::from((Ipv4Addr::new(127, 0, 0, 1), 8081))
146        });
147
148    tracing::debug!("Initializing telemetry at {}", telemetry_addr);
149
150    let builder = PrometheusBuilder::new().with_http_listener(telemetry_addr);
151
152    builder
153        .install()
154        .map_err(|e| eyre::eyre!("Failed to initialize telemetry: {}", e))?;
155
156    Ok(())
157}
158
159fn try_set_global_subscriber<S>(subscriber: S)
160where
161    S: Subscriber + Send + Sync + 'static,
162{
163    match tracing::subscriber::set_global_default(subscriber) {
164        Ok(_) => {}
165        // Statically, the only error possible is "already initialized"
166        Err(_) => {
167            #[cfg(test)]
168            tracing::trace!("Tracing is already initialized, skipping without errors...");
169            #[cfg(not(test))]
170            tracing::info!(
171                "Unexpected double initialization of tracing, skipping without errors..."
172            );
173        }
174    }
175}
176
177fn env_subscriber_with_file(path: &str) -> Result<Box<dyn Subscriber + Send + Sync>, BridgeError> {
178    if let Some(parent_dir) = std::path::Path::new(path).parent() {
179        std::fs::create_dir_all(parent_dir).map_err(|e| {
180            BridgeError::ConfigError(format!(
181                "Failed to create log directory '{}': {}",
182                parent_dir.display(),
183                e
184            ))
185        })?;
186    }
187
188    let file = File::create(path).map_err(|e| BridgeError::ConfigError(e.to_string()))?;
189
190    let file_filter = EnvFilter::from_default_env()
191        .add_directive("info".parse().expect("It should parse info level"))
192        .add_directive("ci=debug".parse().expect("It should parse ci debug level"));
193
194    let console_filter = EnvFilter::builder()
195        .with_default_directive(LevelFilter::WARN.into())
196        .from_env_lossy();
197
198    let file_layer = fmt::layer()
199        .with_writer(file)
200        .with_ansi(false)
201        .with_file(true)
202        .with_line_number(true)
203        .with_target(true)
204        .with_thread_ids(true)
205        .with_thread_names(true)
206        .with_filter(file_filter)
207        .boxed();
208
209    let console_layer = fmt::layer()
210        .with_test_writer()
211        .with_file(true)
212        .with_line_number(true)
213        .with_target(true)
214        .with_filter(console_filter)
215        .boxed();
216
217    Ok(Box::new(
218        Registry::default().with(file_layer).with(console_layer),
219    ))
220}
221
222fn env_subscriber_to_json(level: Option<LevelFilter>) -> Box<dyn Subscriber + Send + Sync> {
223    let filter = match level {
224        Some(lvl) => EnvFilter::builder()
225            .with_default_directive(lvl.into())
226            .from_env_lossy(),
227        None => EnvFilter::from_default_env(),
228    };
229
230    let json_layer = fmt::layer::<Registry>()
231        .with_test_writer()
232        // .with_timer(time::UtcTime::rfc_3339())
233        .with_file(true)
234        .with_line_number(true)
235        .with_thread_ids(true)
236        .with_thread_names(true)
237        .with_target(true)
238        .json();
239    // .with_current_span(true)z
240    // .with_span_list(true)
241    // To see how long each span takes, uncomment this.
242    // .with_span_events(FmtSpan::CLOSE)
243
244    Box::new(tracing_subscriber::registry().with(json_layer).with(filter))
245}
246
247fn env_subscriber_to_human(level: Option<LevelFilter>) -> Box<dyn Subscriber + Send + Sync> {
248    let filter = match level {
249        Some(lvl) => EnvFilter::builder()
250            .with_default_directive(lvl.into())
251            .from_env_lossy(),
252        None => EnvFilter::from_default_env(),
253    };
254
255    let standard_layer = fmt::layer()
256        .with_test_writer()
257        // .with_timer(time::UtcTime::rfc_3339())
258        .with_file(true)
259        .with_line_number(true)
260        // To see how long each span takes, uncomment this.
261        // .with_span_events(FmtSpan::CLOSE)
262        .with_target(true);
263
264    Box::new(
265        tracing_subscriber::registry()
266            .with(standard_layer)
267            .with(filter),
268    )
269}
270
/// Returns `true` when the `LOG_FORMAT` env var is set to `json`
/// (case-insensitive); `false` when unset or set to anything else.
fn is_json_logs() -> bool {
    matches!(std::env::var("LOG_FORMAT"), Ok(v) if v.eq_ignore_ascii_case("json"))
}
276
277pub fn get_vergen_response() -> VergenResponse {
278    let mut vergen_response = String::new();
279
280    // build info
281    if let Some(date) = option_env!("VERGEN_BUILD_DATE") {
282        vergen_response.push_str(&format!("Build Date: {date}\n"));
283    }
284    if let Some(timestamp) = option_env!("VERGEN_BUILD_TIMESTAMP") {
285        vergen_response.push_str(&format!("Build Timestamp: {timestamp}\n"));
286    }
287
288    // git info
289    if let Some(branch) = option_env!("VERGEN_GIT_BRANCH") {
290        vergen_response.push_str(&format!("git branch: {branch}\n"));
291    }
292    if let Some(commit) = option_env!("VERGEN_GIT_SHA") {
293        vergen_response.push_str(&format!("git commit: {commit}\n"));
294    }
295    if let Some(commit_date) = option_env!("VERGEN_GIT_COMMIT_DATE") {
296        vergen_response.push_str(&format!("git commit date: {commit_date}\n"));
297    }
298    if let Some(commit_timestamp) = option_env!("VERGEN_GIT_COMMIT_TIMESTAMP") {
299        vergen_response.push_str(&format!("git commit timestamp: {commit_timestamp}\n"));
300    }
301    if let Some(commit_author_name) = option_env!("VERGEN_GIT_COMMIT_AUTHOR_NAME") {
302        vergen_response.push_str(&format!("git commit author name: {commit_author_name}\n"));
303    }
304    if let Some(commit_author_email) = option_env!("VERGEN_GIT_COMMIT_AUTHOR_EMAIL") {
305        vergen_response.push_str(&format!("git commit author email: {commit_author_email}\n"));
306    }
307    if let Some(commit_count) = option_env!("VERGEN_GIT_COMMIT_COUNT") {
308        vergen_response.push_str(&format!("git commit count: {commit_count}\n"));
309    }
310    if let Some(commit_message) = option_env!("VERGEN_GIT_COMMIT_MESSAGE") {
311        vergen_response.push_str(&format!("git commit message: {commit_message}\n"));
312    }
313    if let Some(describe) = option_env!("VERGEN_GIT_DESCRIBE") {
314        vergen_response.push_str(&format!("git describe: {describe}\n"));
315    }
316    if let Some(dirty) = option_env!("VERGEN_GIT_DIRTY") {
317        vergen_response.push_str(&format!("git dirty: {dirty}\n"));
318    }
319
320    // cargo info
321    if let Some(debug) = option_env!("VERGEN_CARGO_DEBUG") {
322        vergen_response.push_str(&format!("cargo debug: {debug}\n"));
323    }
324    if let Some(opt_level) = option_env!("VERGEN_CARGO_OPT_LEVEL") {
325        vergen_response.push_str(&format!("cargo opt level: {opt_level}\n"));
326    }
327    if let Some(target_triple) = option_env!("VERGEN_CARGO_TARGET_TRIPLE") {
328        vergen_response.push_str(&format!("cargo target triple: {target_triple}\n"));
329    }
330    if let Some(features) = option_env!("VERGEN_CARGO_FEATURES") {
331        vergen_response.push_str(&format!("cargo features: {features}\n"));
332    }
333    if let Some(dependencies) = option_env!("VERGEN_CARGO_DEPENDENCIES") {
334        vergen_response.push_str(&format!("cargo dependencies: {dependencies}\n"));
335    }
336
337    // rustc info
338    if let Some(channel) = option_env!("VERGEN_RUSTC_CHANNEL") {
339        vergen_response.push_str(&format!("rustc channel: {channel}\n"));
340    }
341    if let Some(version) = option_env!("VERGEN_RUSTC_SEMVER") {
342        vergen_response.push_str(&format!("rustc version: {version}\n"));
343    }
344    if let Some(commit_hash) = option_env!("VERGEN_RUSTC_COMMIT_HASH") {
345        vergen_response.push_str(&format!("rustc commit hash: {commit_hash}\n"));
346    }
347    if let Some(commit_date) = option_env!("VERGEN_RUSTC_COMMIT_DATE") {
348        vergen_response.push_str(&format!("rustc commit date: {commit_date}\n"));
349    }
350    if let Some(host_triple) = option_env!("VERGEN_RUSTC_HOST_TRIPLE") {
351        vergen_response.push_str(&format!("rustc host triple: {host_triple}\n"));
352    }
353    if let Some(llvm_version) = option_env!("VERGEN_RUSTC_LLVM_VERSION") {
354        vergen_response.push_str(&format!("rustc LLVM version: {llvm_version}\n"));
355    }
356
357    // sysinfo
358    if let Some(cpu_brand) = option_env!("VERGEN_SYSINFO_CPU_BRAND") {
359        vergen_response.push_str(&format!("cpu brand: {cpu_brand}\n"));
360    }
361    if let Some(cpu_name) = option_env!("VERGEN_SYSINFO_CPU_NAME") {
362        vergen_response.push_str(&format!("cpu name: {cpu_name}\n"));
363    }
364    if let Some(cpu_vendor) = option_env!("VERGEN_SYSINFO_CPU_VENDOR") {
365        vergen_response.push_str(&format!("cpu vendor: {cpu_vendor}\n"));
366    }
367    if let Some(cpu_core_count) = option_env!("VERGEN_SYSINFO_CPU_CORE_COUNT") {
368        vergen_response.push_str(&format!("cpu core count: {cpu_core_count}\n"));
369    }
370    if let Some(cpu_frequency) = option_env!("VERGEN_SYSINFO_CPU_FREQUENCY") {
371        vergen_response.push_str(&format!("cpu frequency: {cpu_frequency} MHz\n"));
372    }
373    if let Some(memory) = option_env!("VERGEN_SYSINFO_TOTAL_MEMORY") {
374        vergen_response.push_str(&format!("total memory: {memory}\n"));
375    }
376    if let Some(name) = option_env!("VERGEN_SYSINFO_NAME") {
377        vergen_response.push_str(&format!("system name: {name}\n"));
378    }
379    if let Some(os_version) = option_env!("VERGEN_SYSINFO_OS_VERSION") {
380        vergen_response.push_str(&format!("OS version: {os_version}\n"));
381    }
382    if let Some(user) = option_env!("VERGEN_SYSINFO_USER") {
383        vergen_response.push_str(&format!("build user: {user}\n"));
384    }
385
386    VergenResponse {
387        response: vergen_response,
388    }
389}
390
391/// Monitors a [`tokio::task::JoinHandle`] in the background and logs it's end
392/// result.
393pub fn monitor_standalone_task<
394    T: Send + 'static,
395    E: Debug + Send + 'static + From<BridgeError>,
396    C: Send + 'static,
397>(
398    task_handle: tokio::task::JoinHandle<Result<T, E>>,
399    task_name: &str,
400    monitor_err_sender: tokio::sync::mpsc::Sender<Result<C, E>>,
401) {
402    let task_name = task_name.to_string();
403
404    // Move task_handle into the spawned task to make it Send
405    tokio::spawn(async move {
406        match task_handle.await {
407            Ok(Ok(_)) => {
408                tracing::debug!("Task {} completed successfully", task_name);
409            }
410            Ok(Err(e)) => {
411                tracing::error!("Task {} threw an error: {:?}", task_name, e);
412                let _ = monitor_err_sender.send(Err(e)).await.inspect_err(|e| {
413                    tracing::error!("Failed to send error to monitoring channel: {:?}", e)
414                });
415            }
416            Err(e) => {
417                if e.is_cancelled() {
418                    // Task was cancelled, which is expected during cleanup
419                    tracing::debug!("Task {} has been cancelled", task_name);
420                    let _ = monitor_err_sender
421                        .send(Err(Into::<BridgeError>::into(eyre::eyre!(
422                            "Task was cancelled due to: {:?}",
423                            e
424                        ))
425                        .into()))
426                        .await
427                        .inspect_err(|e| {
428                            tracing::error!("Failed to send error to monitoring channel: {:?}", e)
429                        });
430                    return;
431                }
432                tracing::error!("Task {} has panicked: {:?}", task_name, e);
433                let _ = monitor_err_sender
434                    .send(Err(Into::<BridgeError>::into(eyre::eyre!(
435                        "Task has panicked due to: {:?}",
436                        e
437                    ))
438                    .into()))
439                    .await
440                    .inspect_err(|e| {
441                        tracing::error!("Failed to send error to monitoring channel: {:?}", e)
442                    });
443            }
444        }
445    });
446}
447
/// Delays the exit of the program for 15 seconds, to allow for logs to be flushed.
/// Then panics with the given arguments.
///
/// The message is printed to stderr immediately (before the sleep) so that it
/// is visible even if the final `panic!` output is lost.
///
/// # Parameters
///
/// - `($($arg:tt)*)`: Arguments to pass to `panic!`, in the same manner as format! and println!
macro_rules! delayed_panic {
    ($($arg:tt)*) => {
        {
            eprintln!($($arg)*);
            eprintln!("Delaying exit for 15 seconds, to allow for logs to be flushed");
            std::thread::sleep(std::time::Duration::from_secs(15));
            panic!($($arg)*);
        }
    };
}

// Re-export so other modules in the crate can use the macro.
pub(crate) use delayed_panic;
466
/// Tower [`Layer`] that wraps a service in [`AddMethodMiddleware`], which
/// stamps the gRPC method name onto each request as a `grpc-method` header
/// (see the `Service` impl below).
#[derive(Debug, Clone, Default)]
pub struct AddMethodMiddlewareLayer;

impl<S> Layer<S> for AddMethodMiddlewareLayer {
    type Service = AddMethodMiddleware<S>;

    fn layer(&self, service: S) -> Self::Service {
        AddMethodMiddleware { inner: service }
    }
}

/// Middleware service produced by [`AddMethodMiddlewareLayer`]; annotates the
/// request and then delegates to `inner`.
#[derive(Debug, Clone)]
pub struct AddMethodMiddleware<S> {
    inner: S,
}

/// Boxed, pinned future type used by the middleware's `Service` impl.
type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
484
485impl<S, ReqBody, ResBody> Service<http::Request<ReqBody>> for AddMethodMiddleware<S>
486where
487    S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>> + Clone + Send + 'static,
488    S::Future: Send + 'static,
489    ReqBody: Send + 'static,
490{
491    type Response = S::Response;
492    type Error = S::Error;
493    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
494
495    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
496        self.inner.poll_ready(cx)
497    }
498
499    fn call(&mut self, mut req: http::Request<ReqBody>) -> Self::Future {
500        // See: https://docs.rs/tower/latest/tower/trait.Service.html#be-careful-when-cloning-inner-services
501        let clone = self.inner.clone();
502        let mut inner = std::mem::replace(&mut self.inner, clone);
503
504        Box::pin(async move {
505            let path = req.uri().path();
506
507            let grpc_method =
508                if let &[_, _, method] = &path.split("/").collect::<Vec<&str>>().as_slice() {
509                    Some(method.to_string())
510                } else {
511                    None
512                };
513
514            if let Some(grpc_method) = grpc_method {
515                if let Ok(grpc_method) = HeaderValue::from_str(&grpc_method) {
516                    req.headers_mut().insert("grpc-method", grpc_method);
517                }
518            }
519
520            // Do extra async work here...
521            let response = inner.call(req).await?;
522
523            Ok(response)
524        })
525    }
526}
527
/// A trait for entities that have a name, operator, verifier, etc.
/// Used to distinguish between state machines with different owners in the database,
/// and to provide a human-readable name for the entity for task names.
pub trait NamedEntity: Sync + Send + 'static {
    /// A string identifier for this owner type used to distinguish between
    /// state machines with different owners in the database.
    ///
    /// ## Example
    /// "operator", "verifier", "user"
    const ENTITY_NAME: &'static str;

    /// Consumer ID for the tx sender task.
    const TX_SENDER_CONSUMER_ID: &'static str;

    /// Consumer ID for the finalized block task with no automation.
    const FINALIZED_BLOCK_CONSUMER_ID_NO_AUTOMATION: &'static str;

    /// Consumer ID for the finalized block task with automation.
    const FINALIZED_BLOCK_CONSUMER_ID_AUTOMATION: &'static str;
}
548
/// Metadata identifying a transaction (which deposit/operator/round/kickoff
/// it belongs to, and its type).
///
/// All fields except `tx_type` are optional; the custom `Debug` impl below
/// prints only the fields that are present.
#[derive(Copy, Clone, Eq, Hash, PartialEq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct TxMetadata {
    /// Deposit this transaction is associated with, if any.
    pub deposit_outpoint: Option<OutPoint>,
    /// X-only public key of the operator involved, if any.
    pub operator_xonly_pk: Option<XOnlyPublicKey>,
    /// Round index, if applicable.
    pub round_idx: Option<RoundIndex>,
    /// Kickoff index, if applicable.
    pub kickoff_idx: Option<u32>,
    /// Type of this transaction (always present).
    pub tx_type: TransactionType,
}
557
558impl std::fmt::Debug for TxMetadata {
559    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
560        let mut dbg_struct = f.debug_struct("TxMetadata");
561        if let Some(deposit_outpoint) = self.deposit_outpoint {
562            dbg_struct.field("deposit_outpoint", &deposit_outpoint);
563        }
564        if let Some(operator_xonly_pk) = self.operator_xonly_pk {
565            dbg_struct.field("operator_xonly_pk", &operator_xonly_pk);
566        }
567        if let Some(round_idx) = self.round_idx {
568            dbg_struct.field("round_idx", &round_idx);
569        }
570        if let Some(kickoff_idx) = self.kickoff_idx {
571            dbg_struct.field("kickoff_idx", &kickoff_idx);
572        }
573        dbg_struct.field("tx_type", &self.tx_type);
574        dbg_struct.finish()
575    }
576}
577
/// Specifies the fee bumping strategy used for a transaction.
///
/// Stored in the database as the `fee_paying_type` SQL enum (lowercase
/// variant names).
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord, sqlx::Type)]
#[sqlx(type_name = "fee_paying_type", rename_all = "lowercase")]
pub enum FeePayingType {
    /// Child-Pays-For-Parent: A new "child" transaction is created, spending an output
    /// from the original "parent" transaction. The child pays a high fee, sufficient
    /// to cover both its own cost and the parent's fee deficit, incentivizing miners
    /// to confirm both together. Specifically, we utilize "fee payer" UTXOs.
    CPFP,
    /// Replace-By-Fee: The original unconfirmed transaction is replaced with a new
    /// version that includes a higher fee. The original transaction must signal
    /// RBF enablement (e.g., via nSequence). Bitcoin Core's `bumpfee` RPC is often used.
    RBF,
    /// The transaction has already been funded and no fee is needed.
    /// Currently used for disprove tx as it has operator's collateral as input.
    NoFunding,
}
595
/// Information to re-sign an RBF transaction.
/// Specifically the merkle root of the taproot to keyspend with and the output index of the utxo to be
/// re-signed.
///
/// - Not needed for SinglePlusAnyoneCanPay RBF txs.
/// - Not needed for CPFP.
/// - Only signs for a keypath spend
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct RbfSigningInfo {
    /// Output index of the UTXO to re-sign.
    pub vout: u32,
    /// Merkle root used to tweak the internal key for the keypath spend.
    pub tweak_merkle_root: Option<TapNodeHash>,
    // Test-only knobs for exercising annex / extra-output scenarios.
    #[cfg(test)]
    pub annex: Option<Vec<u8>>,
    #[cfg(test)]
    pub additional_taproot_output_count: Option<u32>,
}
/// Infallible extraction of the last 20 bytes, intended for types statically
/// known to hold at least 20 bytes.
pub trait Last20Bytes {
    /// Returns the last 20 bytes of `self`.
    fn last_20_bytes(&self) -> [u8; 20];
}

/// Fallible counterpart of [`Last20Bytes`] for dynamically sized inputs.
pub trait TryLast20Bytes {
    /// Returns the last 20 bytes of `self`, or an error if `self` holds
    /// fewer than 20 bytes.
    fn try_last_20_bytes(self) -> Result<[u8; 20], BridgeError>;
}
619
620impl Last20Bytes for [u8; 32] {
621    fn last_20_bytes(&self) -> [u8; 20] {
622        self.try_last_20_bytes().expect("will not happen")
623    }
624}
625
/// Extension helpers for [`ScriptBuf`].
pub trait ScriptBufExt {
    /// Extracts the x-only public key embedded in a P2TR script.
    ///
    /// # Errors
    /// Fails when the script is not P2TR or the key bytes do not parse.
    fn try_get_taproot_pk(&self) -> Result<XOnlyPublicKey, BridgeError>;
}
629
630impl ScriptBufExt for ScriptBuf {
631    fn try_get_taproot_pk(&self) -> Result<XOnlyPublicKey, BridgeError> {
632        if !self.is_p2tr() {
633            return Err(eyre::eyre!("Script is not a valid P2TR script (not 34 bytes)").into());
634        }
635
636        Ok(XOnlyPublicKey::from_slice(&self.as_bytes()[2..34])
637            .wrap_err("Failed to parse XOnlyPublicKey from script")?)
638    }
639}
640
641impl TryLast20Bytes for &[u8] {
642    fn try_last_20_bytes(self) -> Result<[u8; 20], BridgeError> {
643        if self.len() < 20 {
644            return Err(eyre::eyre!("Input is too short to contain 20 bytes").into());
645        }
646        let mut result = [0u8; 20];
647
648        result.copy_from_slice(&self[self.len() - 20..]);
649        Ok(result)
650    }
651}
652
653/// Wraps a future with a timeout, returning a `Status::deadline_exceeded` gRPC error
654/// if the future does not complete within the specified duration.
655///
656/// This is useful for enforcing timeouts on individual asynchronous operations,
657/// especially those involving network requests, to prevent them from hanging indefinitely.
658///
659/// # Arguments
660///
661/// * `duration`: The maximum `Duration` to wait for the future to complete.
662/// * `description`: A string slice describing the operation, used in the timeout error message.
663/// * `future`: The `Future` to execute. The future should return a `Result<T, BridgeError>`.
664///
665/// # Returns
666///
667/// Returns `Ok(T)` if the future completes successfully within the time limit.
668/// Returns `Err(BridgeError)` if the future returns an error or if it times out.
669/// A timeout results in a `BridgeError` that wraps a `tonic::Status::deadline_exceeded`.
670pub async fn timed_request<F, T>(
671    duration: Duration,
672    description: &str,
673    future: F,
674) -> Result<T, BridgeError>
675where
676    F: Future<Output = Result<T, BridgeError>>,
677{
678    timed_request_base(duration, description, future)
679        .await
680        .map_err(|_| {
681            Box::new(Status::deadline_exceeded(format!(
682                "{description} timed out"
683            )))
684        })?
685}
686
687/// Wraps a future with a timeout and adds a debug span with the description.
688///
689/// # Arguments
690///
691/// * `duration`: The maximum `Duration` to wait for the future to complete.
692/// * `description`: A string slice describing the operation, used in the timeout error message.
693/// * `future`: The `Future` to execute. The future should return a `Result<T, BridgeError>`.
694///
695/// # Returns
696///
697/// Returns `Ok(Ok(T))` if the future completes successfully within the time limit, returns `Ok(Err(e))`
698/// if the future returns an error, returns `Err(Elapsed)` if the request times out.
699pub async fn timed_request_base<F, T>(
700    duration: Duration,
701    description: &str,
702    future: F,
703) -> Result<Result<T, BridgeError>, Elapsed>
704where
705    F: Future<Output = Result<T, BridgeError>>,
706{
707    timeout(duration, future)
708        .instrument(debug_span!("timed_request", description = description))
709        .await
710}
711
/// Concurrently executes a collection of futures, applying a timeout to each one individually.
/// If any future fails or times out, the entire operation is aborted and an error is returned.
///
/// This utility is an extension of `futures::future::try_join_all` with added per-future
/// timeout logic and improved error reporting using optional IDs.
///
/// # Type Parameters
///
/// * `I`: An iterator that yields futures.
/// * `T`: The success type of the futures.
/// * `D`: A type that can be displayed, used for identifying futures in error messages.
///
/// # Arguments
///
/// * `duration`: The timeout `Duration` applied to each individual future in the iterator.
/// * `description`: A string slice describing the collective operation, used in timeout error messages.
/// * `ids`: An optional `Vec<D>` of identifiers corresponding to each future. If provided,
///   these IDs are used in error messages to specify which future failed or timed out.
/// * `iter`: An iterator producing the futures to be executed.
///
/// # Returns
///
/// Returns `Ok(Vec<T>)` containing the results of all futures if they all complete successfully.
/// Returns `Err(BridgeError)` if any future returns an error or times out. The error will be a combined error of all errors.
/// The error will be contextualized with the operation description and the specific future's ID if available.
pub async fn timed_try_join_all<I, T, D>(
    duration: Duration,
    description: &str,
    ids: Option<Vec<D>>,
    iter: I,
) -> Result<Vec<T>, BridgeError>
where
    D: Display,
    I: IntoIterator,
    I::Item: Future<Output = Result<T, BridgeError>>,
{
    // Share the optional id list across all per-future closures without
    // cloning the Vec itself.
    let ids = Arc::new(ids);
    let results = join_all(iter.into_iter().enumerate().map(|item| {
        let ids = ids.clone();
        async move {
            // Look up this future's id (if ids were provided) by its index.
            let id = Option::as_ref(&ids).and_then(|ids| ids.get(item.0));

            // Each future gets its own independent timeout.
            timeout(duration, item.1)
                .await
                .map_err(|_| {
                    Box::new(Status::deadline_exceeded(format!(
                        "{} (id: {}) timed out",
                        description,
                        id.map(|id| id.to_string())
                            .unwrap_or_else(|| "n/a".to_string())
                    )))
                })?
                // Add the id to the error chain for easier debugging for other errors.
                .wrap_err_with(|| {
                    format!(
                        "Failed to join {}",
                        id.map(ToString::to_string).unwrap_or_else(|| "n/a".into())
                    )
                })
        }
    }))
    .instrument(debug_span!("timed_try_join_all", description = description))
    .await;

    // All futures ran to completion (or timed out individually); merge any
    // failures into one combined error.
    combine_errors(results)
}
778
779/// Executes a collection of fallible futures concurrently and aggregates failures into a single
780/// [`BridgeError`].
781///
782/// This runs all futures like [`futures::future::try_join_all`], but instead of stopping on the
783/// first error it waits for every future to finish by leveraging [`join_all`]. All errors are then
784/// combined via [`combine_errors`], ensuring the caller gets full visibility into every failure.
785pub async fn join_all_combine_errors<F, T, E>(
786    futures: impl IntoIterator<Item = F>,
787) -> Result<Vec<T>, BridgeError>
788where
789    F: Future<Output = Result<T, E>>,
790    E: std::fmt::Display,
791{
792    let results = join_all(futures).await;
793    combine_errors(results)
794}
795
796/// Collects errors from an iterator of results and returns a combined error if any failed.
797///
798/// # Parameters
799/// * `results`: Iterator of results (errors should contain identifying information in their Debug representation)
800/// * `prefix`: Prefix message for the combined error (e.g., "Operator key collection failures")
801///
802/// # Returns
803/// * `Ok(Vec<T>)` containing all successful results if all results are successful
804/// * `Err(BridgeError)` with a combined error message listing all failures
805pub fn combine_errors<I, EIn, T>(results: I) -> Result<Vec<T>, BridgeError>
806where
807    I: IntoIterator<Item = Result<T, EIn>>,
808    EIn: std::fmt::Display,
809{
810    let mut errors = Vec::new();
811    let mut successful_results = Vec::new();
812    for result in results {
813        match result {
814            Ok(value) => successful_results.push(value),
815            Err(e) => errors.push(format!("{e:#}")),
816        }
817    }
818    if !errors.is_empty() {
819        return Err(BridgeError::from(eyre::eyre!(
820            "Number of failed futures: {}: {}",
821            errors.len(),
822            errors.join("; ")
823        )));
824    }
825    Ok(successful_results)
826}
827
828/// Collects all errors (both outer and inner) from named task results and returns a combined error if any task failed.
829///
830/// This function is useful when you have multiple async tasks (e.g., from `tokio::spawn`) and want to
831/// see all errors if multiple tasks fail, rather than just the first error.
832///
833/// # Parameters
834/// * `task_results`: Iterator of tuples containing (task_name, Result<Result<T, E1>, E2>)
835///   - `task_name`: A string-like identifier for the task (used in error messages)
836///   - The nested Result represents: `Result<T, E1>` is the task's result, `E2` is typically a `JoinError`
837///
838/// # Returns
839/// * `Ok(())` if all tasks completed successfully
840/// * `Err(BridgeError)` with a combined error message listing all failures
841pub fn flatten_join_named_results<T, E1, E2, S, R>(task_results: R) -> Result<(), BridgeError>
842where
843    R: IntoIterator<Item = (S, Result<Result<T, E1>, E2>)>,
844    S: AsRef<str>,
845    E1: std::fmt::Display,
846    E2: std::fmt::Display,
847{
848    let mut task_errors = Vec::new();
849
850    for (task_name, task_output) in task_results.into_iter() {
851        match task_output {
852            Ok(inner_result) => {
853                if let Err(e) = inner_result {
854                    let err_msg = format!("{} failed with error: {:#}", task_name.as_ref(), e);
855                    task_errors.push(err_msg);
856                }
857            }
858            Err(e) => {
859                let err_msg = format!(
860                    "{} task thread failed with error: {:#}",
861                    task_name.as_ref(),
862                    e
863                );
864                task_errors.push(err_msg);
865            }
866        }
867    }
868
869    if !task_errors.is_empty() {
870        tracing::error!("Tasks failed with errors: {:#?}", task_errors);
871        return Err(eyre::eyre!("Tasks failed with errors: {:#?}", task_errors).into());
872    }
873
874    Ok(())
875}
876
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::io::Read;
    use tempfile::NamedTempFile;
    use tracing::level_filters::LevelFilter;

    /// Verifies the CI logging path: with `CI=true` and `INFO_LOG_FILE` set,
    /// error/warn/info messages and `ci`-targeted debug messages must land in
    /// the log file, while a generic debug message must be filtered out.
    #[test]
    #[ignore = "This test changes environment variables so it should not be run in CI since it might affect other tests."]
    fn test_ci_logging_setup() {
        // Temp file acts as the log sink; the handle is kept alive for the
        // whole test so the file is not deleted while the logger writes to it.
        let temp_file = NamedTempFile::new().expect("Failed to create temp file");
        let temp_path = temp_file.path().to_string_lossy().to_string();

        // Simulate the CI environment before initializing the logger, since
        // these variables are presumably read during initialization.
        std::env::set_var("CI", "true");
        std::env::set_var("INFO_LOG_FILE", &temp_path);

        let result = initialize_logger(Some(LevelFilter::DEBUG));
        assert!(result.is_ok(), "Logger initialization should succeed");

        // Emit one message per level; only a subset should reach the file.
        tracing::error!("Test error message");
        tracing::warn!("Test warn message");
        tracing::info!("Test info message");
        tracing::debug!(target: "ci", "Test CI debug message");
        tracing::debug!("Test debug message");

        // NOTE(review): fixed sleep assumes the file appender flushes within
        // 100ms — inherently racy; confirm against the logger's writer setup.
        std::thread::sleep(std::time::Duration::from_millis(100));

        let mut file_contents = String::new();
        let mut file = fs::File::open(&temp_path).expect("Failed to open log file");
        file.read_to_string(&mut file_contents)
            .expect("Failed to read log file");

        // INFO and above must always be written to the file.
        assert!(
            file_contents.contains("Test error message"),
            "Error message should be in file"
        );
        assert!(
            file_contents.contains("Test warn message"),
            "Warn message should be in file"
        );
        assert!(
            file_contents.contains("Test info message"),
            "Info message should be in file"
        );

        // Debug is allowed through only for the `ci` target.
        assert!(
            file_contents.contains("Test CI debug message"),
            "Debug message for CI should be in file"
        );

        // Generic (untargeted) debug output must be filtered out of the file.
        assert!(
            !file_contents.contains("Test debug message"),
            "Debug message should not be in file"
        );

        // Clean up so later tests in the same process see a pristine env.
        std::env::remove_var("CI");
        std::env::remove_var("INFO_LOG_FILE");
    }
}