use crate::builder::transaction::TransactionType;
use crate::config::TelemetryConfig;
use crate::errors::BridgeError;
use crate::operator::RoundIndex;
use crate::rpc::clementine::VergenResponse;
use bitcoin::{OutPoint, ScriptBuf, TapNodeHash, XOnlyPublicKey};
use eyre::Context as _;
use futures::future::try_join_all;
use http::HeaderValue;
use metrics_exporter_prometheus::PrometheusBuilder;
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Display};
use std::fs::File;
use std::future::Future;
use std::net::{Ipv4Addr, SocketAddr};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::error::Elapsed;
use tokio::time::timeout;
use tonic::Status;
use tower::{Layer, Service};
use tracing::level_filters::LevelFilter;
use tracing::{debug_span, Instrument, Subscriber};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::{fmt, EnvFilter, Layer as TracingLayer, Registry};
pub fn initialize_logger(default_level: Option<LevelFilter>) -> Result<(), BridgeError> {
let is_ci = std::env::var("CI")
.map(|v| v == "true" || v == "1")
.unwrap_or(false);
if cfg!(test) {
std::env::set_var("RUST_LIB_BACKTRACE", "full");
std::env::set_var("RUST_BACKTRACE", "full");
}
let _ = color_eyre::config::HookBuilder::default()
.add_frame_filter(Box::new(|frames| {
let filters = &[
"std::",
"test::",
"tokio::",
"core::",
"<core::",
"<alloc::",
"start_thread",
"clone",
];
frames.retain(|frame| {
!filters.iter().any(|f| {
let name = if let Some(name) = frame.name.as_ref() {
name.as_str()
} else {
return true;
};
name.starts_with(f)
})
});
}))
.install();
if is_ci {
let info_log_file = std::env::var("INFO_LOG_FILE").ok();
if let Some(file_path) = info_log_file {
try_set_global_subscriber(env_subscriber_with_file(&file_path)?);
tracing::trace!("Using file logging in CI, outputting to {}", file_path);
} else {
try_set_global_subscriber(env_subscriber_to_human(default_level));
tracing::trace!("Using console logging in CI");
tracing::warn!(
"CI is set but INFO_LOG_FILE is missing, only console logs will be used."
);
}
} else if is_json_logs() {
try_set_global_subscriber(env_subscriber_to_json(default_level));
tracing::trace!("Using JSON logging");
} else {
try_set_global_subscriber(env_subscriber_to_human(default_level));
tracing::trace!("Using human-readable logging");
}
tracing::info!("Tracing initialized successfully.");
Ok(())
}
pub fn initialize_telemetry(config: &TelemetryConfig) -> Result<(), BridgeError> {
let telemetry_addr: SocketAddr = format!("{}:{}", config.host, config.port)
.parse()
.unwrap_or_else(|_| {
tracing::warn!(
"Invalid telemetry address: {}:{}, using default address: 127.0.0.1:8081",
config.host,
config.port
);
SocketAddr::from((Ipv4Addr::new(0, 0, 0, 0), 8081))
});
tracing::debug!("Initializing telemetry at {}", telemetry_addr);
let builder = PrometheusBuilder::new().with_http_listener(telemetry_addr);
builder
.install()
.map_err(|e| eyre::eyre!("Failed to initialize telemetry: {}", e))?;
Ok(())
}
fn try_set_global_subscriber<S>(subscriber: S)
where
S: Subscriber + Send + Sync + 'static,
{
match tracing::subscriber::set_global_default(subscriber) {
Ok(_) => {}
Err(_) => {
#[cfg(test)]
tracing::trace!("Tracing is already initialized, skipping without errors...");
#[cfg(not(test))]
tracing::info!(
"Unexpected double initialization of tracing, skipping without errors..."
);
}
}
}
fn env_subscriber_with_file(path: &str) -> Result<Box<dyn Subscriber + Send + Sync>, BridgeError> {
if let Some(parent_dir) = std::path::Path::new(path).parent() {
std::fs::create_dir_all(parent_dir).map_err(|e| {
BridgeError::ConfigError(format!(
"Failed to create log directory '{}': {}",
parent_dir.display(),
e
))
})?;
}
let file = File::create(path).map_err(|e| BridgeError::ConfigError(e.to_string()))?;
let file_filter = EnvFilter::from_default_env()
.add_directive("info".parse().expect("It should parse info level"))
.add_directive("ci=debug".parse().expect("It should parse ci debug level"));
let console_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::WARN.into())
.from_env_lossy();
let file_layer = fmt::layer()
.with_writer(file)
.with_ansi(false)
.with_file(true)
.with_line_number(true)
.with_target(true)
.with_thread_ids(true)
.with_thread_names(true)
.with_filter(file_filter)
.boxed();
let console_layer = fmt::layer()
.with_test_writer()
.with_file(true)
.with_line_number(true)
.with_target(true)
.with_filter(console_filter)
.boxed();
Ok(Box::new(
Registry::default().with(file_layer).with(console_layer),
))
}
fn env_subscriber_to_json(level: Option<LevelFilter>) -> Box<dyn Subscriber + Send + Sync> {
let filter = match level {
Some(lvl) => EnvFilter::builder()
.with_default_directive(lvl.into())
.from_env_lossy(),
None => EnvFilter::from_default_env(),
};
let json_layer = fmt::layer::<Registry>()
.with_test_writer()
.with_file(true)
.with_line_number(true)
.with_thread_ids(true)
.with_thread_names(true)
.with_target(true)
.json();
Box::new(tracing_subscriber::registry().with(json_layer).with(filter))
}
fn env_subscriber_to_human(level: Option<LevelFilter>) -> Box<dyn Subscriber + Send + Sync> {
let filter = match level {
Some(lvl) => EnvFilter::builder()
.with_default_directive(lvl.into())
.from_env_lossy(),
None => EnvFilter::from_default_env(),
};
let standard_layer = fmt::layer()
.with_test_writer()
.with_file(true)
.with_line_number(true)
.with_target(true);
Box::new(
tracing_subscriber::registry()
.with(standard_layer)
.with(filter),
)
}
fn is_json_logs() -> bool {
std::env::var("LOG_FORMAT")
.map(|v| v.eq_ignore_ascii_case("json"))
.unwrap_or(false)
}
pub fn get_vergen_response() -> VergenResponse {
let mut vergen_response = String::new();
if let Some(date) = option_env!("VERGEN_BUILD_DATE") {
vergen_response.push_str(&format!("Build Date: {date}\n"));
}
if let Some(timestamp) = option_env!("VERGEN_BUILD_TIMESTAMP") {
vergen_response.push_str(&format!("Build Timestamp: {timestamp}\n"));
}
if let Some(branch) = option_env!("VERGEN_GIT_BRANCH") {
vergen_response.push_str(&format!("git branch: {branch}\n"));
}
if let Some(commit) = option_env!("VERGEN_GIT_SHA") {
vergen_response.push_str(&format!("git commit: {commit}\n"));
}
if let Some(commit_date) = option_env!("VERGEN_GIT_COMMIT_DATE") {
vergen_response.push_str(&format!("git commit date: {commit_date}\n"));
}
if let Some(commit_timestamp) = option_env!("VERGEN_GIT_COMMIT_TIMESTAMP") {
vergen_response.push_str(&format!("git commit timestamp: {commit_timestamp}\n"));
}
if let Some(commit_author_name) = option_env!("VERGEN_GIT_COMMIT_AUTHOR_NAME") {
vergen_response.push_str(&format!("git commit author name: {commit_author_name}\n"));
}
if let Some(commit_author_email) = option_env!("VERGEN_GIT_COMMIT_AUTHOR_EMAIL") {
vergen_response.push_str(&format!("git commit author email: {commit_author_email}\n"));
}
if let Some(commit_count) = option_env!("VERGEN_GIT_COMMIT_COUNT") {
vergen_response.push_str(&format!("git commit count: {commit_count}\n"));
}
if let Some(commit_message) = option_env!("VERGEN_GIT_COMMIT_MESSAGE") {
vergen_response.push_str(&format!("git commit message: {commit_message}\n"));
}
if let Some(describe) = option_env!("VERGEN_GIT_DESCRIBE") {
vergen_response.push_str(&format!("git describe: {describe}\n"));
}
if let Some(dirty) = option_env!("VERGEN_GIT_DIRTY") {
vergen_response.push_str(&format!("git dirty: {dirty}\n"));
}
if let Some(debug) = option_env!("VERGEN_CARGO_DEBUG") {
vergen_response.push_str(&format!("cargo debug: {debug}\n"));
}
if let Some(opt_level) = option_env!("VERGEN_CARGO_OPT_LEVEL") {
vergen_response.push_str(&format!("cargo opt level: {opt_level}\n"));
}
if let Some(target_triple) = option_env!("VERGEN_CARGO_TARGET_TRIPLE") {
vergen_response.push_str(&format!("cargo target triple: {target_triple}\n"));
}
if let Some(features) = option_env!("VERGEN_CARGO_FEATURES") {
vergen_response.push_str(&format!("cargo features: {features}\n"));
}
if let Some(dependencies) = option_env!("VERGEN_CARGO_DEPENDENCIES") {
vergen_response.push_str(&format!("cargo dependencies: {dependencies}\n"));
}
if let Some(channel) = option_env!("VERGEN_RUSTC_CHANNEL") {
vergen_response.push_str(&format!("rustc channel: {channel}\n"));
}
if let Some(version) = option_env!("VERGEN_RUSTC_SEMVER") {
vergen_response.push_str(&format!("rustc version: {version}\n"));
}
if let Some(commit_hash) = option_env!("VERGEN_RUSTC_COMMIT_HASH") {
vergen_response.push_str(&format!("rustc commit hash: {commit_hash}\n"));
}
if let Some(commit_date) = option_env!("VERGEN_RUSTC_COMMIT_DATE") {
vergen_response.push_str(&format!("rustc commit date: {commit_date}\n"));
}
if let Some(host_triple) = option_env!("VERGEN_RUSTC_HOST_TRIPLE") {
vergen_response.push_str(&format!("rustc host triple: {host_triple}\n"));
}
if let Some(llvm_version) = option_env!("VERGEN_RUSTC_LLVM_VERSION") {
vergen_response.push_str(&format!("rustc LLVM version: {llvm_version}\n"));
}
if let Some(cpu_brand) = option_env!("VERGEN_SYSINFO_CPU_BRAND") {
vergen_response.push_str(&format!("cpu brand: {cpu_brand}\n"));
}
if let Some(cpu_name) = option_env!("VERGEN_SYSINFO_CPU_NAME") {
vergen_response.push_str(&format!("cpu name: {cpu_name}\n"));
}
if let Some(cpu_vendor) = option_env!("VERGEN_SYSINFO_CPU_VENDOR") {
vergen_response.push_str(&format!("cpu vendor: {cpu_vendor}\n"));
}
if let Some(cpu_core_count) = option_env!("VERGEN_SYSINFO_CPU_CORE_COUNT") {
vergen_response.push_str(&format!("cpu core count: {cpu_core_count}\n"));
}
if let Some(cpu_frequency) = option_env!("VERGEN_SYSINFO_CPU_FREQUENCY") {
vergen_response.push_str(&format!("cpu frequency: {cpu_frequency} MHz\n"));
}
if let Some(memory) = option_env!("VERGEN_SYSINFO_MEMORY") {
vergen_response.push_str(&format!("total memory: {memory} KB\n"));
}
if let Some(name) = option_env!("VERGEN_SYSINFO_NAME") {
vergen_response.push_str(&format!("system name: {name}\n"));
}
if let Some(os_version) = option_env!("VERGEN_SYSINFO_OS_VERSION") {
vergen_response.push_str(&format!("OS version: {os_version}\n"));
}
if let Some(user) = option_env!("VERGEN_SYSINFO_USER") {
vergen_response.push_str(&format!("build user: {user}\n"));
}
VergenResponse {
response: vergen_response,
}
}
pub fn monitor_standalone_task<T: Send + 'static, E: Debug + Send + 'static>(
task_handle: tokio::task::JoinHandle<Result<T, E>>,
task_name: &str,
) {
let task_name = task_name.to_string();
tokio::spawn(async move {
match task_handle.await {
Ok(Ok(_)) => {
tracing::debug!("Task {} completed successfully", task_name);
}
Ok(Err(e)) => {
tracing::error!("Task {} throw an error: {:?}", task_name, e);
}
Err(e) => {
if e.is_cancelled() {
tracing::debug!("Task {} has cancelled", task_name);
return;
}
tracing::error!("Task {} has panicked: {:?}", task_name, e);
}
}
});
}
macro_rules! delayed_panic {
($($arg:tt)*) => {
{
eprintln!($($arg)*);
eprintln!("Delaying exit for 15 seconds, to allow for logs to be flushed");
std::thread::sleep(std::time::Duration::from_secs(15));
panic!($($arg)*);
}
};
}
pub(crate) use delayed_panic;
#[derive(Debug, Clone, Default)]
pub struct AddMethodMiddlewareLayer;
impl<S> Layer<S> for AddMethodMiddlewareLayer {
type Service = AddMethodMiddleware<S>;
fn layer(&self, service: S) -> Self::Service {
AddMethodMiddleware { inner: service }
}
}
#[derive(Debug, Clone)]
pub struct AddMethodMiddleware<S> {
inner: S,
}
type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
impl<S, ReqBody, ResBody> Service<http::Request<ReqBody>> for AddMethodMiddleware<S>
where
S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
ReqBody: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, mut req: http::Request<ReqBody>) -> Self::Future {
let clone = self.inner.clone();
let mut inner = std::mem::replace(&mut self.inner, clone);
Box::pin(async move {
let path = req.uri().path();
let grpc_method =
if let &[_, _, method] = &path.split("/").collect::<Vec<&str>>().as_slice() {
Some(method.to_string())
} else {
None
};
if let Some(grpc_method) = grpc_method {
if let Ok(grpc_method) = HeaderValue::from_str(&grpc_method) {
req.headers_mut().insert("grpc-method", grpc_method);
}
}
let response = inner.call(req).await?;
Ok(response)
})
}
}
pub trait NamedEntity: Sync + Send + 'static {
const ENTITY_NAME: &'static str;
const TX_SENDER_CONSUMER_ID: &'static str;
const FINALIZED_BLOCK_CONSUMER_ID_NO_AUTOMATION: &'static str;
const FINALIZED_BLOCK_CONSUMER_ID_AUTOMATION: &'static str;
}
#[derive(Copy, Clone, Eq, Hash, PartialEq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct TxMetadata {
pub deposit_outpoint: Option<OutPoint>,
pub operator_xonly_pk: Option<XOnlyPublicKey>,
pub round_idx: Option<RoundIndex>,
pub kickoff_idx: Option<u32>,
pub tx_type: TransactionType,
}
impl std::fmt::Debug for TxMetadata {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut dbg_struct = f.debug_struct("TxMetadata");
if let Some(deposit_outpoint) = self.deposit_outpoint {
dbg_struct.field("deposit_outpoint", &deposit_outpoint);
}
if let Some(operator_xonly_pk) = self.operator_xonly_pk {
dbg_struct.field("operator_xonly_pk", &operator_xonly_pk);
}
if let Some(round_idx) = self.round_idx {
dbg_struct.field("round_idx", &round_idx);
}
if let Some(kickoff_idx) = self.kickoff_idx {
dbg_struct.field("kickoff_idx", &kickoff_idx);
}
dbg_struct.field("tx_type", &self.tx_type);
dbg_struct.finish()
}
}
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord, sqlx::Type)]
#[sqlx(type_name = "fee_paying_type", rename_all = "lowercase")]
pub enum FeePayingType {
CPFP,
RBF,
NoFunding,
}
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct RbfSigningInfo {
pub vout: u32,
pub tweak_merkle_root: Option<TapNodeHash>,
#[cfg(test)]
pub annex: Option<Vec<u8>>,
#[cfg(test)]
pub additional_taproot_output_count: Option<u32>,
}
pub trait Last20Bytes {
fn last_20_bytes(&self) -> [u8; 20];
}
pub trait TryLast20Bytes {
fn try_last_20_bytes(self) -> Result<[u8; 20], BridgeError>;
}
impl Last20Bytes for [u8; 32] {
fn last_20_bytes(&self) -> [u8; 20] {
self.try_last_20_bytes().expect("will not happen")
}
}
pub trait ScriptBufExt {
fn try_get_taproot_pk(&self) -> Result<XOnlyPublicKey, BridgeError>;
}
impl ScriptBufExt for ScriptBuf {
fn try_get_taproot_pk(&self) -> Result<XOnlyPublicKey, BridgeError> {
if !self.is_p2tr() {
return Err(eyre::eyre!("Script is not a valid P2TR script (not 34 bytes)").into());
}
Ok(XOnlyPublicKey::from_slice(&self.as_bytes()[2..34])
.wrap_err("Failed to parse XOnlyPublicKey from script")?)
}
}
impl TryLast20Bytes for &[u8] {
fn try_last_20_bytes(self) -> Result<[u8; 20], BridgeError> {
if self.len() < 20 {
return Err(eyre::eyre!("Input is too short to contain 20 bytes").into());
}
let mut result = [0u8; 20];
result.copy_from_slice(&self[self.len() - 20..]);
Ok(result)
}
}
pub async fn timed_request<F, T>(
duration: Duration,
description: &str,
future: F,
) -> Result<T, BridgeError>
where
F: Future<Output = Result<T, BridgeError>>,
{
timed_request_base(duration, description, future)
.await
.map_err(|_| Status::deadline_exceeded(format!("{} timed out", description)))?
}
pub async fn timed_request_base<F, T>(
duration: Duration,
description: &str,
future: F,
) -> Result<Result<T, BridgeError>, Elapsed>
where
F: Future<Output = Result<T, BridgeError>>,
{
timeout(duration, future)
.instrument(debug_span!("timed_request", description = description))
.await
}
pub async fn timed_try_join_all<I, T, D>(
duration: Duration,
description: &str,
ids: Option<Vec<D>>,
iter: I,
) -> Result<Vec<T>, BridgeError>
where
D: Display,
I: IntoIterator,
I::Item: Future<Output = Result<T, BridgeError>>,
{
let ids = Arc::new(ids);
try_join_all(iter.into_iter().enumerate().map(|item| {
let ids = ids.clone();
async move {
let id = Option::as_ref(&ids).and_then(|ids| ids.get(item.0));
timeout(duration, item.1)
.await
.map_err(|_| {
Status::deadline_exceeded(format!(
"{} (id: {}) timed out",
description,
id.map(|id| id.to_string())
.unwrap_or_else(|| "n/a".to_string())
))
})?
.wrap_err_with(|| {
format!(
"Failed to join {}",
id.map(ToString::to_string).unwrap_or_else(|| "n/a".into())
)
})
.map_err(Into::into)
}
}))
.instrument(debug_span!("timed_try_join_all", description = description))
.await
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use std::io::Read;
use tempfile::NamedTempFile;
use tracing::level_filters::LevelFilter;
#[test]
#[ignore = "This test changes environment variables so it should not be run in CI since it might affect other tests."]
fn test_ci_logging_setup() {
let temp_file = NamedTempFile::new().expect("Failed to create temp file");
let temp_path = temp_file.path().to_string_lossy().to_string();
std::env::set_var("CI", "true");
std::env::set_var("INFO_LOG_FILE", &temp_path);
let result = initialize_logger(Some(LevelFilter::DEBUG));
assert!(result.is_ok(), "Logger initialization should succeed");
tracing::error!("Test error message");
tracing::warn!("Test warn message");
tracing::info!("Test info message");
tracing::debug!(target: "ci", "Test CI debug message");
tracing::debug!("Test debug message");
std::thread::sleep(std::time::Duration::from_millis(100));
let mut file_contents = String::new();
let mut file = fs::File::open(&temp_path).expect("Failed to open log file");
file.read_to_string(&mut file_contents)
.expect("Failed to read log file");
assert!(
file_contents.contains("Test error message"),
"Error message should be in file"
);
assert!(
file_contents.contains("Test warn message"),
"Warn message should be in file"
);
assert!(
file_contents.contains("Test info message"),
"Info message should be in file"
);
assert!(
file_contents.contains("Test CI debug message"),
"Debug message for CI should be in file"
);
assert!(
!file_contents.contains("Test debug message"),
"Debug message should not be in file"
);
std::env::remove_var("CI");
std::env::remove_var("INFO_LOG_FILE");
}
}