1use crate::aggregator::AggregatorServer;
5use crate::citrea::CitreaClientT;
6use crate::config::BridgeConfig;
7use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc;
8use crate::operator::OperatorServer;
9use crate::rpc::clementine::clementine_aggregator_server::ClementineAggregatorServer;
10use crate::rpc::clementine::clementine_operator_server::ClementineOperatorServer;
11use crate::rpc::clementine::clementine_verifier_server::ClementineVerifierServer;
12use crate::rpc::interceptors::Interceptors::{Noop, OnlyAggregatorAndSelf};
13use crate::utils::AddMethodMiddlewareLayer;
14use crate::verifier::VerifierServer;
15use clementine_errors::BridgeError;
16use eyre::Context;
17use rustls_pki_types::pem::PemObject;
18use std::time::Duration;
19use tokio::sync::oneshot;
20use tonic::server::NamedService;
21use tonic::service::interceptor::InterceptedService;
22use tonic::transport::{Certificate, CertificateDer, Identity, ServerTlsConfig};
23use tower::buffer::BufferLayer;
24use tower::limit::RateLimitLayer;
25
26#[cfg(test)]
27use crate::test::common::ensure_test_certificates;
28
/// Address a gRPC server listens on.
///
/// Equality is derived so that callers and tests can compare the address
/// handed back by a server constructor with the one they requested.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ServerAddr {
    /// A TCP socket address (IP and port).
    Tcp(std::net::SocketAddr),
    /// Filesystem path of a Unix domain socket (only on Unix targets).
    #[cfg(unix)]
    Unix(std::path::PathBuf),
}
36
37impl From<std::net::SocketAddr> for ServerAddr {
38 fn from(addr: std::net::SocketAddr) -> Self {
39 ServerAddr::Tcp(addr)
40 }
41}
42
43#[cfg(unix)]
44impl From<std::path::PathBuf> for ServerAddr {
45 fn from(path: std::path::PathBuf) -> Self {
46 ServerAddr::Unix(path)
47 }
48}
49
50pub async fn create_grpc_server<S>(
52 addr: ServerAddr,
53 service: S,
54 server_name: &str,
55 config: &BridgeConfig,
56) -> Result<(ServerAddr, oneshot::Sender<()>), BridgeError>
57where
58 S: tower::Service<
59 http::Request<tonic::body::BoxBody>,
60 Response = http::Response<tonic::body::BoxBody>,
61 Error = std::convert::Infallible,
62 > + Clone
63 + Send
64 + NamedService
65 + 'static,
66 S::Future: Send + 'static,
67{
68 let (ready_tx, ready_rx) = oneshot::channel();
70 let (shutdown_tx, shutdown_rx) = oneshot::channel();
71
72 #[cfg(test)]
74 {
75 ensure_test_certificates().wrap_err("Failed to ensure test certificates")?;
76 }
77
78 match addr {
79 ServerAddr::Tcp(socket_addr) => {
80 let cert = tokio::fs::read(&config.server_cert_path)
81 .await
82 .wrap_err(format!(
83 "Failed to read server certificate from {}",
84 config.server_cert_path.display()
85 ))?;
86 let key = tokio::fs::read(&config.server_key_path)
87 .await
88 .wrap_err(format!(
89 "Failed to read server key from {}",
90 config.server_key_path.display()
91 ))?;
92
93 let server_identity = Identity::from_pem(cert, key);
94
95 let client_ca_cert = tokio::fs::read(&config.ca_cert_path)
97 .await
98 .wrap_err(format!(
99 "Failed to read CA certificate from {}",
100 config.ca_cert_path.display()
101 ))?;
102
103 let client_ca = Certificate::from_pem(client_ca_cert);
104
105 let tls_config = if config.client_verification {
107 ServerTlsConfig::new()
108 .identity(server_identity)
109 .client_ca_root(client_ca)
110 } else {
111 ServerTlsConfig::new().identity(server_identity)
112 };
113
114 let service = InterceptedService::new(
115 service,
116 if config.client_verification {
117 let client_cert = CertificateDer::from_pem_file(&config.client_cert_path)
118 .wrap_err(format!(
119 "Failed to read client certificate from {}",
120 config.client_cert_path.display()
121 ))?
122 .to_owned();
123
124 let aggregator_cert =
125 CertificateDer::from_pem_file(&config.aggregator_cert_path)
126 .wrap_err(format!(
127 "Failed to read aggregator certificate from {}",
128 config.aggregator_cert_path.display()
129 ))?
130 .to_owned();
131
132 OnlyAggregatorAndSelf {
133 aggregator_cert,
134 our_cert: client_cert,
135 }
136 } else {
137 Noop
138 },
139 );
140
141 tracing::info!(
142 "Starting {} gRPC server with TCP address: {}",
143 server_name,
144 socket_addr
145 );
146
147 let server_builder = tonic::transport::Server::builder()
148 .layer(AddMethodMiddlewareLayer)
149 .layer(BufferLayer::new(config.grpc.req_concurrency_limit))
150 .layer(RateLimitLayer::new(
151 config.grpc.ratelimit_req_count as u64,
152 Duration::from_secs(config.grpc.ratelimit_req_interval_secs),
153 ))
154 .timeout(Duration::from_secs(config.grpc.timeout_secs))
155 .tcp_keepalive(Some(Duration::from_secs(config.grpc.tcp_keepalive_secs)))
156 .concurrency_limit_per_connection(config.grpc.req_concurrency_limit)
157 .http2_adaptive_window(Some(true))
158 .tls_config(tls_config)
159 .wrap_err("Failed to configure TLS")?
160 .add_service(service);
161
162 let server_name_str = server_name.to_string();
163
164 let handle = server_builder.serve_with_shutdown(socket_addr, async move {
165 let _ = ready_tx.send(());
166 shutdown_rx.await.ok();
167 tracing::info!("{} gRPC server shutting down", server_name_str);
168 });
169
170 let server_name_str = server_name.to_string();
171
172 tokio::spawn(async move {
173 if let Err(e) = handle.await {
174 tracing::error!("{} gRPC server error: {:?}", server_name_str, e);
175 }
176 });
177 }
178 #[cfg(unix)]
179 ServerAddr::Unix(ref socket_path) => {
180 let server_builder = tonic::transport::Server::builder()
181 .layer(AddMethodMiddlewareLayer)
182 .layer(BufferLayer::new(config.grpc.req_concurrency_limit))
183 .layer(RateLimitLayer::new(
184 config.grpc.ratelimit_req_count as u64,
185 Duration::from_secs(config.grpc.ratelimit_req_interval_secs),
186 ))
187 .timeout(Duration::from_secs(config.grpc.timeout_secs))
188 .concurrency_limit_per_connection(config.grpc.req_concurrency_limit)
189 .add_service(service);
190 tracing::info!(
191 "Starting {} gRPC server with Unix socket: {:?}",
192 server_name,
193 socket_path
194 );
195
196 if socket_path.exists() {
198 std::fs::remove_file(socket_path)
199 .wrap_err("Failed to remove existing gRPC unix socket file")?;
200 }
201
202 let uds = tokio::net::UnixListener::bind(socket_path)
204 .wrap_err("Failed to bind to Unix socket")?;
205 let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
206
207 let server_name_str = server_name.to_string();
208
209 let handle = server_builder.serve_with_incoming_shutdown(incoming, async move {
210 let _ = ready_tx.send(());
211 shutdown_rx.await.ok();
212 tracing::info!("{} gRPC server shutting down", server_name_str);
213 });
214
215 let server_name_str = server_name.to_string();
216
217 tokio::spawn(async move {
218 if let Err(e) = handle.await {
219 tracing::error!("{} gRPC server error: {:?}", server_name_str, e);
220 }
221 });
222 }
223 }
224
225 let _ = ready_rx.await;
227 tracing::info!("{} gRPC server started", server_name);
228
229 Ok((addr, shutdown_tx))
230}
231
232pub async fn create_verifier_grpc_server<C: CitreaClientT>(
233 config: BridgeConfig,
234) -> Result<(std::net::SocketAddr, oneshot::Sender<()>), BridgeError> {
235 let _rpc = ExtendedBitcoinRpc::connect(
236 config.bitcoin_rpc_url.clone(),
237 config.bitcoin_rpc_user.clone(),
238 config.bitcoin_rpc_password.clone(),
239 None,
240 )
241 .await
242 .wrap_err("Failed to connect to Bitcoin RPC")?;
243
244 let addr: std::net::SocketAddr = format!("{}:{}", config.host, config.port)
245 .parse()
246 .wrap_err("Failed to parse address")?;
247 let verifier = VerifierServer::<C>::new(config.clone()).await?;
248 verifier.start_background_tasks().await?;
249
250 let svc = ClementineVerifierServer::new(verifier)
251 .max_encoding_message_size(config.grpc.max_message_size)
252 .max_decoding_message_size(config.grpc.max_message_size);
253
254 let (server_addr, shutdown_tx) =
255 create_grpc_server(addr.into(), svc, "Verifier", &config).await?;
256
257 match server_addr {
258 ServerAddr::Tcp(socket_addr) => Ok((socket_addr, shutdown_tx)),
259 _ => Err(BridgeError::ConfigError("Expected TCP address".into())),
260 }
261}
262
263pub async fn create_operator_grpc_server<C: CitreaClientT>(
264 config: BridgeConfig,
265) -> Result<(std::net::SocketAddr, oneshot::Sender<()>), BridgeError> {
266 tracing::info!(
267 "config host and port are: {} and {}",
268 config.host,
269 config.port
270 );
271 let addr: std::net::SocketAddr = format!("{}:{}", config.host, config.port)
272 .parse()
273 .wrap_err("Failed to parse address")?;
274
275 tracing::info!("Creating operator server");
276 let operator = OperatorServer::<C>::new(config.clone()).await?;
277 operator.start_background_tasks().await?;
278
279 tracing::info!("Creating ClementineOperatorServer");
280 let svc = ClementineOperatorServer::new(operator)
281 .max_encoding_message_size(config.grpc.max_message_size)
282 .max_decoding_message_size(config.grpc.max_message_size);
283 let (server_addr, shutdown_tx) =
284 create_grpc_server(addr.into(), svc, "Operator", &config).await?;
285 tracing::info!("Operator gRPC server created");
286
287 match server_addr {
288 ServerAddr::Tcp(socket_addr) => Ok((socket_addr, shutdown_tx)),
289 _ => Err(BridgeError::ConfigError("Expected TCP address".into())),
290 }
291}
292
293pub async fn create_aggregator_grpc_server(
294 config: BridgeConfig,
295) -> Result<(std::net::SocketAddr, oneshot::Sender<()>), BridgeError> {
296 let addr: std::net::SocketAddr = format!("{}:{}", config.host, config.port)
297 .parse()
298 .wrap_err("Failed to parse address")?;
299 let aggregator_server = AggregatorServer::new(config.clone()).await?;
300 aggregator_server.start_background_tasks().await?;
301
302 let svc = ClementineAggregatorServer::new(aggregator_server)
303 .max_encoding_message_size(config.grpc.max_message_size)
304 .max_decoding_message_size(config.grpc.max_message_size);
305
306 if config.client_verification {
307 tracing::warn!("Client verification is enabled on aggregator gRPC server",);
308 }
309
310 let (server_addr, shutdown_tx) =
311 create_grpc_server(addr.into(), svc, "Aggregator", &config).await?;
312
313 match server_addr {
314 ServerAddr::Tcp(socket_addr) => Ok((socket_addr, shutdown_tx)),
315 _ => Err(BridgeError::ConfigError("Expected TCP address".into())),
316 }
317}
318
/// Test-only: creates a verifier and serves it on the Unix socket `socket_path`.
///
/// Connects to the Bitcoin RPC endpoint from `config`, constructs the
/// `VerifierServer`, starts its background tasks and hands the service to
/// [`create_grpc_server`].
///
/// Returns the socket path together with the server's shutdown sender.
#[cfg(all(unix, test))]
pub async fn create_verifier_unix_server<C: CitreaClientT>(
    config: BridgeConfig,
    socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    // The client is not used further; it is kept bound until the function returns.
    let _bitcoin_rpc = ExtendedBitcoinRpc::connect(
        config.bitcoin_rpc_url.clone(),
        config.bitcoin_rpc_user.clone(),
        config.bitcoin_rpc_password.clone(),
        None,
    )
    .await
    .wrap_err("Failed to connect to Bitcoin RPC")?;

    let verifier_server = VerifierServer::<C>::new(config.clone()).await?;
    verifier_server.start_background_tasks().await?;

    let message_limit = config.grpc.max_message_size;
    let grpc_service = ClementineVerifierServer::new(verifier_server)
        .max_encoding_message_size(message_limit)
        .max_decoding_message_size(message_limit);

    let (bound_addr, shutdown_tx) = create_grpc_server(
        ServerAddr::from(socket_path),
        grpc_service,
        "Verifier",
        &config,
    )
    .await?;

    match bound_addr {
        ServerAddr::Unix(bound_path) => Ok((bound_path, shutdown_tx)),
        ServerAddr::Tcp(_) => Err(BridgeError::ConfigError("Expected Unix socket path".into())),
    }
}
349
/// Test-only stand-in for non-Unix targets: always fails with a
/// `ConfigError`, since the arguments are ignored and no server is started.
#[cfg(all(not(unix), test))]
pub async fn create_verifier_unix_server<C: CitreaClientT>(
    _config: BridgeConfig,
    _socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    let unsupported = "Unix sockets are not supported on this platform";
    Err(BridgeError::ConfigError(unsupported.into()))
}
359
/// Test-only: creates an operator and serves it on the Unix socket `socket_path`.
///
/// Connects to the Bitcoin RPC endpoint from `config`, constructs the
/// `OperatorServer`, starts its background tasks and hands the service to
/// [`create_grpc_server`].
///
/// Returns the socket path together with the server's shutdown sender.
#[cfg(all(unix, test))]
pub async fn create_operator_unix_server<C: CitreaClientT>(
    config: BridgeConfig,
    socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    // The client is not used further; it is kept bound until the function returns.
    let _bitcoin_rpc = ExtendedBitcoinRpc::connect(
        config.bitcoin_rpc_url.clone(),
        config.bitcoin_rpc_user.clone(),
        config.bitcoin_rpc_password.clone(),
        None,
    )
    .await
    .wrap_err("Failed to connect to Bitcoin RPC")?;

    let operator_server = OperatorServer::<C>::new(config.clone()).await?;
    operator_server.start_background_tasks().await?;

    let message_limit = config.grpc.max_message_size;
    let grpc_service = ClementineOperatorServer::new(operator_server)
        .max_encoding_message_size(message_limit)
        .max_decoding_message_size(message_limit);

    let (bound_addr, shutdown_tx) = create_grpc_server(
        ServerAddr::from(socket_path),
        grpc_service,
        "Operator",
        &config,
    )
    .await?;

    match bound_addr {
        ServerAddr::Unix(bound_path) => Ok((bound_path, shutdown_tx)),
        ServerAddr::Tcp(_) => Err(BridgeError::ConfigError("Expected Unix socket path".into())),
    }
}
389
/// Test-only stand-in for non-Unix targets: always fails with a
/// `ConfigError`, since the arguments are ignored and no server is started.
#[cfg(all(not(unix), test))]
pub async fn create_operator_unix_server<C: CitreaClientT>(
    _config: BridgeConfig,
    _socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    let unsupported = "Unix sockets are not supported on this platform";
    Err(BridgeError::ConfigError(unsupported.into()))
}
399
/// Test-only: creates an aggregator and serves it on the Unix socket `socket_path`.
///
/// Constructs the `AggregatorServer`, starts its background tasks and hands
/// the service to [`create_grpc_server`].
///
/// Returns the socket path together with the server's shutdown sender.
#[cfg(all(unix, test))]
pub async fn create_aggregator_unix_server(
    config: BridgeConfig,
    socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    let aggregator = AggregatorServer::new(config.clone()).await?;
    aggregator.start_background_tasks().await?;

    let message_limit = config.grpc.max_message_size;
    let grpc_service = ClementineAggregatorServer::new(aggregator)
        .max_encoding_message_size(message_limit)
        .max_decoding_message_size(message_limit);

    let (bound_addr, shutdown_tx) = create_grpc_server(
        ServerAddr::from(socket_path),
        grpc_service,
        "Aggregator",
        &config,
    )
    .await?;

    match bound_addr {
        ServerAddr::Unix(bound_path) => Ok((bound_path, shutdown_tx)),
        ServerAddr::Tcp(_) => Err(BridgeError::ConfigError("Expected Unix socket path".into())),
    }
}
420
/// Test-only stand-in for non-Unix targets: always fails with a
/// `ConfigError`, since the arguments are ignored and no server is started.
#[cfg(all(not(unix), test))]
pub async fn create_aggregator_unix_server(
    _config: BridgeConfig,
    _socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    let unsupported = "Unix sockets are not supported on this platform";
    Err(BridgeError::ConfigError(unsupported.into()))
}