1use super::clementine::clementine_operator_server::ClementineOperator;
2use super::clementine::{
3 self, ChallengeAckDigest, DepositParams, DepositSignSession, Empty, FinalizedPayoutParams,
4 OperatorKeys, OperatorParams, SchnorrSig, SignedTxWithType, SignedTxsWithType,
5 TransactionRequest, VergenResponse, WithdrawParams, XOnlyPublicKeyRpc,
6};
7use super::error::*;
8use crate::bitvm_client::ClementineBitVMPublicKeys;
9use crate::builder::transaction::sign::{create_and_sign_txs, TransactionRequestData};
10use crate::builder::transaction::ContractContext;
11use crate::citrea::CitreaClientT;
12use crate::compatibility::ActorWithConfig;
13use crate::constants::{DEFAULT_CHANNEL_SIZE, RESTART_BACKGROUND_TASKS_TIMEOUT};
14use crate::deposit::DepositData;
15use crate::errors::BridgeError;
16use crate::errors::ResultExt;
17use crate::operator::OperatorServer;
18use crate::rpc::clementine::{CompatibilityParamsRpc, RawSignedTx, WithdrawParamsWithSig};
19use crate::rpc::ecdsa_verification_sig::{
20 recover_address_from_ecdsa_signature, OperatorWithdrawalMessage,
21};
22use crate::rpc::parser;
23use crate::utils::{get_vergen_response, monitor_standalone_task, timed_request};
24use alloy::primitives::PrimitiveSignature;
25use bitcoin::hashes::Hash;
26use bitcoin::{BlockHash, OutPoint};
27use bitvm::chunk::api::{NUM_HASH, NUM_PUBS, NUM_U256};
28use eyre::Context;
29use futures::TryFutureExt;
30use std::str::FromStr;
31use tokio::sync::mpsc;
32use tokio_stream::wrappers::ReceiverStream;
33use tonic::{async_trait, Request, Response, Status};
34
35#[async_trait]
36impl<C> ClementineOperator for OperatorServer<C>
37where
38 C: CitreaClientT,
39{
40 type DepositSignStream = ReceiverStream<Result<SchnorrSig, Status>>;
41 type GetParamsStream = ReceiverStream<Result<OperatorParams, Status>>;
42
43 async fn get_compatibility_params(
44 &self,
45 _request: Request<Empty>,
46 ) -> Result<Response<CompatibilityParamsRpc>, Status> {
47 let params = self.operator.get_compatibility_params()?;
48 Ok(Response::new(params.try_into().map_to_status()?))
49 }
50
51 async fn vergen(&self, _request: Request<Empty>) -> Result<Response<VergenResponse>, Status> {
52 tracing::info!("Vergen rpc called");
53 Ok(Response::new(get_vergen_response()))
54 }
55
56 async fn restart_background_tasks(
57 &self,
58 _request: tonic::Request<super::Empty>,
59 ) -> std::result::Result<tonic::Response<super::Empty>, tonic::Status> {
60 tracing::info!("Restarting background tasks rpc called");
61 timed_request(
62 RESTART_BACKGROUND_TASKS_TIMEOUT,
63 "Restarting background tasks",
64 self.start_background_tasks(),
65 )
66 .await?;
67 tracing::info!("Restarting background tasks rpc completed");
68 Ok(Response::new(Empty {}))
69 }
70
71 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
72 async fn get_params(
73 &self,
74 _request: Request<Empty>,
75 ) -> Result<Response<Self::GetParamsStream>, Status> {
76 tracing::info!("Get params rpc called");
77 let operator = self.operator.clone();
78 let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
79 let out_stream: Self::GetParamsStream = ReceiverStream::new(rx);
80 let monitor_err_sender = tx.clone();
81
82 let (mut wpk_receiver, mut signature_receiver) = operator.get_params().await?;
83
84 let handle = tokio::spawn(async move {
85 let operator_config: OperatorParams = operator.clone().into();
86 tx.send(Ok(operator_config))
87 .await
88 .map_err(output_stream_ended_prematurely)?;
89
90 while let Some(winternitz_public_key) = wpk_receiver.recv().await {
91 let operator_winternitz_pubkey: OperatorParams = winternitz_public_key.into();
92 tx.send(Ok(operator_winternitz_pubkey))
93 .await
94 .map_err(output_stream_ended_prematurely)?;
95 }
96
97 while let Some(operator_sig) = signature_receiver.recv().await {
98 let unspent_kickoff_sig: OperatorParams = operator_sig.into();
99 tx.send(Ok(unspent_kickoff_sig))
100 .await
101 .map_err(output_stream_ended_prematurely)?;
102 }
103
104 Ok::<(), Status>(())
105 });
106 monitor_standalone_task(handle, "Operator get_params", monitor_err_sender);
107
108 Ok(Response::new(out_stream))
109 }
110
111 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
112 async fn deposit_sign(
113 &self,
114 request: Request<DepositSignSession>,
115 ) -> Result<Response<Self::DepositSignStream>, Status> {
116 tracing::info!("Deposit sign rpc called");
117 let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
118
119 let deposit_sign_session = request.into_inner();
120 let deposit_params: DepositParams = deposit_sign_session.try_into()?;
121 let deposit_data: DepositData = deposit_params.try_into()?;
122 tracing::info!(
123 "Parsed deposit sign rpc params, deposit data: {:?}",
124 deposit_data
125 );
126
127 let expected_sigs = self
128 .operator
129 .config
130 .get_num_required_operator_sigs(&deposit_data);
131
132 let mut deposit_signatures_rx = self.operator.deposit_sign(deposit_data).await?;
133 let monitor_err_sender = tx.clone();
134
135 let handle = tokio::spawn(async move {
136 let mut sent_sigs = 0;
137 while let Some(sig) = deposit_signatures_rx.recv().await {
138 let sig = sig?;
139 let operator_burn_sig = SchnorrSig {
140 schnorr_sig: sig.serialize().to_vec(),
141 };
142
143 tx.send(Ok(operator_burn_sig))
144 .inspect_ok(|_| {
145 sent_sigs += 1;
146 tracing::debug!(
147 "Sent signature {}/{} in deposit_sign()",
148 sent_sigs,
149 expected_sigs
150 );
151 })
152 .await
153 .wrap_err("Failed to send signature in operator rpc deposit sign")
154 .map_to_status()?;
155 }
156 Ok::<(), Status>(())
157 });
158
159 monitor_standalone_task(handle, "Operator deposit sign", monitor_err_sender);
160
161 Ok(Response::new(ReceiverStream::new(rx)))
162 }
163
164 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
165 async fn internal_withdraw(
166 &self,
167 request: Request<WithdrawParams>,
168 ) -> Result<Response<RawSignedTx>, Status> {
169 let (withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
170 parser::operator::parse_withdrawal_sig_params(request.into_inner())?;
171
172 tracing::warn!("Called internal_withdraw with withdrawal id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}", withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount);
173
174 let payout_tx = self
175 .operator
176 .withdraw(
177 withdrawal_id,
178 input_signature,
179 input_outpoint,
180 output_script_pubkey,
181 output_amount,
182 )
183 .await?;
184
185 Ok(Response::new(RawSignedTx::from(&payout_tx)))
186 }
187
188 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
189 async fn withdraw(
190 &self,
191 request: Request<WithdrawParamsWithSig>,
192 ) -> Result<Response<RawSignedTx>, Status> {
193 tracing::info!("Withdraw rpc called");
194 let params = request.into_inner();
195 let withdraw_params = params.withdrawal.ok_or(Status::invalid_argument(
196 "Withdrawal params not found for withdrawal",
197 ))?;
198 let (withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
199 parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
200
201 tracing::warn!(
202 "Parsed withdraw rpc params, withdrawal id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}, verification signature: {:?}", withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount, params.verification_signature
203 );
204
205 if let Some(address_in_config) = self.operator.config.aggregator_verification_address {
207 let verification_signature = params
208 .verification_signature
209 .map(|sig| {
210 PrimitiveSignature::from_str(&sig).map_err(|e| {
211 Status::invalid_argument(format!("Invalid verification signature: {e}"))
212 })
213 })
214 .transpose()?;
215 if let Some(verification_signature) = verification_signature {
217 let address_from_sig =
218 recover_address_from_ecdsa_signature::<OperatorWithdrawalMessage>(
219 withdrawal_id,
220 input_signature,
221 input_outpoint,
222 output_script_pubkey.clone(),
223 output_amount,
224 verification_signature,
225 )?;
226
227 if address_from_sig != address_in_config {
229 return Err(BridgeError::InvalidECDSAVerificationSignature).map_to_status();
230 }
231 } else {
232 return Err(BridgeError::ECDSAVerificationSignatureMissing).map_to_status();
234 }
235 }
236
237 let payout_tx = self
238 .operator
239 .withdraw(
240 withdrawal_id,
241 input_signature,
242 input_outpoint,
243 output_script_pubkey,
244 output_amount,
245 )
246 .await?;
247
248 tracing::info!(
249 "Withdraw rpc completed successfully for withdrawal id: {:?}",
250 withdrawal_id
251 );
252
253 Ok(Response::new(RawSignedTx::from(&payout_tx)))
254 }
255
256 #[tracing::instrument(skip(self, request), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
257 async fn internal_create_assert_commitment_txs(
258 &self,
259 request: Request<TransactionRequest>,
260 ) -> std::result::Result<tonic::Response<super::SignedTxsWithType>, tonic::Status> {
261 let tx_req = request.into_inner();
262 let tx_req_data: TransactionRequestData = tx_req.try_into()?;
263 tracing::warn!(
264 "Called internal_create_assert_commitment_txs with transaction request data: {:?}",
265 tx_req_data
266 );
267 let raw_txs = self
268 .operator
269 .create_assert_commitment_txs(
270 tx_req_data,
271 ClementineBitVMPublicKeys::get_assert_commit_data(
272 (
273 [[0u8; 32]; NUM_PUBS],
274 [[0u8; 32]; NUM_U256],
275 [[0u8; 16]; NUM_HASH],
276 ),
277 &[0u8; 20],
278 ),
279 None,
280 )
281 .await?;
282
283 Ok(Response::new(SignedTxsWithType {
284 signed_txs: raw_txs
285 .into_iter()
286 .map(|(tx_type, signed_tx)| SignedTxWithType {
287 transaction_type: Some(tx_type.into()),
288 raw_tx: bitcoin::consensus::serialize(&signed_tx),
289 })
290 .collect(),
291 }))
292 }
293
294 #[tracing::instrument(skip(self, request), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
295 async fn get_deposit_keys(
296 &self,
297 request: Request<DepositParams>,
298 ) -> Result<Response<OperatorKeys>, Status> {
299 let start = std::time::Instant::now();
300 let deposit_params = request.into_inner();
301 let deposit_data: DepositData = deposit_params.try_into()?;
302 tracing::warn!(
303 "Called get_deposit_keys with deposit data: {:?}",
304 deposit_data
305 );
306 let winternitz_keys = self
307 .operator
308 .generate_assert_winternitz_pubkeys(deposit_data.get_deposit_outpoint())?;
309 let hashes = self
310 .operator
311 .generate_challenge_ack_preimages_and_hashes(&deposit_data)?;
312 tracing::info!("Generated deposit keys in {:?}", start.elapsed());
313
314 Ok(Response::new(OperatorKeys {
315 winternitz_pubkeys: winternitz_keys
316 .into_iter()
317 .map(|pubkey| pubkey.into())
318 .collect(),
319 challenge_ack_digests: hashes
320 .into_iter()
321 .map(|hash| ChallengeAckDigest { hash: hash.into() })
322 .collect(),
323 }))
324 }
325
326 async fn internal_create_signed_txs(
327 &self,
328 request: tonic::Request<super::TransactionRequest>,
329 ) -> std::result::Result<tonic::Response<super::SignedTxsWithType>, tonic::Status> {
330 let transaction_request = request.into_inner();
331 let transaction_data: TransactionRequestData = transaction_request.try_into()?;
332 tracing::warn!(
333 "Called internal_create_signed_txs with transaction request data: {:?}",
334 transaction_data
335 );
336 let (_, deposit_data) = self
337 .operator
338 .db
339 .get_deposit_data(None, transaction_data.deposit_outpoint)
340 .await?
341 .ok_or(Status::invalid_argument("Deposit not found in database"))?;
342 let context = ContractContext::new_context_for_kickoff(
343 transaction_data.kickoff_data,
344 deposit_data,
345 self.operator.config.protocol_paramset(),
346 );
347 let raw_txs = create_and_sign_txs(
348 self.operator.db.clone(),
349 &self.operator.signer,
350 self.operator.config.clone(),
351 context,
352 Some([0u8; 20]), None,
354 )
355 .await?;
356
357 Ok(Response::new(SignedTxsWithType {
358 signed_txs: raw_txs
359 .into_iter()
360 .map(|(tx_type, signed_tx)| SignedTxWithType {
361 transaction_type: Some(tx_type.into()),
362 raw_tx: bitcoin::consensus::serialize(&signed_tx),
363 })
364 .collect(),
365 }))
366 }
367
368 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
369 async fn internal_finalized_payout(
370 &self,
371 request: Request<FinalizedPayoutParams>,
372 ) -> Result<Response<clementine::Txid>, Status> {
373 if !cfg!(test) {
374 return Err(Status::permission_denied(
375 "This method is only available in tests",
376 ));
377 }
378
379 tracing::info!(
380 "Internal finalized payout rpc called with finalized payout params: {:?}",
381 request.get_ref()
382 );
383
384 let payout_blockhash: [u8; 32] = request
385 .get_ref()
386 .payout_blockhash
387 .clone()
388 .try_into()
389 .map_err(|e| {
390 Status::invalid_argument(format!(
391 "Failed to convert payout blockhash to [u8; 32]: {e:?}"
392 ))
393 })?;
394 let deposit_outpoint: OutPoint = request
395 .get_ref()
396 .deposit_outpoint
397 .clone()
398 .ok_or(Status::invalid_argument("Failed to get deposit outpoint"))?
399 .try_into()?;
400
401 let mut dbtx = self.operator.db.begin_transaction().await?;
402 let kickoff_txid = self
403 .operator
404 .handle_finalized_payout(
405 &mut dbtx,
406 deposit_outpoint,
407 BlockHash::from_byte_array(payout_blockhash),
408 )
409 .await?;
410 dbtx.commit()
411 .await
412 .wrap_err("Failed to commit transaction")
413 .map_to_status()?;
414
415 Ok(Response::new(kickoff_txid.into()))
416 }
417
418 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
419 async fn internal_end_round(
420 &self,
421 _request: Request<Empty>,
422 ) -> Result<Response<Empty>, Status> {
423 tracing::warn!("Internal end round rpc called");
424 #[cfg(feature = "automation")]
425 {
426 use eyre::Context;
427
428 let mut dbtx = self.operator.db.begin_transaction().await?;
429
430 self.operator.end_round(&mut dbtx).await?;
431
432 dbtx.commit()
433 .await
434 .wrap_err("Failed to commit transaction")
435 .map_to_status()?;
436 Ok(Response::new(Empty {}))
437 }
438
439 #[cfg(not(feature = "automation"))]
440 Err(Status::unimplemented(
441 "Automation is not enabled. Operator does not manage its rounds",
442 ))
443 }
444
445 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
446 async fn get_x_only_public_key(
447 &self,
448 _request: Request<Empty>,
449 ) -> Result<Response<XOnlyPublicKeyRpc>, Status> {
450 tracing::info!("Get xonly public key rpc called");
451 let xonly_pk = self.operator.signer.xonly_public_key.serialize();
452 Ok(Response::new(XOnlyPublicKeyRpc {
453 xonly_public_key: xonly_pk.to_vec(),
454 }))
455 }
456
457 async fn get_current_status(
458 &self,
459 _request: Request<Empty>,
460 ) -> Result<Response<clementine::EntityStatus>, Status> {
461 tracing::info!("Get current status rpc called");
462 let status = self.get_current_status().await?;
463 tracing::info!("Get current status rpc completed successfully");
464 Ok(Response::new(status))
465 }
466
467 async fn get_reimbursement_txs(
468 &self,
469 request: Request<clementine::Outpoint>,
470 ) -> Result<Response<SignedTxsWithType>, Status> {
471 let deposit_outpoint: OutPoint = request.into_inner().try_into()?;
472 tracing::warn!(
473 "Get reimbursement txs rpc called with deposit outpoint: {:?}",
474 deposit_outpoint
475 );
476 let txs = self
477 .operator
478 .get_reimbursement_txs(deposit_outpoint)
479 .await?;
480 Ok(Response::new(txs.into()))
481 }
482}