use crate::builder::transaction::TransactionType;
use crate::errors::BridgeError;
use crate::operator::RoundIndex;
use crate::rpc::clementine::VergenResponse;
use bitcoin::{OutPoint, TapNodeHash, XOnlyPublicKey};
use eyre::Context as _;
use futures::future::try_join_all;
use http::HeaderValue;
use serde::{Deserialize, Serialize};
use std::env;
use std::fmt::{Debug, Display};
use std::fs::File;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::timeout;
use tonic::Status;
use tower::{Layer, Service};
use tracing::level_filters::LevelFilter;
use tracing::{debug_span, Instrument, Subscriber};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::{fmt, EnvFilter, Layer as TracingLayer, Registry};
pub fn initialize_logger(level: Option<LevelFilter>) -> Result<(), BridgeError> {
let is_ci = std::env::var("CI")
.map(|v| v == "true" || v == "1")
.unwrap_or(false);
if is_ci {
let info_log_file = std::env::var("INFO_LOG_FILE").ok();
if let Some(file_path) = info_log_file {
let subscriber = env_subscriber_with_file(&file_path)?;
try_set_global_subscriber(subscriber)?;
} else {
tracing::warn!(
"CI is set but INFO_LOG_FILE is missing, only console logs will be used."
);
let subscriber = env_subscriber_to_human(level);
try_set_global_subscriber(subscriber)?;
}
} else {
let subscriber: Box<dyn Subscriber + Send + Sync> = if is_json_logs() {
Box::new(env_subscriber_to_json(level))
} else {
Box::new(env_subscriber_to_human(level))
};
try_set_global_subscriber(subscriber)?;
}
tracing::info!("Tracing initialized successfully.");
Ok(())
}
/// Registers `subscriber` as the global default, tolerating repeated initialization.
///
/// When a global dispatcher is already in place the call logs an informational message and
/// succeeds. Any other failure is reported as [`BridgeError::ConfigError`] carrying the
/// error's text.
fn try_set_global_subscriber<S>(subscriber: S) -> Result<(), BridgeError>
where
    S: Subscriber + Send + Sync + 'static,
{
    // Message produced when a global dispatcher has been registered before.
    const ALREADY_SET: &str = "a global default trace dispatcher has already been set";

    if let Err(set_err) = tracing::subscriber::set_global_default(subscriber) {
        let reason = set_err.to_string();
        if reason != ALREADY_SET {
            return Err(BridgeError::ConfigError(reason));
        }
        tracing::info!("Tracing is already initialized, skipping without errors...");
    }
    Ok(())
}
/// Builds a subscriber that writes to the file at `path` and to the console.
///
/// The parent directory of `path` is created if needed and the file itself is created
/// (truncating an existing one). The file layer uses the environment filter extended with the
/// `info` and `ci=debug` directives; the console layer defaults to `WARN` unless the
/// environment says otherwise.
///
/// # Errors
///
/// Returns [`BridgeError::ConfigError`] when the directory or the file cannot be created.
fn env_subscriber_with_file(path: &str) -> Result<Box<dyn Subscriber + Send + Sync>, BridgeError> {
    let log_path = std::path::Path::new(path);
    if let Some(dir) = log_path.parent() {
        if let Err(io_err) = std::fs::create_dir_all(dir) {
            return Err(BridgeError::ConfigError(format!(
                "Failed to create log directory '{}': {}",
                dir.display(),
                io_err
            )));
        }
    }

    let log_file = match File::create(log_path) {
        Ok(handle) => handle,
        Err(io_err) => return Err(BridgeError::ConfigError(io_err.to_string())),
    };

    // File output: environment directives plus `info` globally and `debug` for target `ci`.
    let file_filter = EnvFilter::from_default_env()
        .add_directive("info".parse().expect("It should parse info level"))
        .add_directive("ci=debug".parse().expect("It should parse ci debug level"));
    // Console output: warnings and above unless the environment overrides it.
    let console_filter = EnvFilter::builder()
        .with_default_directive(LevelFilter::WARN.into())
        .from_env_lossy();

    let to_file = fmt::layer()
        .with_writer(log_file)
        .with_ansi(false)
        .with_file(true)
        .with_line_number(true)
        .with_target(true)
        .with_thread_ids(true)
        .with_thread_names(true)
        .with_filter(file_filter)
        .boxed();
    let to_console = fmt::layer()
        .with_test_writer()
        .with_file(true)
        .with_line_number(true)
        .with_target(true)
        .with_filter(console_filter)
        .boxed();

    let combined = Registry::default().with(to_file).with(to_console);
    Ok(Box::new(combined))
}
/// Builds a console subscriber that formats events as JSON.
///
/// With `Some(level)` the filter uses `level` as its default directive and is otherwise read
/// from the environment; with `None` the filter comes from the environment alone.
fn env_subscriber_to_json(level: Option<LevelFilter>) -> Box<dyn Subscriber + Send + Sync> {
    let env_filter = if let Some(default_level) = level {
        EnvFilter::builder()
            .with_default_directive(default_level.into())
            .from_env_lossy()
    } else {
        EnvFilter::from_default_env()
    };

    let formatter = fmt::layer::<Registry>()
        .with_test_writer()
        .with_file(true)
        .with_line_number(true)
        .with_thread_ids(true)
        .with_thread_names(true)
        .with_target(true)
        .json();

    let subscriber = tracing_subscriber::registry()
        .with(formatter)
        .with(env_filter);
    Box::new(subscriber)
}
/// Builds a console subscriber with the standard human-readable format.
///
/// With `Some(level)` the filter uses `level` as its default directive and is otherwise read
/// from the environment; with `None` the filter comes from the environment alone.
fn env_subscriber_to_human(level: Option<LevelFilter>) -> Box<dyn Subscriber + Send + Sync> {
    let env_filter = level.map_or_else(EnvFilter::from_default_env, |default_level| {
        EnvFilter::builder()
            .with_default_directive(default_level.into())
            .from_env_lossy()
    });

    let formatter = fmt::layer()
        .with_test_writer()
        .with_file(true)
        .with_line_number(true)
        .with_target(true);

    let subscriber = tracing_subscriber::registry()
        .with(formatter)
        .with(env_filter);
    Box::new(subscriber)
}
/// Returns `true` when the `LOG_FORMAT` environment variable equals `json`, compared
/// ASCII case-insensitively. An unset or unreadable variable yields `false`.
fn is_json_logs() -> bool {
    match std::env::var("LOG_FORMAT") {
        Ok(format) => format.eq_ignore_ascii_case("json"),
        Err(_) => false,
    }
}
/// Collects the `VERGEN_*` values that were present at compile time into a
/// [`VergenResponse`].
///
/// Each available value becomes one `label: value` line terminated by `\n`; values that were
/// not set when the crate was compiled are skipped. The CPU frequency and total memory lines
/// carry ` MHz` and ` KB` suffixes respectively.
pub fn get_vergen_response() -> VergenResponse {
    // (label, compile-time value, suffix appended after the value), in output order.
    let entries: &[(&str, Option<&str>, &str)] = &[
        ("Build Date", option_env!("VERGEN_BUILD_DATE"), ""),
        ("Build Timestamp", option_env!("VERGEN_BUILD_TIMESTAMP"), ""),
        ("git branch", option_env!("VERGEN_GIT_BRANCH"), ""),
        ("git commit", option_env!("VERGEN_GIT_SHA"), ""),
        ("git commit date", option_env!("VERGEN_GIT_COMMIT_DATE"), ""),
        (
            "git commit timestamp",
            option_env!("VERGEN_GIT_COMMIT_TIMESTAMP"),
            "",
        ),
        (
            "git commit author name",
            option_env!("VERGEN_GIT_COMMIT_AUTHOR_NAME"),
            "",
        ),
        (
            "git commit author email",
            option_env!("VERGEN_GIT_COMMIT_AUTHOR_EMAIL"),
            "",
        ),
        ("git commit count", option_env!("VERGEN_GIT_COMMIT_COUNT"), ""),
        (
            "git commit message",
            option_env!("VERGEN_GIT_COMMIT_MESSAGE"),
            "",
        ),
        ("git describe", option_env!("VERGEN_GIT_DESCRIBE"), ""),
        ("git dirty", option_env!("VERGEN_GIT_DIRTY"), ""),
        ("cargo debug", option_env!("VERGEN_CARGO_DEBUG"), ""),
        ("cargo opt level", option_env!("VERGEN_CARGO_OPT_LEVEL"), ""),
        (
            "cargo target triple",
            option_env!("VERGEN_CARGO_TARGET_TRIPLE"),
            "",
        ),
        ("cargo features", option_env!("VERGEN_CARGO_FEATURES"), ""),
        (
            "cargo dependencies",
            option_env!("VERGEN_CARGO_DEPENDENCIES"),
            "",
        ),
        ("rustc channel", option_env!("VERGEN_RUSTC_CHANNEL"), ""),
        ("rustc version", option_env!("VERGEN_RUSTC_SEMVER"), ""),
        (
            "rustc commit hash",
            option_env!("VERGEN_RUSTC_COMMIT_HASH"),
            "",
        ),
        (
            "rustc commit date",
            option_env!("VERGEN_RUSTC_COMMIT_DATE"),
            "",
        ),
        (
            "rustc host triple",
            option_env!("VERGEN_RUSTC_HOST_TRIPLE"),
            "",
        ),
        (
            "rustc LLVM version",
            option_env!("VERGEN_RUSTC_LLVM_VERSION"),
            "",
        ),
        ("cpu brand", option_env!("VERGEN_SYSINFO_CPU_BRAND"), ""),
        ("cpu name", option_env!("VERGEN_SYSINFO_CPU_NAME"), ""),
        ("cpu vendor", option_env!("VERGEN_SYSINFO_CPU_VENDOR"), ""),
        (
            "cpu core count",
            option_env!("VERGEN_SYSINFO_CPU_CORE_COUNT"),
            "",
        ),
        (
            "cpu frequency",
            option_env!("VERGEN_SYSINFO_CPU_FREQUENCY"),
            " MHz",
        ),
        ("total memory", option_env!("VERGEN_SYSINFO_MEMORY"), " KB"),
        ("system name", option_env!("VERGEN_SYSINFO_NAME"), ""),
        ("OS version", option_env!("VERGEN_SYSINFO_OS_VERSION"), ""),
        ("build user", option_env!("VERGEN_SYSINFO_USER"), ""),
    ];

    let mut report = String::new();
    for &(label, value, suffix) in entries {
        if let Some(value) = value {
            report.push_str(&format!("{label}: {value}{suffix}\n"));
        }
    }

    VergenResponse { response: report }
}
pub fn monitor_task_with_panic<T: Send + 'static, E: Debug + Send + 'static>(
task_handle: tokio::task::JoinHandle<Result<T, E>>,
task_name: &str,
) {
let task_name = task_name.to_string();
tokio::spawn(async move {
match task_handle.await {
Ok(Ok(_)) => {
tracing::debug!("Task {} completed successfully", task_name);
}
Ok(Err(e)) => {
tracing::error!("Task {} failed with error: {:?}", task_name, e);
panic!();
}
Err(e) => {
if e.is_cancelled() {
tracing::debug!("Task {} was cancelled", task_name);
return;
}
tracing::error!("Task {} panicked: {:?}", task_name, e);
panic!();
}
}
});
}
/// Prints a message to stderr, waits 15 seconds, and then panics with the same message.
///
/// Takes the same format arguments as `eprintln!` / `panic!`; they are expanded twice, once
/// for the stderr line and once for the panic. The wait is a blocking
/// `std::thread::sleep` on the calling thread.
macro_rules! delayed_panic {
($($arg:tt)*) => {
{
// Report the failure right away, before the delay.
eprintln!($($arg)*);
eprintln!("Delaying exit for 15 seconds, to allow for logs to be flushed");
// Blocks the current thread for the whole delay.
std::thread::sleep(std::time::Duration::from_secs(15));
panic!($($arg)*);
}
};
}
// Makes the macro importable by path from other modules of this crate.
pub(crate) use delayed_panic;
/// A `tower` [`Layer`] that wraps a service in [`AddMethodMiddleware`].
#[derive(Debug, Clone, Default)]
pub struct AddMethodMiddlewareLayer;
impl<S> Layer<S> for AddMethodMiddlewareLayer {
type Service = AddMethodMiddleware<S>;
/// Wraps `service` so that requests pass through [`AddMethodMiddleware`] first.
fn layer(&self, service: S) -> Self::Service {
AddMethodMiddleware { inner: service }
}
}
/// Middleware that adds a `grpc-method` header to a request before forwarding it to the
/// wrapped service. The header value is the last segment of a three-segment request path.
#[derive(Debug, Clone)]
pub struct AddMethodMiddleware<S> {
// The wrapped service that receives the (possibly annotated) request.
inner: S,
}
/// Boxed, `Send` future used as the response future of [`AddMethodMiddleware`].
type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;

