1use crate::aggregator::AggregatorServer;
5use crate::citrea::CitreaClientT;
6use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc;
7use crate::operator::OperatorServer;
8use crate::rpc::clementine::clementine_aggregator_server::ClementineAggregatorServer;
9use crate::rpc::clementine::clementine_operator_server::ClementineOperatorServer;
10use crate::rpc::clementine::clementine_verifier_server::ClementineVerifierServer;
11use crate::rpc::interceptors::Interceptors::{Noop, OnlyAggregatorAndSelf};
12use crate::utils::AddMethodMiddlewareLayer;
13use crate::verifier::VerifierServer;
14use crate::{config::BridgeConfig, errors};
15use errors::BridgeError;
16use eyre::Context;
17use rustls_pki_types::pem::PemObject;
18use std::time::Duration;
19use tokio::sync::oneshot;
20use tonic::server::NamedService;
21use tonic::service::interceptor::InterceptedService;
22use tonic::transport::{Certificate, CertificateDer, Identity, ServerTlsConfig};
23use tower::buffer::BufferLayer;
24use tower::limit::RateLimitLayer;
25
26#[cfg(test)]
27use crate::test::common::ensure_test_certificates;
28
/// Trait-object type for a future that yields `Result<(), tonic::transport::Error>`.
///
/// This is an unsized `dyn` type, so it can only be used behind a pointer
/// (for example `Box<ServerFuture>` or `Pin<Box<ServerFuture>>`).
pub type ServerFuture = dyn futures::Future<Output = Result<(), tonic::transport::Error>>;
30
/// Address a gRPC server can listen on.
///
/// The `Unix` variant only exists when compiling for unix targets.
#[derive(Debug, Clone)]
pub enum ServerAddr {
    /// A TCP socket address (IP and port).
    Tcp(std::net::SocketAddr),
    /// Filesystem path of a Unix domain socket.
    #[cfg(unix)]
    Unix(std::path::PathBuf),
}

/// Wraps a TCP socket address in [`ServerAddr::Tcp`].
impl From<std::net::SocketAddr> for ServerAddr {
    fn from(addr: std::net::SocketAddr) -> Self {
        Self::Tcp(addr)
    }
}

