clementine_core/
servers.rs

//! # Servers
//!
//! Utilities for creating the aggregator, operator and verifier gRPC servers.
4use crate::aggregator::AggregatorServer;
5use crate::citrea::CitreaClientT;
6use crate::config::BridgeConfig;
7use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc;
8use crate::operator::OperatorServer;
9use crate::rpc::clementine::clementine_aggregator_server::ClementineAggregatorServer;
10use crate::rpc::clementine::clementine_operator_server::ClementineOperatorServer;
11use crate::rpc::clementine::clementine_verifier_server::ClementineVerifierServer;
12use crate::rpc::interceptors::Interceptors::{Noop, OnlyAggregatorAndSelf};
13use crate::utils::AddMethodMiddlewareLayer;
14use crate::verifier::VerifierServer;
15use clementine_errors::BridgeError;
16use eyre::Context;
17use rustls_pki_types::pem::PemObject;
18use std::time::Duration;
19use tokio::sync::oneshot;
20use tonic::server::NamedService;
21use tonic::service::interceptor::InterceptedService;
22use tonic::transport::{Certificate, CertificateDer, Identity, ServerTlsConfig};
23use tower::buffer::BufferLayer;
24use tower::limit::RateLimitLayer;
25
26#[cfg(test)]
27use crate::test::common::ensure_test_certificates;
28
/// Address a gRPC server listens on: a TCP socket address or, on Unix
/// targets, the filesystem path of a Unix domain socket.
#[derive(Debug, Clone)]
pub enum ServerAddr {
    /// A TCP socket address.
    Tcp(std::net::SocketAddr),
    /// A Unix domain socket path; only compiled on Unix targets.
    #[cfg(unix)]
    Unix(std::path::PathBuf),
}

/// Wraps a TCP socket address in [`ServerAddr::Tcp`].
impl From<std::net::SocketAddr> for ServerAddr {
    fn from(socket_addr: std::net::SocketAddr) -> Self {
        Self::Tcp(socket_addr)
    }
}