impl<S, ReqBody, ResBody> Service<http::Request<ReqBody>> for AddMethodMiddleware<S>
where
    S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>> + Clone + Send + 'static,
    S::Future: Send + 'static,
    ReqBody: Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    /// Readiness is delegated to the wrapped service.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    /// Adds a `grpc-method` header when the request path splits on `/` into exactly three
    /// segments (the third one is used as the value), then forwards the request.
    ///
    /// Paths of any other shape, and method names that are not valid header values, leave the
    /// request untouched.
    fn call(&mut self, mut req: http::Request<ReqBody>) -> Self::Future {
        // Hand the instance that was polled ready to the future and keep a fresh clone in
        // `self` for subsequent calls.
        let fresh = self.inner.clone();
        let mut ready = std::mem::replace(&mut self.inner, fresh);

        Box::pin(async move {
            let method_header = {
                let mut segments = req.uri().path().split('/');
                // A fourth `next()` returning `None` proves there are exactly three segments.
                let shape = (
                    segments.next(),
                    segments.next(),
                    segments.next(),
                    segments.next(),
                );
                match shape {
                    (Some(_), Some(_), Some(method), None) => HeaderValue::from_str(method).ok(),
                    _ => None,
                }
            };

            if let Some(value) = method_header {
                req.headers_mut().insert("grpc-method", value);
            }

            ready.call(req).await
        })
    }
}
/// Associates a static name with the implementing type.
pub trait NamedEntity {
/// The name of the implementing entity.
const ENTITY_NAME: &'static str;
}
/// Metadata describing a transaction: its type plus optional identifying details.
///
/// `Debug` is implemented by hand for this type and omits optional fields that are `None`.
#[derive(Copy, Clone, Eq, Hash, PartialEq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct TxMetadata {
/// Deposit outpoint associated with the transaction, if any.
pub deposit_outpoint: Option<OutPoint>,
/// X-only public key of the operator associated with the transaction, if any.
pub operator_xonly_pk: Option<XOnlyPublicKey>,
/// Round index associated with the transaction, if any.
pub round_idx: Option<RoundIndex>,
/// Kickoff index associated with the transaction, if any.
pub kickoff_idx: Option<u32>,
/// The kind of transaction this metadata describes.
pub tx_type: TransactionType,
}
/// Compact `Debug` output: optional fields appear only when they are `Some`, and `tx_type`
/// is always printed last.
impl std::fmt::Debug for TxMetadata {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive destructuring so a newly added field cannot be forgotten here.
        let Self {
            deposit_outpoint,
            operator_xonly_pk,
            round_idx,
            kickoff_idx,
            tx_type,
        } = self;

