1use super::clementine::clementine_operator_server::ClementineOperator;
2use super::clementine::{
3 self, ChallengeAckDigest, DepositParams, DepositSignSession, Empty, FinalizedPayoutParams,
4 OperatorKeys, OperatorParams, SchnorrSig, SignedTxWithType, SignedTxsWithType,
5 TransactionRequest, VergenResponse, WithdrawParams, XOnlyPublicKeyRpc,
6};
7use super::error::*;
8use crate::bitvm_client::ClementineBitVMPublicKeys;
9use crate::builder::transaction::sign::{create_and_sign_txs, TransactionRequestData};
10use crate::builder::transaction::ContractContext;
11use crate::citrea::CitreaClientT;
12use crate::compatibility::ActorWithConfig;
13use crate::constants::{DEFAULT_CHANNEL_SIZE, RESTART_BACKGROUND_TASKS_TIMEOUT};
14use crate::deposit::DepositData;
15use crate::operator::OperatorServer;
16use crate::rpc::clementine::{CompatibilityParamsRpc, RawSignedTx, WithdrawParamsWithSig};
17use crate::rpc::ecdsa_verification_sig::{
18 recover_address_from_ecdsa_signature, OperatorWithdrawalMessage,
19};
20use crate::rpc::parser;
21use crate::utils::{get_vergen_response, monitor_standalone_task, timed_request};
22use alloy::primitives::PrimitiveSignature;
23use bitcoin::hashes::Hash;
24use bitcoin::{BlockHash, OutPoint};
25use bitvm::chunk::api::{NUM_HASH, NUM_PUBS, NUM_U256};
26use clementine_errors::BridgeError;
27use clementine_errors::ResultExt;
28use eyre::Context;
29use futures::TryFutureExt;
30use std::convert::TryInto;
31use std::str::FromStr;
32use tokio::sync::mpsc;
33use tokio_stream::wrappers::ReceiverStream;
34use tonic::{async_trait, Request, Response, Status};
35
36#[async_trait]
37impl<C> ClementineOperator for OperatorServer<C>
38where
39 C: CitreaClientT,
40{
41 type DepositSignStream = ReceiverStream<Result<SchnorrSig, Status>>;
42 type GetParamsStream = ReceiverStream<Result<OperatorParams, Status>>;
43
44 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
45 async fn get_compatibility_params(
46 &self,
47 _request: Request<Empty>,
48 ) -> Result<Response<CompatibilityParamsRpc>, Status> {
49 let params = self.operator.get_compatibility_params()?;
50 Ok(Response::new(params.try_into().map_to_status()?))
51 }
52
53 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
54 async fn vergen(&self, _request: Request<Empty>) -> Result<Response<VergenResponse>, Status> {
55 tracing::info!("Vergen rpc called");
56 Ok(Response::new(get_vergen_response()))
57 }
58
59 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
60 async fn restart_background_tasks(
61 &self,
62 _request: tonic::Request<super::Empty>,
63 ) -> std::result::Result<tonic::Response<super::Empty>, tonic::Status> {
64 tracing::info!("Restarting background tasks rpc called");
65 timed_request(
66 RESTART_BACKGROUND_TASKS_TIMEOUT,
67 "Restarting background tasks",
68 self.start_background_tasks(),
69 )
70 .await?;
71 tracing::info!("Restarting background tasks rpc completed");
72 Ok(Response::new(Empty {}))
73 }
74
75 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
76 async fn get_params(
77 &self,
78 _request: Request<Empty>,
79 ) -> Result<Response<Self::GetParamsStream>, Status> {
80 tracing::info!("Get params rpc called");
81 let operator = self.operator.clone();
82 let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
83 let out_stream: Self::GetParamsStream = ReceiverStream::new(rx);
84 let monitor_err_sender = tx.clone();
85
86 let (mut wpk_receiver, mut signature_receiver) = operator.get_params().await?;
87
88 let handle = tokio::spawn(async move {
89 let operator_config: OperatorParams = operator.clone().into();
90 tx.send(Ok(operator_config))
91 .await
92 .map_err(output_stream_ended_prematurely)?;
93
94 while let Some(winternitz_public_key) = wpk_receiver.recv().await {
95 let operator_winternitz_pubkey: OperatorParams = winternitz_public_key.into();
96 tx.send(Ok(operator_winternitz_pubkey))
97 .await
98 .map_err(output_stream_ended_prematurely)?;
99 }
100
101 while let Some(operator_sig) = signature_receiver.recv().await {
102 let unspent_kickoff_sig: OperatorParams = operator_sig.into();
103 tx.send(Ok(unspent_kickoff_sig))
104 .await
105 .map_err(output_stream_ended_prematurely)?;
106 }
107
108 Ok::<(), Status>(())
109 });
110 monitor_standalone_task(handle, "Operator get_params", monitor_err_sender);
111
112 Ok(Response::new(out_stream))
113 }
114
115 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
116 async fn deposit_sign(
117 &self,
118 request: Request<DepositSignSession>,
119 ) -> Result<Response<Self::DepositSignStream>, Status> {
120 tracing::info!("Deposit sign rpc called");
121 let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
122
123 let deposit_sign_session = request.into_inner();
124 let deposit_params: DepositParams = deposit_sign_session.try_into()?;
125 let deposit_data: DepositData = deposit_params.try_into()?;
126 tracing::info!(
127 "Parsed deposit sign rpc params, deposit data: {:?}",
128 deposit_data
129 );
130
131 let expected_sigs = self
132 .operator
133 .config
134 .get_num_required_operator_sigs(&deposit_data);
135
136 let mut deposit_signatures_rx = self.operator.deposit_sign(deposit_data).await?;
137 let monitor_err_sender = tx.clone();
138
139 let handle = tokio::spawn(async move {
140 let mut sent_sigs = 0;
141 while let Some(sig) = deposit_signatures_rx.recv().await {
142 let sig = sig?;
143 let operator_burn_sig = SchnorrSig {
144 schnorr_sig: sig.serialize().to_vec(),
145 };
146
147 tx.send(Ok(operator_burn_sig))
148 .inspect_ok(|_| {
149 sent_sigs += 1;
150 tracing::debug!(
151 "Sent signature {}/{} in deposit_sign()",
152 sent_sigs,
153 expected_sigs
154 );
155 })
156 .await
157 .wrap_err("Failed to send signature in operator rpc deposit sign")
158 .map_to_status()?;
159 }
160 Ok::<(), Status>(())
161 });
162
163 monitor_standalone_task(handle, "Operator deposit sign", monitor_err_sender);
164
165 Ok(Response::new(ReceiverStream::new(rx)))
166 }
167
168 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
169 async fn internal_withdraw(
170 &self,
171 request: Request<WithdrawParams>,
172 ) -> Result<Response<RawSignedTx>, Status> {
173 let (withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
174 parser::operator::parse_withdrawal_sig_params(request.into_inner())?;
175
176 tracing::warn!("Called internal_withdraw with withdrawal id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}", withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount);
177
178 let payout_tx = self
179 .operator
180 .withdraw(
181 withdrawal_id,
182 input_signature,
183 input_outpoint,
184 output_script_pubkey,
185 output_amount,
186 )
187 .await?;
188
189 Ok(Response::new(RawSignedTx::from(&payout_tx)))
190 }
191
192 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
193 async fn withdraw(
194 &self,
195 request: Request<WithdrawParamsWithSig>,
196 ) -> Result<Response<RawSignedTx>, Status> {
197 tracing::info!("Withdraw rpc called");
198 let params = request.into_inner();
199 let withdraw_params = params.withdrawal.ok_or(Status::invalid_argument(
200 "Withdrawal params not found for withdrawal",
201 ))?;
202 let (withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
203 parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
204
205 tracing::warn!(
206 "Parsed withdraw rpc params, withdrawal id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}, verification signature: {:?}", withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount, params.verification_signature
207 );
208
209 if let Some(address_in_config) = self.operator.config.aggregator_verification_address {
211 let verification_signature = params
212 .verification_signature
213 .map(|sig| {
214 PrimitiveSignature::from_str(&sig).map_err(|e| {
215 Status::invalid_argument(format!("Invalid verification signature: {e}"))
216 })
217 })
218 .transpose()?;
219 if let Some(verification_signature) = verification_signature {
221 let address_from_sig =
222 recover_address_from_ecdsa_signature::<OperatorWithdrawalMessage>(
223 withdrawal_id,
224 input_signature,
225 input_outpoint,
226 output_script_pubkey.clone(),
227 output_amount,
228 verification_signature,
229 )?;
230
231 if address_from_sig != address_in_config {
233 return Err(BridgeError::InvalidECDSAVerificationSignature).map_to_status();
234 }
235 } else {
236 return Err(BridgeError::ECDSAVerificationSignatureMissing).map_to_status();
238 }
239 }
240
241 let payout_tx = self
242 .operator
243 .withdraw(
244 withdrawal_id,
245 input_signature,
246 input_outpoint,
247 output_script_pubkey,
248 output_amount,
249 )
250 .await?;
251
252 tracing::info!(
253 "Withdraw rpc completed successfully for withdrawal id: {:?}",
254 withdrawal_id
255 );
256
257 Ok(Response::new(RawSignedTx::from(&payout_tx)))
258 }
259
260 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
261 async fn internal_create_assert_commitment_txs(
262 &self,
263 request: Request<TransactionRequest>,
264 ) -> std::result::Result<tonic::Response<super::SignedTxsWithType>, tonic::Status> {
265 let tx_req = request.into_inner();
266 let tx_req_data: TransactionRequestData = tx_req.try_into()?;
267 tracing::warn!(
268 "Called internal_create_assert_commitment_txs with transaction request data: {:?}",
269 tx_req_data
270 );
271 let raw_txs = self
272 .operator
273 .create_assert_commitment_txs(
274 tx_req_data,
275 ClementineBitVMPublicKeys::get_assert_commit_data(
276 (
277 [[0u8; 32]; NUM_PUBS],
278 [[0u8; 32]; NUM_U256],
279 [[0u8; 16]; NUM_HASH],
280 ),
281 &[0u8; 20],
282 ),
283 None,
284 )
285 .await?;
286
287 Ok(Response::new(SignedTxsWithType {
288 signed_txs: raw_txs
289 .into_iter()
290 .map(|(tx_type, signed_tx)| SignedTxWithType {
291 transaction_type: Some(tx_type.into()),
292 raw_tx: bitcoin::consensus::serialize(&signed_tx),
293 })
294 .collect(),
295 }))
296 }
297
298 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
299 async fn get_deposit_keys(
300 &self,
301 request: Request<DepositParams>,
302 ) -> Result<Response<OperatorKeys>, Status> {
303 let start = std::time::Instant::now();
304 let deposit_params = request.into_inner();
305 let deposit_data: DepositData = deposit_params.try_into()?;
306 tracing::info!(
307 "Called get_deposit_keys with deposit data: {:?}",
308 deposit_data
309 );
310 let winternitz_keys = self
311 .operator
312 .generate_assert_winternitz_pubkeys(deposit_data.get_deposit_outpoint())?;
313 let hashes = self
314 .operator
315 .generate_challenge_ack_preimages_and_hashes(&deposit_data)?;
316 tracing::info!("Generated deposit keys in {:?}", start.elapsed());
317
318 Ok(Response::new(OperatorKeys {
319 winternitz_pubkeys: winternitz_keys
320 .into_iter()
321 .map(|pubkey| pubkey.into())
322 .collect(),
323 challenge_ack_digests: hashes
324 .into_iter()
325 .map(|hash| ChallengeAckDigest { hash: hash.into() })
326 .collect(),
327 }))
328 }
329
330 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
331 async fn internal_create_signed_txs(
332 &self,
333 request: tonic::Request<super::TransactionRequest>,
334 ) -> std::result::Result<tonic::Response<super::SignedTxsWithType>, tonic::Status> {
335 let transaction_request = request.into_inner();
336 let transaction_data: TransactionRequestData = transaction_request.try_into()?;
337 tracing::warn!(
338 "Called internal_create_signed_txs with transaction request data: {:?}",
339 transaction_data
340 );
341 let (_, deposit_data) = self
342 .operator
343 .db
344 .get_deposit_data(None, transaction_data.deposit_outpoint)
345 .await?
346 .ok_or(Status::invalid_argument("Deposit not found in database"))?;
347 let context = ContractContext::new_context_for_kickoff(
348 transaction_data.kickoff_data,
349 deposit_data,
350 self.operator.config.protocol_paramset(),
351 );
352 let raw_txs = create_and_sign_txs(
353 self.operator.db.clone(),
354 &self.operator.signer,
355 self.operator.config.clone(),
356 context,
357 Some([0u8; 20]), None,
359 )
360 .await?;
361
362 Ok(Response::new(SignedTxsWithType {
363 signed_txs: raw_txs
364 .into_iter()
365 .map(|(tx_type, signed_tx)| SignedTxWithType {
366 transaction_type: Some(tx_type.into()),
367 raw_tx: bitcoin::consensus::serialize(&signed_tx),
368 })
369 .collect(),
370 }))
371 }
372
373 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
374 async fn internal_finalized_payout(
375 &self,
376 request: Request<FinalizedPayoutParams>,
377 ) -> Result<Response<clementine::Txid>, Status> {
378 if !cfg!(test) {
379 return Err(Status::permission_denied(
380 "This method is only available in tests",
381 ));
382 }
383
384 tracing::info!(
385 "Internal finalized payout rpc called with finalized payout params: {:?}",
386 request.get_ref()
387 );
388
389 let payout_blockhash: [u8; 32] = request
390 .get_ref()
391 .payout_blockhash
392 .clone()
393 .try_into()
394 .map_err(|e| {
395 Status::invalid_argument(format!(
396 "Failed to convert payout blockhash to [u8; 32]: {e:?}"
397 ))
398 })?;
399 let deposit_outpoint: OutPoint = request
400 .get_ref()
401 .deposit_outpoint
402 .clone()
403 .ok_or(Status::invalid_argument("Failed to get deposit outpoint"))?
404 .try_into()?;
405
406 let mut dbtx = self.operator.db.begin_transaction().await?;
407 let kickoff_txid = self
408 .operator
409 .handle_finalized_payout(
410 &mut dbtx,
411 deposit_outpoint,
412 BlockHash::from_byte_array(payout_blockhash),
413 )
414 .await?;
415 dbtx.commit()
416 .await
417 .wrap_err("Failed to commit transaction")
418 .map_to_status()?;
419
420 Ok(Response::new(kickoff_txid.into()))
421 }
422
423 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
424 async fn internal_end_round(
425 &self,
426 _request: Request<Empty>,
427 ) -> Result<Response<Empty>, Status> {
428 tracing::warn!("Internal end round rpc called");
429 #[cfg(feature = "automation")]
430 {
431 use eyre::Context;
432
433 let mut dbtx = self.operator.db.begin_transaction().await?;
434
435 self.operator.end_round(&mut dbtx).await?;
436
437 dbtx.commit()
438 .await
439 .wrap_err("Failed to commit transaction")
440 .map_to_status()?;
441 Ok(Response::new(Empty {}))
442 }
443
444 #[cfg(not(feature = "automation"))]
445 Err(Status::unimplemented(
446 "Automation is not enabled. Operator does not manage its rounds",
447 ))
448 }
449
450 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
451 async fn get_x_only_public_key(
452 &self,
453 _request: Request<Empty>,
454 ) -> Result<Response<XOnlyPublicKeyRpc>, Status> {
455 tracing::info!("Get xonly public key rpc called");
456 let xonly_pk = self.operator.signer.xonly_public_key.serialize();
457 Ok(Response::new(XOnlyPublicKeyRpc {
458 xonly_public_key: xonly_pk.to_vec(),
459 }))
460 }
461
462 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
463 async fn get_current_status(
464 &self,
465 _request: Request<Empty>,
466 ) -> Result<Response<clementine::EntityStatus>, Status> {
467 tracing::debug!("Get current status rpc called");
468 let status = self.get_current_status().await?;
469 tracing::debug!("Get current status rpc completed successfully");
470 Ok(Response::new(status))
471 }
472
473 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
474 async fn get_reimbursement_txs(
475 &self,
476 request: Request<clementine::Outpoint>,
477 ) -> Result<Response<SignedTxsWithType>, Status> {
478 let deposit_outpoint: OutPoint = request.into_inner().try_into()?;
479 tracing::warn!(
480 "Get reimbursement txs rpc called with deposit outpoint: {:?}",
481 deposit_outpoint
482 );
483 let txs = self
484 .operator
485 .get_reimbursement_txs(deposit_outpoint)
486 .await?;
487 Ok(Response::new(txs.into()))
488 }
489
490 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
491 async fn transfer_to_btc_wallet(
492 &self,
493 request: Request<clementine::Outpoints>,
494 ) -> Result<Response<RawSignedTx>, Status> {
495 let outpoints: Vec<OutPoint> = request
496 .into_inner()
497 .outpoints
498 .into_iter()
499 .map(TryInto::try_into)
500 .collect::<Result<Vec<_>, _>>()
501 .map_err(|e| Status::invalid_argument(format!("Invalid outpoint: {e}")))?;
502
503 if outpoints.is_empty() {
504 return Err(Status::invalid_argument("No outpoints provided"));
505 }
506
507 let mut inputs = Vec::with_capacity(outpoints.len());
508
509 for outpoint in outpoints {
510 tracing::info!(
511 "TransferToBtcWallet rpc called with outpoint: {:?}",
512 outpoint
513 );
514
515 let txout = self
516 .operator
517 .rpc
518 .get_txout_from_outpoint(&outpoint)
519 .await
520 .map_err(|e| Status::internal(format!("Failed to get txout: {e}")))?;
521
522 if txout.script_pubkey != self.operator.signer.address.script_pubkey() {
523 return Err(Status::invalid_argument(format!(
524 "Outpoint script_pubkey does not match operator's address. Expected: {:?}, Got: {:?}",
525 self.operator.signer.address.script_pubkey(),
526 txout.script_pubkey
527 )));
528 }
529
530 inputs.push((outpoint, txout));
531 }
532
533 let signed_tx = self
534 .operator
535 .transfer_outpoints_to_wallet(inputs)
536 .await
537 .map_err(|e| Status::internal(format!("Failed to send outpoints to wallet: {e}")))?;
538
539 tracing::info!("Successfully created transaction sending outpoints to wallet");
540 Ok(Response::new(RawSignedTx::from(&signed_tx)))
541 }
542}