clementine_core/
servers.rs

1//! # Servers
2//!
//! Utilities for creating aggregator, operator and verifier gRPC servers.
4use crate::aggregator::AggregatorServer;
5use crate::citrea::CitreaClientT;
6use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc;
7use crate::operator::OperatorServer;
8use crate::rpc::clementine::clementine_aggregator_server::ClementineAggregatorServer;
9use crate::rpc::clementine::clementine_operator_server::ClementineOperatorServer;
10use crate::rpc::clementine::clementine_verifier_server::ClementineVerifierServer;
11use crate::rpc::interceptors::Interceptors::{Noop, OnlyAggregatorAndSelf};
12use crate::utils::AddMethodMiddlewareLayer;
13use crate::verifier::VerifierServer;
14use crate::{config::BridgeConfig, errors};
15use errors::BridgeError;
16use eyre::Context;
17use rustls_pki_types::pem::PemObject;
18use std::time::Duration;
19use tokio::sync::oneshot;
20use tonic::server::NamedService;
21use tonic::service::interceptor::InterceptedService;
22use tonic::transport::{Certificate, CertificateDer, Identity, ServerTlsConfig};
23use tower::buffer::BufferLayer;
24use tower::limit::RateLimitLayer;
25
26#[cfg(test)]
27use crate::test::common::ensure_test_certificates;
28
/// Alias for a dynamically-typed future that resolves to
/// `Result<(), tonic::transport::Error>`.
///
/// As a bare `dyn` type it is unsized, so it has to be used behind a pointer
/// (for example `Box<ServerFuture>`).
pub type ServerFuture = dyn futures::Future<Output = Result<(), tonic::transport::Error>>;
30
/// Address a gRPC server listens on: a TCP socket address or, on unix
/// targets only, a filesystem path of a Unix domain socket.
#[derive(Debug, Clone)]
pub enum ServerAddr {
    /// TCP socket address (IP and port).
    Tcp(std::net::SocketAddr),
    /// Path of a Unix domain socket; this variant exists only on unix targets.
    #[cfg(unix)]
    Unix(std::path::PathBuf),
}

/// Wraps a socket address in [`ServerAddr::Tcp`].
impl From<std::net::SocketAddr> for ServerAddr {
    fn from(socket_addr: std::net::SocketAddr) -> Self {
        Self::Tcp(socket_addr)
    }
}