        let mut out = f.debug_struct("TxMetadata");
        if let Some(outpoint) = deposit_outpoint {
            out.field("deposit_outpoint", outpoint);
        }
        if let Some(operator_key) = operator_xonly_pk {
            out.field("operator_xonly_pk", operator_key);
        }
        if let Some(round) = round_idx {
            out.field("round_idx", round);
        }
        if let Some(kickoff) = kickoff_idx {
            out.field("kickoff_idx", kickoff);
        }
        out.field("tx_type", tx_type).finish()
    }
}
/// How the fee of a transaction is paid.
///
/// Stored through `sqlx` as the database type `fee_paying_type`, with variant names
/// lowercased.
// NOTE(review): the variant descriptions below follow the usual Bitcoin meaning of the
// acronyms; confirm against the code that consumes this enum.
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord, sqlx::Type)]
#[sqlx(type_name = "fee_paying_type", rename_all = "lowercase")]
pub enum FeePayingType {
/// Presumably child-pays-for-parent.
CPFP,
/// Presumably replace-by-fee.
RBF,
/// Presumably no separate fee funding is attached.
NoFunding,
}
/// Signing information attached to an RBF transaction.
// NOTE(review): field descriptions are inferred from names and types; confirm with the signer.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct RbfSigningInfo {
/// Output index (`vout`) this signing information refers to.
pub vout: u32,
/// Optional taproot merkle root, presumably used to tweak the signing key.
pub tweak_merkle_root: Option<TapNodeHash>,
/// Optional annex bytes; this field exists only in test builds.
#[cfg(test)]
pub annex: Option<Vec<u8>>,
}
/// Extracts the trailing 20 bytes of a value.
pub trait Last20Bytes {
    /// Consumes `self` and returns its last 20 bytes.
    fn last_20_bytes(self) -> [u8; 20];
}