/// Wraps a filesystem path in [`ServerAddr::Unix`].
#[cfg(unix)]
impl From<std::path::PathBuf> for ServerAddr {
    fn from(socket_path: std::path::PathBuf) -> Self {
        Self::Unix(socket_path)
    }
}
49
50/// Generic function to create a gRPC server with the given service
51pub async fn create_grpc_server<S>(
52    addr: ServerAddr,
53    service: S,
54    server_name: &str,
55    config: &BridgeConfig,
56) -> Result<(ServerAddr, oneshot::Sender<()>), BridgeError>
57where
58    S: tower::Service<
59            http::Request<tonic::body::BoxBody>,
60            Response = http::Response<tonic::body::BoxBody>,
61            Error = std::convert::Infallible,
62        > + Clone
63        + Send
64        + NamedService
65        + 'static,
66    S::Future: Send + 'static,
67{
68    // Create channels for server readiness and shutdown
69    let (ready_tx, ready_rx) = oneshot::channel();
70    let (shutdown_tx, shutdown_rx) = oneshot::channel();
71
72    // Ensure certificates exist in test mode
73    #[cfg(test)]
74    {
75        ensure_test_certificates().wrap_err("Failed to ensure test certificates")?;
76    }
77
78    match addr {
79        ServerAddr::Tcp(socket_addr) => {
80            let cert = tokio::fs::read(&config.server_cert_path)
81                .await
82                .wrap_err(format!(
83                    "Failed to read server certificate from {}",
84                    config.server_cert_path.display()
85                ))?;
86            let key = tokio::fs::read(&config.server_key_path)
87                .await
88                .wrap_err(format!(
89                    "Failed to read server key from {}",
90                    config.server_key_path.display()
91                ))?;
92
93            let server_identity = Identity::from_pem(cert, key);
94
95            // Load CA certificate for client verification
96            let client_ca_cert = tokio::fs::read(&config.ca_cert_path)
97                .await
98                .wrap_err(format!(
99                    "Failed to read CA certificate from {}",
100                    config.ca_cert_path.display()
101                ))?;
102
103            let client_ca = Certificate::from_pem(client_ca_cert);
104
105            // Build TLS configuration
106            let tls_config = if config.client_verification {
107                ServerTlsConfig::new()
108                    .identity(server_identity)
109                    .client_ca_root(client_ca)
110            } else {
111                ServerTlsConfig::new().identity(server_identity)
112            };
113
114            let service = InterceptedService::new(
115                service,
116                if config.client_verification {
117                    let client_cert = CertificateDer::from_pem_file(&config.client_cert_path)
118                        .wrap_err(format!(
119                            "Failed to read client certificate from {}",
120                            config.client_cert_path.display()
121                        ))?
122                        .to_owned();
123
124                    let aggregator_cert =
125                        CertificateDer::from_pem_file(&config.aggregator_cert_path)
126                            .wrap_err(format!(
127                                "Failed to read aggregator certificate from {}",
128                                config.aggregator_cert_path.display()
129                            ))?
130                            .to_owned();
131
132                    OnlyAggregatorAndSelf {
133                        aggregator_cert,
134                        our_cert: client_cert,
135                    }
136                } else {
137                    Noop
138                },
139            );
140
141            tracing::info!(
142                "Starting {} gRPC server with TCP address: {}",
143                server_name,
144                socket_addr
145            );
146
147            let server_builder = tonic::transport::Server::builder()
148                .layer(AddMethodMiddlewareLayer)
149                .layer(BufferLayer::new(config.grpc.req_concurrency_limit))
150                .layer(RateLimitLayer::new(
151                    config.grpc.ratelimit_req_count as u64,
152                    Duration::from_secs(config.grpc.ratelimit_req_interval_secs),
153                ))
154                .timeout(Duration::from_secs(config.grpc.timeout_secs))
155                .tcp_keepalive(Some(Duration::from_secs(config.grpc.tcp_keepalive_secs)))
156                .concurrency_limit_per_connection(config.grpc.req_concurrency_limit)
157                .http2_adaptive_window(Some(true))
158                .tls_config(tls_config)
159                .wrap_err("Failed to configure TLS")?
160                .add_service(service);
161
162            let server_name_str = server_name.to_string();
163
164            let handle = server_builder.serve_with_shutdown(socket_addr, async move {
165                let _ = ready_tx.send(());
166                shutdown_rx.await.ok();
167                tracing::info!("{} gRPC server shutting down", server_name_str);
168            });
169
170            let server_name_str = server_name.to_string();
171
172            tokio::spawn(async move {
173                if let Err(e) = handle.await {
174                    tracing::error!("{} gRPC server error: {:?}", server_name_str, e);
175                }
176            });
177        }
178        #[cfg(unix)]
179        ServerAddr::Unix(ref socket_path) => {
180            let server_builder = tonic::transport::Server::builder()
181                .layer(AddMethodMiddlewareLayer)
182                .layer(BufferLayer::new(config.grpc.req_concurrency_limit))
183                .layer(RateLimitLayer::new(
184                    config.grpc.ratelimit_req_count as u64,
185                    Duration::from_secs(config.grpc.ratelimit_req_interval_secs),
186                ))
187                .timeout(Duration::from_secs(config.grpc.timeout_secs))
188                .concurrency_limit_per_connection(config.grpc.req_concurrency_limit)
189                .add_service(service);
190            tracing::info!(
191                "Starting {} gRPC server with Unix socket: {:?}",
192                server_name,
193                socket_path
194            );
195
196            // Remove socket file if it already exists
197            if socket_path.exists() {
198                std::fs::remove_file(socket_path)
199                    .wrap_err("Failed to remove existing gRPC unix socket file")?;
200            }
201
202            // Create Unix socket listener
203            let uds = tokio::net::UnixListener::bind(socket_path)
204                .wrap_err("Failed to bind to Unix socket")?;
205            let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
206
207            let server_name_str = server_name.to_string();
208
209            let handle = server_builder.serve_with_incoming_shutdown(incoming, async move {
210                let _ = ready_tx.send(());
211                shutdown_rx.await.ok();
212                tracing::info!("{} gRPC server shutting down", server_name_str);
213            });
214
215            let server_name_str = server_name.to_string();
216
217            tokio::spawn(async move {
218                if let Err(e) = handle.await {
219                    tracing::error!("{} gRPC server error: {:?}", server_name_str, e);
220                }
221            });
222        }
223    }
224
225    // Wait for server to be ready
226    let _ = ready_rx.await;
227    tracing::info!("{} gRPC server started", server_name);
228
229    Ok((addr, shutdown_tx))
230}
231
232pub async fn create_verifier_grpc_server<C: CitreaClientT>(
233    config: BridgeConfig,
234) -> Result<(std::net::SocketAddr, oneshot::Sender<()>), BridgeError> {
235    let _rpc = ExtendedBitcoinRpc::connect(
236        config.bitcoin_rpc_url.clone(),
237        config.bitcoin_rpc_user.clone(),
238        config.bitcoin_rpc_password.clone(),
239        None,
240    )
241    .await
242    .wrap_err("Failed to connect to Bitcoin RPC")?;
243
244    let addr: std::net::SocketAddr = format!("{}:{}", config.host, config.port)
245        .parse()
246        .wrap_err("Failed to parse address")?;
247    let verifier = VerifierServer::<C>::new(config.clone()).await?;
248    verifier.start_background_tasks().await?;
249
250    let svc = ClementineVerifierServer::new(verifier)
251        .max_encoding_message_size(config.grpc.max_message_size)
252        .max_decoding_message_size(config.grpc.max_message_size);
253
254    let (server_addr, shutdown_tx) =
255        create_grpc_server(addr.into(), svc, "Verifier", &config).await?;
256
257    match server_addr {
258        ServerAddr::Tcp(socket_addr) => Ok((socket_addr, shutdown_tx)),
259        _ => Err(BridgeError::ConfigError("Expected TCP address".into())),
260    }
261}
262
263pub async fn create_operator_grpc_server<C: CitreaClientT>(
264    config: BridgeConfig,
265) -> Result<(std::net::SocketAddr, oneshot::Sender<()>), BridgeError> {
266    tracing::info!(
267        "config host and port are: {} and {}",
268        config.host,
269        config.port
270    );
271    let addr: std::net::SocketAddr = format!("{}:{}", config.host, config.port)
272        .parse()
273        .wrap_err("Failed to parse address")?;
274
275    tracing::info!("Creating operator server");
276    let operator = OperatorServer::<C>::new(config.clone()).await?;
277    operator.start_background_tasks().await?;
278
279    tracing::info!("Creating ClementineOperatorServer");
280    let svc = ClementineOperatorServer::new(operator)
281        .max_encoding_message_size(config.grpc.max_message_size)
282        .max_decoding_message_size(config.grpc.max_message_size);
283    let (server_addr, shutdown_tx) =
284        create_grpc_server(addr.into(), svc, "Operator", &config).await?;
285    tracing::info!("Operator gRPC server created");
286
287    match server_addr {
288        ServerAddr::Tcp(socket_addr) => Ok((socket_addr, shutdown_tx)),
289        _ => Err(BridgeError::ConfigError("Expected TCP address".into())),
290    }
291}
292
293pub async fn create_aggregator_grpc_server(
294    config: BridgeConfig,
295) -> Result<(std::net::SocketAddr, oneshot::Sender<()>), BridgeError> {
296    let addr: std::net::SocketAddr = format!("{}:{}", config.host, config.port)
297        .parse()
298        .wrap_err("Failed to parse address")?;
299    let aggregator_server = AggregatorServer::new(config.clone()).await?;
300    aggregator_server.start_background_tasks().await?;
301
302    let svc = ClementineAggregatorServer::new(aggregator_server)
303        .max_encoding_message_size(config.grpc.max_message_size)
304        .max_decoding_message_size(config.grpc.max_message_size);
305
306    if config.client_verification {
307        tracing::warn!("Client verification is enabled on aggregator gRPC server",);
308    }
309
310    let (server_addr, shutdown_tx) =
311        create_grpc_server(addr.into(), svc, "Aggregator", &config).await?;
312
313    match server_addr {
314        ServerAddr::Tcp(socket_addr) => Ok((socket_addr, shutdown_tx)),
315        _ => Err(BridgeError::ConfigError("Expected TCP address".into())),
316    }
317}
318
319// Functions for creating servers with Unix sockets (useful for tests)
/// Test-only: creates a [`VerifierServer`] from `config`, starts its
/// background tasks and serves it over gRPC on the Unix socket `socket_path`.
///
/// A Bitcoin RPC connection is opened first; the handle is not used
/// afterwards.
///
/// # Errors
///
/// Fails if the Bitcoin RPC connection, verifier construction, background
/// task startup or gRPC server startup fails.
#[cfg(all(unix, test))]
pub async fn create_verifier_unix_server<C: CitreaClientT>(
    config: BridgeConfig,
    socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    // Kept bound (but unused) for the rest of the function.
    let _bitcoin_rpc = ExtendedBitcoinRpc::connect(
        config.bitcoin_rpc_url.clone(),
        config.bitcoin_rpc_user.clone(),
        config.bitcoin_rpc_password.clone(),
        None,
    )
    .await
    .wrap_err("Failed to connect to Bitcoin RPC")?;

    let verifier = VerifierServer::<C>::new(config.clone()).await?;
    verifier.start_background_tasks().await?;

    // The same limit applies to outgoing and incoming messages.
    let message_limit = config.grpc.max_message_size;
    let svc = ClementineVerifierServer::new(verifier)
        .max_encoding_message_size(message_limit)
        .max_decoding_message_size(message_limit);

    let (server_addr, shutdown_tx) =
        create_grpc_server(ServerAddr::Unix(socket_path), svc, "Verifier", &config).await?;

    if let ServerAddr::Unix(path) = server_addr {
        Ok((path, shutdown_tx))
    } else {
        Err(BridgeError::ConfigError("Expected Unix socket path".into()))
    }
}
349
/// Test-only stand-in for targets without Unix sockets: always returns a
/// configuration error and ignores both arguments.
#[cfg(all(not(unix), test))]
pub async fn create_verifier_unix_server<C: CitreaClientT>(
    _config: BridgeConfig,
    _socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    let reason = "Unix sockets are not supported on this platform";
    Err(BridgeError::ConfigError(reason.into()))
}
359
/// Test-only: creates an [`OperatorServer`] from `config`, starts its
/// background tasks and serves it over gRPC on the Unix socket `socket_path`.
///
/// A Bitcoin RPC connection is opened first; the handle is not used
/// afterwards.
///
/// # Errors
///
/// Fails if the Bitcoin RPC connection, operator construction, background
/// task startup or gRPC server startup fails.
#[cfg(all(unix, test))]
pub async fn create_operator_unix_server<C: CitreaClientT>(
    config: BridgeConfig,
    socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    // Kept bound (but unused) for the rest of the function.
    let _bitcoin_rpc = ExtendedBitcoinRpc::connect(
        config.bitcoin_rpc_url.clone(),
        config.bitcoin_rpc_user.clone(),
        config.bitcoin_rpc_password.clone(),
        None,
    )
    .await
    .wrap_err("Failed to connect to Bitcoin RPC")?;

    let operator = OperatorServer::<C>::new(config.clone()).await?;
    operator.start_background_tasks().await?;

    // The same limit applies to outgoing and incoming messages.
    let message_limit = config.grpc.max_message_size;
    let svc = ClementineOperatorServer::new(operator)
        .max_encoding_message_size(message_limit)
        .max_decoding_message_size(message_limit);

    let (server_addr, shutdown_tx) =
        create_grpc_server(ServerAddr::Unix(socket_path), svc, "Operator", &config).await?;

    if let ServerAddr::Unix(path) = server_addr {
        Ok((path, shutdown_tx))
    } else {
        Err(BridgeError::ConfigError("Expected Unix socket path".into()))
    }
}
389
/// Test-only stand-in for targets without Unix sockets: always returns a
/// configuration error and ignores both arguments.
#[cfg(all(not(unix), test))]
pub async fn create_operator_unix_server<C: CitreaClientT>(
    _config: BridgeConfig,
    _socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    let reason = "Unix sockets are not supported on this platform";
    Err(BridgeError::ConfigError(reason.into()))
}
399
/// Test-only: creates an [`AggregatorServer`] from `config`, starts its
/// background tasks and serves it over gRPC on the Unix socket `socket_path`.
///
/// # Errors
///
/// Fails if aggregator construction, background task startup or gRPC server
/// startup fails.
#[cfg(all(unix, test))]
pub async fn create_aggregator_unix_server(
    config: BridgeConfig,
    socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    let aggregator_server = AggregatorServer::new(config.clone()).await?;
    aggregator_server.start_background_tasks().await?;

    // The same limit applies to outgoing and incoming messages.
    let message_limit = config.grpc.max_message_size;
    let svc = ClementineAggregatorServer::new(aggregator_server)
        .max_encoding_message_size(message_limit)
        .max_decoding_message_size(message_limit);

    let (server_addr, shutdown_tx) =
        create_grpc_server(ServerAddr::Unix(socket_path), svc, "Aggregator", &config).await?;

    if let ServerAddr::Unix(path) = server_addr {
        Ok((path, shutdown_tx))
    } else {
        Err(BridgeError::ConfigError("Expected Unix socket path".into()))
    }
}
420
/// Test-only stand-in for targets without Unix sockets: always returns a
/// configuration error and ignores both arguments.
#[cfg(all(not(unix), test))]
pub async fn create_aggregator_unix_server(
    _config: BridgeConfig,
    _socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    let reason = "Unix sockets are not supported on this platform";
    Err(BridgeError::ConfigError(reason.into()))
}