/// Wraps a filesystem path in [`ServerAddr::Unix`].
#[cfg(unix)]
impl From<std::path::PathBuf> for ServerAddr {
    fn from(socket_path: std::path::PathBuf) -> Self {
        Self::Unix(socket_path)
    }
}
51
52/// Generic function to create a gRPC server with the given service
53pub async fn create_grpc_server<S>(
54    addr: ServerAddr,
55    service: S,
56    server_name: &str,
57    config: &BridgeConfig,
58) -> Result<(ServerAddr, oneshot::Sender<()>), BridgeError>
59where
60    S: tower::Service<
61            http::Request<tonic::body::BoxBody>,
62            Response = http::Response<tonic::body::BoxBody>,
63            Error = std::convert::Infallible,
64        > + Clone
65        + Send
66        + NamedService
67        + 'static,
68    S::Future: Send + 'static,
69{
70    // Create channels for server readiness and shutdown
71    let (ready_tx, ready_rx) = oneshot::channel();
72    let (shutdown_tx, shutdown_rx) = oneshot::channel();
73
74    // Ensure certificates exist in test mode
75    #[cfg(test)]
76    {
77        ensure_test_certificates().wrap_err("Failed to ensure test certificates")?;
78    }
79
80    match addr {
81        ServerAddr::Tcp(socket_addr) => {
82            let cert = tokio::fs::read(&config.server_cert_path)
83                .await
84                .wrap_err(format!(
85                    "Failed to read server certificate from {}",
86                    config.server_cert_path.display()
87                ))?;
88            let key = tokio::fs::read(&config.server_key_path)
89                .await
90                .wrap_err(format!(
91                    "Failed to read server key from {}",
92                    config.server_key_path.display()
93                ))?;
94
95            let server_identity = Identity::from_pem(cert, key);
96
97            // Load CA certificate for client verification
98            let client_ca_cert = tokio::fs::read(&config.ca_cert_path)
99                .await
100                .wrap_err(format!(
101                    "Failed to read CA certificate from {}",
102                    config.ca_cert_path.display()
103                ))?;
104
105            let client_ca = Certificate::from_pem(client_ca_cert);
106
107            // Build TLS configuration
108            let tls_config = if config.client_verification {
109                ServerTlsConfig::new()
110                    .identity(server_identity)
111                    .client_ca_root(client_ca)
112            } else {
113                ServerTlsConfig::new().identity(server_identity)
114            };
115
116            let service = InterceptedService::new(
117                service,
118                if config.client_verification {
119                    let client_cert = CertificateDer::from_pem_file(&config.client_cert_path)
120                        .wrap_err(format!(
121                            "Failed to read client certificate from {}",
122                            config.client_cert_path.display()
123                        ))?
124                        .to_owned();
125
126                    let aggregator_cert =
127                        CertificateDer::from_pem_file(&config.aggregator_cert_path)
128                            .wrap_err(format!(
129                                "Failed to read aggregator certificate from {}",
130                                config.aggregator_cert_path.display()
131                            ))?
132                            .to_owned();
133
134                    OnlyAggregatorAndSelf {
135                        aggregator_cert,
136                        our_cert: client_cert,
137                    }
138                } else {
139                    Noop
140                },
141            );
142
143            tracing::info!(
144                "Starting {} gRPC server with TCP address: {}",
145                server_name,
146                socket_addr
147            );
148
149            let server_builder = tonic::transport::Server::builder()
150                .layer(AddMethodMiddlewareLayer)
151                .layer(BufferLayer::new(config.grpc.req_concurrency_limit))
152                .layer(RateLimitLayer::new(
153                    config.grpc.ratelimit_req_count as u64,
154                    Duration::from_secs(config.grpc.ratelimit_req_interval_secs),
155                ))
156                .timeout(Duration::from_secs(config.grpc.timeout_secs))
157                .tcp_keepalive(Some(Duration::from_secs(config.grpc.tcp_keepalive_secs)))
158                .concurrency_limit_per_connection(config.grpc.req_concurrency_limit)
159                .http2_adaptive_window(Some(true))
160                .tls_config(tls_config)
161                .wrap_err("Failed to configure TLS")?
162                .add_service(service);
163
164            let server_name_str = server_name.to_string();
165
166            let handle = server_builder.serve_with_shutdown(socket_addr, async move {
167                let _ = ready_tx.send(());
168                shutdown_rx.await.ok();
169                tracing::info!("{} gRPC server shutting down", server_name_str);
170            });
171
172            let server_name_str = server_name.to_string();
173
174            tokio::spawn(async move {
175                if let Err(e) = handle.await {
176                    tracing::error!("{} gRPC server error: {:?}", server_name_str, e);
177                }
178            });
179        }
180        #[cfg(unix)]
181        ServerAddr::Unix(ref socket_path) => {
182            let server_builder = tonic::transport::Server::builder()
183                .layer(AddMethodMiddlewareLayer)
184                .layer(BufferLayer::new(config.grpc.req_concurrency_limit))
185                .layer(RateLimitLayer::new(
186                    config.grpc.ratelimit_req_count as u64,
187                    Duration::from_secs(config.grpc.ratelimit_req_interval_secs),
188                ))
189                .timeout(Duration::from_secs(config.grpc.timeout_secs))
190                .concurrency_limit_per_connection(config.grpc.req_concurrency_limit)
191                .add_service(service);
192            tracing::info!(
193                "Starting {} gRPC server with Unix socket: {:?}",
194                server_name,
195                socket_path
196            );
197
198            // Remove socket file if it already exists
199            if socket_path.exists() {
200                std::fs::remove_file(socket_path)
201                    .wrap_err("Failed to remove existing gRPC unix socket file")?;
202            }
203
204            // Create Unix socket listener
205            let uds = tokio::net::UnixListener::bind(socket_path)
206                .wrap_err("Failed to bind to Unix socket")?;
207            let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
208
209            let server_name_str = server_name.to_string();
210
211            let handle = server_builder.serve_with_incoming_shutdown(incoming, async move {
212                let _ = ready_tx.send(());
213                shutdown_rx.await.ok();
214                tracing::info!("{} gRPC server shutting down", server_name_str);
215            });
216
217            let server_name_str = server_name.to_string();
218
219            tokio::spawn(async move {
220                if let Err(e) = handle.await {
221                    tracing::error!("{} gRPC server error: {:?}", server_name_str, e);
222                }
223            });
224        }
225    }
226
227    // Wait for server to be ready
228    let _ = ready_rx.await;
229    tracing::info!("{} gRPC server started", server_name);
230
231    Ok((addr, shutdown_tx))
232}
233
234pub async fn create_verifier_grpc_server<C: CitreaClientT>(
235    config: BridgeConfig,
236) -> Result<(std::net::SocketAddr, oneshot::Sender<()>), BridgeError> {
237    let _rpc = ExtendedBitcoinRpc::connect(
238        config.bitcoin_rpc_url.clone(),
239        config.bitcoin_rpc_user.clone(),
240        config.bitcoin_rpc_password.clone(),
241        None,
242    )
243    .await
244    .wrap_err("Failed to connect to Bitcoin RPC")?;
245
246    let addr: std::net::SocketAddr = format!("{}:{}", config.host, config.port)
247        .parse()
248        .wrap_err("Failed to parse address")?;
249    let verifier = VerifierServer::<C>::new(config.clone()).await?;
250    verifier.start_background_tasks().await?;
251
252    let svc = ClementineVerifierServer::new(verifier)
253        .max_encoding_message_size(config.grpc.max_message_size)
254        .max_decoding_message_size(config.grpc.max_message_size);
255
256    let (server_addr, shutdown_tx) =
257        create_grpc_server(addr.into(), svc, "Verifier", &config).await?;
258
259    match server_addr {
260        ServerAddr::Tcp(socket_addr) => Ok((socket_addr, shutdown_tx)),
261        _ => Err(BridgeError::ConfigError("Expected TCP address".into())),
262    }
263}
264
265pub async fn create_operator_grpc_server<C: CitreaClientT>(
266    config: BridgeConfig,
267) -> Result<(std::net::SocketAddr, oneshot::Sender<()>), BridgeError> {
268    tracing::info!(
269        "config host and port are: {} and {}",
270        config.host,
271        config.port
272    );
273    let addr: std::net::SocketAddr = format!("{}:{}", config.host, config.port)
274        .parse()
275        .wrap_err("Failed to parse address")?;
276
277    tracing::info!("Creating operator server");
278    let operator = OperatorServer::<C>::new(config.clone()).await?;
279    operator.start_background_tasks().await?;
280
281    tracing::info!("Creating ClementineOperatorServer");
282    let svc = ClementineOperatorServer::new(operator)
283        .max_encoding_message_size(config.grpc.max_message_size)
284        .max_decoding_message_size(config.grpc.max_message_size);
285    let (server_addr, shutdown_tx) =
286        create_grpc_server(addr.into(), svc, "Operator", &config).await?;
287    tracing::info!("Operator gRPC server created");
288
289    match server_addr {
290        ServerAddr::Tcp(socket_addr) => Ok((socket_addr, shutdown_tx)),
291        _ => Err(BridgeError::ConfigError("Expected TCP address".into())),
292    }
293}
294
295pub async fn create_aggregator_grpc_server(
296    config: BridgeConfig,
297) -> Result<(std::net::SocketAddr, oneshot::Sender<()>), BridgeError> {
298    let addr: std::net::SocketAddr = format!("{}:{}", config.host, config.port)
299        .parse()
300        .wrap_err("Failed to parse address")?;
301    let aggregator_server = AggregatorServer::new(config.clone()).await?;
302    aggregator_server.start_background_tasks().await?;
303
304    let svc = ClementineAggregatorServer::new(aggregator_server)
305        .max_encoding_message_size(config.grpc.max_message_size)
306        .max_decoding_message_size(config.grpc.max_message_size);
307
308    if config.client_verification {
309        tracing::warn!("Client verification is enabled on aggregator gRPC server",);
310    }
311
312    let (server_addr, shutdown_tx) =
313        create_grpc_server(addr.into(), svc, "Aggregator", &config).await?;
314
315    match server_addr {
316        ServerAddr::Tcp(socket_addr) => Ok((socket_addr, shutdown_tx)),
317        _ => Err(BridgeError::ConfigError("Expected TCP address".into())),
318    }
319}
320
321// Functions for creating servers with Unix sockets (useful for tests)
322#[cfg(unix)]
323pub async fn create_verifier_unix_server<C: CitreaClientT>(
324    config: BridgeConfig,
325    socket_path: std::path::PathBuf,
326) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
327    let _rpc = ExtendedBitcoinRpc::connect(
328        config.bitcoin_rpc_url.clone(),
329        config.bitcoin_rpc_user.clone(),
330        config.bitcoin_rpc_password.clone(),
331        None,
332    )
333    .await
334    .wrap_err("Failed to connect to Bitcoin RPC")?;
335
336    let verifier = VerifierServer::<C>::new(config.clone()).await?;
337    verifier.start_background_tasks().await?;
338
339    let svc = ClementineVerifierServer::new(verifier)
340        .max_encoding_message_size(config.grpc.max_message_size)
341        .max_decoding_message_size(config.grpc.max_message_size);
342
343    let (server_addr, shutdown_tx) =
344        create_grpc_server(socket_path.into(), svc, "Verifier", &config).await?;
345
346    match server_addr {
347        ServerAddr::Unix(path) => Ok((path, shutdown_tx)),
348        _ => Err(BridgeError::ConfigError("Expected Unix socket path".into())),
349    }
350}
351
/// Non-unix stand-in for the Unix-socket verifier server constructor.
///
/// It carries the same `C: CitreaClientT` type parameter as the unix version
/// so that a call written as `create_verifier_unix_server::<C>(..)` compiles
/// on every target. The parameter and both arguments are unused.
///
/// # Errors
///
/// Always returns `BridgeError::ConfigError`, because Unix sockets are not
/// available on this platform.
#[cfg(not(unix))]
pub async fn create_verifier_unix_server<C: CitreaClientT>(
    _config: BridgeConfig,
    _socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    Err(BridgeError::ConfigError(
        "Unix sockets are not supported on this platform".into(),
    ))
}
361
362#[cfg(unix)]
363pub async fn create_operator_unix_server<C: CitreaClientT>(
364    config: BridgeConfig,
365    socket_path: std::path::PathBuf,
366) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
367    let _rpc = ExtendedBitcoinRpc::connect(
368        config.bitcoin_rpc_url.clone(),
369        config.bitcoin_rpc_user.clone(),
370        config.bitcoin_rpc_password.clone(),
371        None,
372    )
373    .await
374    .wrap_err("Failed to connect to Bitcoin RPC")?;
375
376    let operator = OperatorServer::<C>::new(config.clone()).await?;
377    operator.start_background_tasks().await?;
378
379    let svc = ClementineOperatorServer::new(operator)
380        .max_encoding_message_size(config.grpc.max_message_size)
381        .max_decoding_message_size(config.grpc.max_message_size);
382
383    let (server_addr, shutdown_tx) =
384        create_grpc_server(socket_path.into(), svc, "Operator", &config).await?;
385
386    match server_addr {
387        ServerAddr::Unix(path) => Ok((path, shutdown_tx)),
388        _ => Err(BridgeError::ConfigError("Expected Unix socket path".into())),
389    }
390}
391
/// Non-unix stand-in for the Unix-socket operator server constructor.
///
/// It carries the same `C: CitreaClientT` type parameter as the unix version
/// so that a call written as `create_operator_unix_server::<C>(..)` compiles
/// on every target. The parameter and both arguments are unused.
///
/// # Errors
///
/// Always returns `BridgeError::ConfigError`, because Unix sockets are not
/// available on this platform.
#[cfg(not(unix))]
pub async fn create_operator_unix_server<C: CitreaClientT>(
    _config: BridgeConfig,
    _socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    Err(BridgeError::ConfigError(
        "Unix sockets are not supported on this platform".into(),
    ))
}
401
402#[cfg(unix)]
403pub async fn create_aggregator_unix_server(
404    config: BridgeConfig,
405    socket_path: std::path::PathBuf,
406) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
407    let aggregator_server = AggregatorServer::new(config.clone()).await?;
408    aggregator_server.start_background_tasks().await?;
409
410    let svc = ClementineAggregatorServer::new(aggregator_server)
411        .max_encoding_message_size(config.grpc.max_message_size)
412        .max_decoding_message_size(config.grpc.max_message_size);
413
414    let (server_addr, shutdown_tx) =
415        create_grpc_server(socket_path.into(), svc, "Aggregator", &config).await?;
416
417    match server_addr {
418        ServerAddr::Unix(path) => Ok((path, shutdown_tx)),
419        _ => Err(BridgeError::ConfigError("Expected Unix socket path".into())),
420    }
421}
422
/// Non-unix stand-in for the Unix-socket aggregator server constructor.
///
/// Both arguments are ignored.
///
/// # Errors
///
/// Always returns `BridgeError::ConfigError`, because Unix sockets are not
/// available on this platform.
#[cfg(not(unix))]
pub async fn create_aggregator_unix_server(
    _config: BridgeConfig,
    _socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    let unsupported =
        BridgeError::ConfigError("Unix sockets are not supported on this platform".into());
    Err(unsupported)
}