/// Wraps a filesystem path in [`ServerAddr::Unix`] (unix targets only).
#[cfg(unix)]
impl From<std::path::PathBuf> for ServerAddr {
    fn from(path: std::path::PathBuf) -> Self {
        Self::Unix(path)
    }
}
51
52pub async fn create_grpc_server<S>(
54 addr: ServerAddr,
55 service: S,
56 server_name: &str,
57 config: &BridgeConfig,
58) -> Result<(ServerAddr, oneshot::Sender<()>), BridgeError>
59where
60 S: tower::Service<
61 http::Request<tonic::body::BoxBody>,
62 Response = http::Response<tonic::body::BoxBody>,
63 Error = std::convert::Infallible,
64 > + Clone
65 + Send
66 + NamedService
67 + 'static,
68 S::Future: Send + 'static,
69{
70 let (ready_tx, ready_rx) = oneshot::channel();
72 let (shutdown_tx, shutdown_rx) = oneshot::channel();
73
74 #[cfg(test)]
76 {
77 ensure_test_certificates().wrap_err("Failed to ensure test certificates")?;
78 }
79
80 match addr {
81 ServerAddr::Tcp(socket_addr) => {
82 let cert = tokio::fs::read(&config.server_cert_path)
83 .await
84 .wrap_err(format!(
85 "Failed to read server certificate from {}",
86 config.server_cert_path.display()
87 ))?;
88 let key = tokio::fs::read(&config.server_key_path)
89 .await
90 .wrap_err(format!(
91 "Failed to read server key from {}",
92 config.server_key_path.display()
93 ))?;
94
95 let server_identity = Identity::from_pem(cert, key);
96
97 let client_ca_cert = tokio::fs::read(&config.ca_cert_path)
99 .await
100 .wrap_err(format!(
101 "Failed to read CA certificate from {}",
102 config.ca_cert_path.display()
103 ))?;
104
105 let client_ca = Certificate::from_pem(client_ca_cert);
106
107 let tls_config = if config.client_verification {
109 ServerTlsConfig::new()
110 .identity(server_identity)
111 .client_ca_root(client_ca)
112 } else {
113 ServerTlsConfig::new().identity(server_identity)
114 };
115
116 let service = InterceptedService::new(
117 service,
118 if config.client_verification {
119 let client_cert = CertificateDer::from_pem_file(&config.client_cert_path)
120 .wrap_err(format!(
121 "Failed to read client certificate from {}",
122 config.client_cert_path.display()
123 ))?
124 .to_owned();
125
126 let aggregator_cert =
127 CertificateDer::from_pem_file(&config.aggregator_cert_path)
128 .wrap_err(format!(
129 "Failed to read aggregator certificate from {}",
130 config.aggregator_cert_path.display()
131 ))?
132 .to_owned();
133
134 OnlyAggregatorAndSelf {
135 aggregator_cert,
136 our_cert: client_cert,
137 }
138 } else {
139 Noop
140 },
141 );
142
143 tracing::info!(
144 "Starting {} gRPC server with TCP address: {}",
145 server_name,
146 socket_addr
147 );
148
149 let server_builder = tonic::transport::Server::builder()
150 .layer(AddMethodMiddlewareLayer)
151 .layer(BufferLayer::new(config.grpc.req_concurrency_limit))
152 .layer(RateLimitLayer::new(
153 config.grpc.ratelimit_req_count as u64,
154 Duration::from_secs(config.grpc.ratelimit_req_interval_secs),
155 ))
156 .timeout(Duration::from_secs(config.grpc.timeout_secs))
157 .tcp_keepalive(Some(Duration::from_secs(config.grpc.tcp_keepalive_secs)))
158 .concurrency_limit_per_connection(config.grpc.req_concurrency_limit)
159 .http2_adaptive_window(Some(true))
160 .tls_config(tls_config)
161 .wrap_err("Failed to configure TLS")?
162 .add_service(service);
163
164 let server_name_str = server_name.to_string();
165
166 let handle = server_builder.serve_with_shutdown(socket_addr, async move {
167 let _ = ready_tx.send(());
168 shutdown_rx.await.ok();
169 tracing::info!("{} gRPC server shutting down", server_name_str);
170 });
171
172 let server_name_str = server_name.to_string();
173
174 tokio::spawn(async move {
175 if let Err(e) = handle.await {
176 tracing::error!("{} gRPC server error: {:?}", server_name_str, e);
177 }
178 });
179 }
180 #[cfg(unix)]
181 ServerAddr::Unix(ref socket_path) => {
182 let server_builder = tonic::transport::Server::builder()
183 .layer(AddMethodMiddlewareLayer)
184 .layer(BufferLayer::new(config.grpc.req_concurrency_limit))
185 .layer(RateLimitLayer::new(
186 config.grpc.ratelimit_req_count as u64,
187 Duration::from_secs(config.grpc.ratelimit_req_interval_secs),
188 ))
189 .timeout(Duration::from_secs(config.grpc.timeout_secs))
190 .concurrency_limit_per_connection(config.grpc.req_concurrency_limit)
191 .add_service(service);
192 tracing::info!(
193 "Starting {} gRPC server with Unix socket: {:?}",
194 server_name,
195 socket_path
196 );
197
198 if socket_path.exists() {
200 std::fs::remove_file(socket_path)
201 .wrap_err("Failed to remove existing gRPC unix socket file")?;
202 }
203
204 let uds = tokio::net::UnixListener::bind(socket_path)
206 .wrap_err("Failed to bind to Unix socket")?;
207 let incoming = tokio_stream::wrappers::UnixListenerStream::new(uds);
208
209 let server_name_str = server_name.to_string();
210
211 let handle = server_builder.serve_with_incoming_shutdown(incoming, async move {
212 let _ = ready_tx.send(());
213 shutdown_rx.await.ok();
214 tracing::info!("{} gRPC server shutting down", server_name_str);
215 });
216
217 let server_name_str = server_name.to_string();
218
219 tokio::spawn(async move {
220 if let Err(e) = handle.await {
221 tracing::error!("{} gRPC server error: {:?}", server_name_str, e);
222 }
223 });
224 }
225 }
226
227 let _ = ready_rx.await;
229 tracing::info!("{} gRPC server started", server_name);
230
231 Ok((addr, shutdown_tx))
232}
233
234pub async fn create_verifier_grpc_server<C: CitreaClientT>(
235 config: BridgeConfig,
236) -> Result<(std::net::SocketAddr, oneshot::Sender<()>), BridgeError> {
237 let _rpc = ExtendedBitcoinRpc::connect(
238 config.bitcoin_rpc_url.clone(),
239 config.bitcoin_rpc_user.clone(),
240 config.bitcoin_rpc_password.clone(),
241 None,
242 )
243 .await
244 .wrap_err("Failed to connect to Bitcoin RPC")?;
245
246 let addr: std::net::SocketAddr = format!("{}:{}", config.host, config.port)
247 .parse()
248 .wrap_err("Failed to parse address")?;
249 let verifier = VerifierServer::<C>::new(config.clone()).await?;
250 verifier.start_background_tasks().await?;
251
252 let svc = ClementineVerifierServer::new(verifier)
253 .max_encoding_message_size(config.grpc.max_message_size)
254 .max_decoding_message_size(config.grpc.max_message_size);
255
256 let (server_addr, shutdown_tx) =
257 create_grpc_server(addr.into(), svc, "Verifier", &config).await?;
258
259 match server_addr {
260 ServerAddr::Tcp(socket_addr) => Ok((socket_addr, shutdown_tx)),
261 _ => Err(BridgeError::ConfigError("Expected TCP address".into())),
262 }
263}
264
265pub async fn create_operator_grpc_server<C: CitreaClientT>(
266 config: BridgeConfig,
267) -> Result<(std::net::SocketAddr, oneshot::Sender<()>), BridgeError> {
268 tracing::info!(
269 "config host and port are: {} and {}",
270 config.host,
271 config.port
272 );
273 let addr: std::net::SocketAddr = format!("{}:{}", config.host, config.port)
274 .parse()
275 .wrap_err("Failed to parse address")?;
276
277 tracing::info!("Creating operator server");
278 let operator = OperatorServer::<C>::new(config.clone()).await?;
279 operator.start_background_tasks().await?;
280
281 tracing::info!("Creating ClementineOperatorServer");
282 let svc = ClementineOperatorServer::new(operator)
283 .max_encoding_message_size(config.grpc.max_message_size)
284 .max_decoding_message_size(config.grpc.max_message_size);
285 let (server_addr, shutdown_tx) =
286 create_grpc_server(addr.into(), svc, "Operator", &config).await?;
287 tracing::info!("Operator gRPC server created");
288
289 match server_addr {
290 ServerAddr::Tcp(socket_addr) => Ok((socket_addr, shutdown_tx)),
291 _ => Err(BridgeError::ConfigError("Expected TCP address".into())),
292 }
293}
294
295pub async fn create_aggregator_grpc_server(
296 config: BridgeConfig,
297) -> Result<(std::net::SocketAddr, oneshot::Sender<()>), BridgeError> {
298 let addr: std::net::SocketAddr = format!("{}:{}", config.host, config.port)
299 .parse()
300 .wrap_err("Failed to parse address")?;
301 let aggregator_server = AggregatorServer::new(config.clone()).await?;
302 aggregator_server.start_background_tasks().await?;
303
304 let svc = ClementineAggregatorServer::new(aggregator_server)
305 .max_encoding_message_size(config.grpc.max_message_size)
306 .max_decoding_message_size(config.grpc.max_message_size);
307
308 if config.client_verification {
309 tracing::warn!("Client verification is enabled on aggregator gRPC server",);
310 }
311
312 let (server_addr, shutdown_tx) =
313 create_grpc_server(addr.into(), svc, "Aggregator", &config).await?;
314
315 match server_addr {
316 ServerAddr::Tcp(socket_addr) => Ok((socket_addr, shutdown_tx)),
317 _ => Err(BridgeError::ConfigError("Expected TCP address".into())),
318 }
319}
320
321#[cfg(unix)]
323pub async fn create_verifier_unix_server<C: CitreaClientT>(
324 config: BridgeConfig,
325 socket_path: std::path::PathBuf,
326) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
327 let _rpc = ExtendedBitcoinRpc::connect(
328 config.bitcoin_rpc_url.clone(),
329 config.bitcoin_rpc_user.clone(),
330 config.bitcoin_rpc_password.clone(),
331 None,
332 )
333 .await
334 .wrap_err("Failed to connect to Bitcoin RPC")?;
335
336 let verifier = VerifierServer::<C>::new(config.clone()).await?;
337 verifier.start_background_tasks().await?;
338
339 let svc = ClementineVerifierServer::new(verifier)
340 .max_encoding_message_size(config.grpc.max_message_size)
341 .max_decoding_message_size(config.grpc.max_message_size);
342
343 let (server_addr, shutdown_tx) =
344 create_grpc_server(socket_path.into(), svc, "Verifier", &config).await?;
345
346 match server_addr {
347 ServerAddr::Unix(path) => Ok((path, shutdown_tx)),
348 _ => Err(BridgeError::ConfigError("Expected Unix socket path".into())),
349 }
350}
351
/// Fallback for targets without Unix domain sockets: always returns a
/// `BridgeError::ConfigError` and ignores both arguments.
///
/// NOTE(review): the `#[cfg(unix)]` variant of this function is generic over
/// `C: CitreaClientT` while this stub is not, so a call site written with a
/// turbofish would not compile on non-unix targets — confirm this is intended.
#[cfg(not(unix))]
pub async fn create_verifier_unix_server(
    _config: BridgeConfig,
    _socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    let reason = "Unix sockets are not supported on this platform";
    Err(BridgeError::ConfigError(reason.into()))
}
361
362#[cfg(unix)]
363pub async fn create_operator_unix_server<C: CitreaClientT>(
364 config: BridgeConfig,
365 socket_path: std::path::PathBuf,
366) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
367 let _rpc = ExtendedBitcoinRpc::connect(
368 config.bitcoin_rpc_url.clone(),
369 config.bitcoin_rpc_user.clone(),
370 config.bitcoin_rpc_password.clone(),
371 None,
372 )
373 .await
374 .wrap_err("Failed to connect to Bitcoin RPC")?;
375
376 let operator = OperatorServer::<C>::new(config.clone()).await?;
377 operator.start_background_tasks().await?;
378
379 let svc = ClementineOperatorServer::new(operator)
380 .max_encoding_message_size(config.grpc.max_message_size)
381 .max_decoding_message_size(config.grpc.max_message_size);
382
383 let (server_addr, shutdown_tx) =
384 create_grpc_server(socket_path.into(), svc, "Operator", &config).await?;
385
386 match server_addr {
387 ServerAddr::Unix(path) => Ok((path, shutdown_tx)),
388 _ => Err(BridgeError::ConfigError("Expected Unix socket path".into())),
389 }
390}
391
/// Fallback for targets without Unix domain sockets: always returns a
/// `BridgeError::ConfigError` and ignores both arguments.
///
/// NOTE(review): the `#[cfg(unix)]` variant of this function is generic over
/// `C: CitreaClientT` while this stub is not, so a call site written with a
/// turbofish would not compile on non-unix targets — confirm this is intended.
#[cfg(not(unix))]
pub async fn create_operator_unix_server(
    _config: BridgeConfig,
    _socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    let reason = "Unix sockets are not supported on this platform";
    Err(BridgeError::ConfigError(reason.into()))
}
401
402#[cfg(unix)]
403pub async fn create_aggregator_unix_server(
404 config: BridgeConfig,
405 socket_path: std::path::PathBuf,
406) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
407 let aggregator_server = AggregatorServer::new(config.clone()).await?;
408 aggregator_server.start_background_tasks().await?;
409
410 let svc = ClementineAggregatorServer::new(aggregator_server)
411 .max_encoding_message_size(config.grpc.max_message_size)
412 .max_decoding_message_size(config.grpc.max_message_size);
413
414 let (server_addr, shutdown_tx) =
415 create_grpc_server(socket_path.into(), svc, "Aggregator", &config).await?;
416
417 match server_addr {
418 ServerAddr::Unix(path) => Ok((path, shutdown_tx)),
419 _ => Err(BridgeError::ConfigError("Expected Unix socket path".into())),
420 }
421}
422
/// Fallback for targets without Unix domain sockets: always returns a
/// `BridgeError::ConfigError` and ignores both arguments.
#[cfg(not(unix))]
pub async fn create_aggregator_unix_server(
    _config: BridgeConfig,
    _socket_path: std::path::PathBuf,
) -> Result<(std::path::PathBuf, oneshot::Sender<()>), BridgeError> {
    let reason = "Unix sockets are not supported on this platform";
    Err(BridgeError::ConfigError(reason.into()))
}