clementine_core/rpc/
mod.rs

1use crate::{
2    config::BridgeConfig,
3    errors::BridgeError,
4    rpc::clementine::{
5        clementine_operator_client::ClementineOperatorClient,
6        clementine_verifier_client::ClementineVerifierClient,
7    },
8};
9use clementine::*;
10use eyre::Context;
11use hyper_util::rt::TokioIo;
12use std::{path::PathBuf, time::Duration};
13use tagged_signature::SignatureId;
14use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity, Uri};
15
16#[cfg(test)]
17use crate::test::common::ensure_test_certificates;
18
19#[allow(clippy::all)]
20#[rustfmt::skip]
21pub mod clementine;
22
23pub mod aggregator;
24pub mod ecdsa_verification_sig;
25mod error;
26pub mod interceptors;
27pub mod operator;
28pub mod parser;
29pub mod verifier;
30
31pub use parser::ParserError;
32
33impl From<NormalSignatureKind> for SignatureId {
34    fn from(value: NormalSignatureKind) -> Self {
35        SignatureId::NormalSignature(NormalSignatureId {
36            signature_kind: value as i32,
37        })
38    }
39}
40
41impl From<(NumberedSignatureKind, i32)> for SignatureId {
42    fn from(value: (NumberedSignatureKind, i32)) -> Self {
43        SignatureId::NumberedSignature(NumberedSignatureId {
44            signature_kind: value.0 as i32,
45            idx: value.1,
46        })
47    }
48}
49
50/// Returns gRPC clients.
51///
52/// # Parameters
53///
54/// - `endpoints`: URIs for clients (can be http/https URLs or unix:// paths)
55/// - `connect`: Function that will be used to initiate gRPC connection
56/// - `config`: Configuration containing TLS certificate paths
57///
58/// # Returns
59///
60/// - `CLIENT`: [`tonic`] gRPC client.
61pub async fn get_clients<CLIENT, F>(
62    endpoints: Vec<String>,
63    connect: F,
64    config: &crate::config::BridgeConfig,
65    use_client_cert: bool,
66) -> Result<Vec<CLIENT>, BridgeError>
67where
68    F: Fn(Channel) -> CLIENT,
69{
70    // Ensure certificates exist in test mode
71    #[cfg(test)]
72    {
73        ensure_test_certificates().map_err(|e| {
74            BridgeError::ConfigError(format!("Failed to ensure test certificates: {e}"))
75        })?;
76    }
77
78    // Get certificate paths from config or use defaults
79    let client_ca_cert = tokio::fs::read(&config.ca_cert_path)
80        .await
81        .wrap_err(format!(
82            "Failed to read CA certificate from {}",
83            config.ca_cert_path.display()
84        ))?;
85
86    let client_ca = Certificate::from_pem(client_ca_cert);
87
88    // Get certificate paths from config or use defaults
89    let client_cert_path = &config.client_cert_path.clone();
90    let client_key_path = &config.client_key_path.clone();
91
92    // Load client certificate and key
93    let client_cert = tokio::fs::read(&client_cert_path).await.map_err(|e| {
94        BridgeError::ConfigError(format!(
95            "Failed to read client certificate from {}: {}",
96            client_cert_path.display(),
97            e
98        ))
99    })?;
100
101    let client_key = tokio::fs::read(&client_key_path).await.map_err(|e| {
102        BridgeError::ConfigError(format!(
103            "Failed to read client key from {}: {}",
104            client_key_path.display(),
105            e
106        ))
107    })?;
108
109    futures::future::try_join_all(
110        endpoints
111            .into_iter()
112            .map(|endpoint| {
113                let client_cert = client_cert.clone();
114                let client_key = client_key.clone();
115                let client_ca = client_ca.clone();
116
117                let tls_config = if use_client_cert {
118                    let client_identity = Identity::from_pem(client_cert, client_key);
119                    ClientTlsConfig::new()
120                        .identity(client_identity)
121                        .ca_certificate(client_ca)
122                } else {
123                    ClientTlsConfig::new().ca_certificate(client_ca)
124                };
125
126                let connect = &connect;
127
128                async move {
129                    let channel = if endpoint.starts_with("unix://") {
130                        #[cfg(unix)]
131                        {
132                            // Handle Unix socket (only available on Unix platforms)
133                            let path = endpoint.trim_start_matches("unix://").to_string();
134                            Channel::from_static("lttp://[::]:50051")
135                                .connect_with_connector(tower::service_fn(move |_| {
136                                    let path = PathBuf::from(path.clone());
137                                    async move {
138                                        let unix_stream =
139                                            tokio::net::UnixStream::connect(path).await?;
140                                        Ok::<_, std::io::Error>(TokioIo::new(unix_stream))
141                                    }
142                                }))
143                                .await
144                                .wrap_err_with(|| {
145                                    format!("Failed to connect to Unix socket {endpoint}")
146                                })?
147                        }
148
149                        #[cfg(not(unix))]
150                        {
151                            // Windows doesn't support Unix sockets
152                            return Err(BridgeError::ConfigError(format!(
153                                "Unix sockets ({}), are not supported on this platform",
154                                endpoint
155                            )));
156                        }
157                    } else {
158                        // Handle TCP/HTTP connection
159                        let uri = Uri::try_from(endpoint.clone()).map_err(|e| {
160                            BridgeError::ConfigError(format!(
161                                "Endpoint {endpoint} is malformed: {e}"
162                            ))
163                        })?;
164
165                        Channel::builder(uri)
166                            .timeout(Duration::from_secs(config.grpc.timeout_secs))
167                            .concurrency_limit(config.grpc.req_concurrency_limit)
168                            .keep_alive_timeout(Duration::from_secs(config.grpc.tcp_keepalive_secs))
169                            .tls_config(tls_config)
170                            .wrap_err("Failed to configure TLS")?
171                            .connect_lazy()
172                    };
173
174                    Ok(connect(channel))
175                }
176            })
177            .collect::<Vec<_>>(),
178    )
179    .await
180}
181
182pub fn operator_client_builder(
183    config: &BridgeConfig,
184) -> impl Fn(Channel) -> ClementineOperatorClient<Channel> {
185    let max_msg_size = config.grpc.max_message_size;
186    move |channel| {
187        ClementineOperatorClient::new(channel)
188            .max_decoding_message_size(max_msg_size)
189            .max_encoding_message_size(max_msg_size)
190    }
191}
192
193pub fn verifier_client_builder(
194    config: &BridgeConfig,
195) -> impl Fn(Channel) -> ClementineVerifierClient<Channel> {
196    let max_msg_size = config.grpc.max_message_size;
197    move |channel| {
198        ClementineVerifierClient::new(channel)
199            .max_decoding_message_size(max_msg_size)
200            .max_encoding_message_size(max_msg_size)
201    }
202}