impl Last20Bytes for [u8; 32] {
    /// Returns bytes `12..32` of the array, i.e. everything after the first 12 bytes.
    fn last_20_bytes(self) -> [u8; 20] {
        // Number of leading bytes to discard from the 32-byte input.
        const PREFIX_LEN: usize = 32 - 20;

        let (_, trailing) = self.split_at(PREFIX_LEN);
        let mut tail = [0u8; 20];
        tail.copy_from_slice(trailing);
        tail
    }
}
/// Awaits `future` for at most `duration`, inside a `timed_request` debug span.
///
/// # Arguments
///
/// * `duration` - maximum time to wait for `future`.
/// * `description` - recorded on the span and used in the timeout message.
/// * `future` - the operation to run.
///
/// # Errors
///
/// Returns the error produced by `future`, or a deadline-exceeded [`Status`] converted into a
/// [`BridgeError`] when the time limit is hit.
pub async fn timed_request<F, T>(
    duration: Duration,
    description: &str,
    future: F,
) -> Result<T, BridgeError>
where
    F: Future<Output = Result<T, BridgeError>>,
{
    let bounded = timeout(duration, future);
    let span = debug_span!("timed_request", description = description);

    match bounded.instrument(span).await {
        Ok(outcome) => outcome,
        Err(_elapsed) => {
            Err(Status::deadline_exceeded(format!("{} timed out", description)).into())
        }
    }
}
pub async fn timed_try_join_all<I, T, D>(
duration: Duration,
description: &str,
ids: Option<Vec<D>>,
iter: I,
) -> Result<Vec<T>, BridgeError>
where
D: Display,
I: IntoIterator,
I::Item: Future<Output = Result<T, BridgeError>>,
{
let ids = Arc::new(ids);
try_join_all(iter.into_iter().enumerate().map(|item| {
let ids = ids.clone();
async move {
let id = Option::as_ref(&ids).map(|ids| ids.get(item.0)).flatten();
timeout(duration, item.1)
.await
.map_err(|_| {
Status::deadline_exceeded(format!(
"{} (id: {}) timed out",
description,
id.map(|id| id.to_string())
.unwrap_or_else(|| "n/a".to_string())
))
})?
.wrap_err_with(|| {
format!(
"Failed to join {}",
id.map(ToString::to_string).unwrap_or_else(|| "n/a".into())
)
})
.map_err(Into::into)
}
}))
.instrument(debug_span!("timed_try_join_all", description = description))
.await
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use std::io::Read;
use tempfile::NamedTempFile;
use tracing::level_filters::LevelFilter;
/// Checks the CI logging path: with `CI` and `INFO_LOG_FILE` set, the log file receives
/// error, warn and info events plus debug events for target `ci`, but not other debug
/// events.
#[test]
#[ignore = "This test changes environment variables so it should not be run in CI since it might affect other tests."]
fn test_ci_logging_setup() {
// Point the logger at a temporary file through the environment.
let temp_file = NamedTempFile::new().expect("Failed to create temp file");
let temp_path = temp_file.path().to_string_lossy().to_string();
std::env::set_var("CI", "true");
std::env::set_var("INFO_LOG_FILE", &temp_path);
let result = initialize_logger(Some(LevelFilter::DEBUG));
assert!(result.is_ok(), "Logger initialization should succeed");
// Emit one event per level, plus a debug event for the `ci` target.
tracing::error!("Test error message");
tracing::warn!("Test warn message");
tracing::info!("Test info message");
tracing::debug!(target: "ci", "Test CI debug message");
tracing::debug!("Test debug message");
// Short pause before reading the file back.
std::thread::sleep(std::time::Duration::from_millis(100));
let mut file_contents = String::new();
let mut file = fs::File::open(&temp_path).expect("Failed to open log file");
file.read_to_string(&mut file_contents)
.expect("Failed to read log file");
assert!(
file_contents.contains("Test error message"),
"Error message should be in file"
);
assert!(
file_contents.contains("Test warn message"),
"Warn message should be in file"
);
assert!(
file_contents.contains("Test info message"),
"Info message should be in file"
);
assert!(
file_contents.contains("Test CI debug message"),
"Debug message for CI should be in file"
);
// Debug events outside the `ci` target must be filtered out of the file.
assert!(
!file_contents.contains("Test debug message"),
"Debug message should not be in file"
);
// Restore the environment for whatever runs next in this process.
std::env::remove_var("CI");
std::env::remove_var("INFO_LOG_FILE");
}
}