1use super::clementine::clementine_operator_server::ClementineOperator;
2use super::clementine::{
3 self, ChallengeAckDigest, DepositParams, DepositSignSession, Empty, FinalizedPayoutParams,
4 OperatorKeys, OperatorParams, SchnorrSig, SignedTxWithType, SignedTxsWithType,
5 TransactionRequest, VergenResponse, WithdrawParams, XOnlyPublicKeyRpc,
6};
7use super::error::*;
8use crate::bitvm_client::ClementineBitVMPublicKeys;
9use crate::builder::transaction::sign::{create_and_sign_txs, TransactionRequestData};
10use crate::builder::transaction::ContractContext;
11use crate::citrea::CitreaClientT;
12use crate::compatibility::ActorWithConfig;
13use crate::constants::{DEFAULT_CHANNEL_SIZE, RESTART_BACKGROUND_TASKS_TIMEOUT};
14use crate::deposit::DepositData;
15use crate::operator::OperatorServer;
16use crate::rpc::clementine::{CompatibilityParamsRpc, RawSignedTx, WithdrawParamsWithSig};
17use crate::rpc::ecdsa_verification_sig::{
18 recover_address_from_ecdsa_signature, OperatorWithdrawalMessage,
19};
20use crate::rpc::parser;
21use crate::utils::{get_vergen_response, monitor_standalone_task, timed_request};
22use alloy::primitives::PrimitiveSignature;
23use bitcoin::hashes::Hash;
24use bitcoin::{BlockHash, OutPoint};
25use bitvm::chunk::api::{NUM_HASH, NUM_PUBS, NUM_U256};
26use clementine_errors::BridgeError;
27use clementine_errors::ResultExt;
28use eyre::Context;
29use futures::TryFutureExt;
30use std::convert::TryInto;
31use std::str::FromStr;
32use tokio::sync::mpsc;
33use tokio_stream::wrappers::ReceiverStream;
34use tonic::{async_trait, Request, Response, Status};
35
36#[async_trait]
37impl<C> ClementineOperator for OperatorServer<C>
38where
39 C: CitreaClientT,
40{
41 type DepositSignStream = ReceiverStream<Result<SchnorrSig, Status>>;
42 type GetParamsStream = ReceiverStream<Result<OperatorParams, Status>>;
43
44 async fn get_compatibility_params(
45 &self,
46 _request: Request<Empty>,
47 ) -> Result<Response<CompatibilityParamsRpc>, Status> {
48 let params = self.operator.get_compatibility_params()?;
49 Ok(Response::new(params.try_into().map_to_status()?))
50 }
51
52 async fn vergen(&self, _request: Request<Empty>) -> Result<Response<VergenResponse>, Status> {
53 tracing::info!("Vergen rpc called");
54 Ok(Response::new(get_vergen_response()))
55 }
56
57 async fn restart_background_tasks(
58 &self,
59 _request: tonic::Request<super::Empty>,
60 ) -> std::result::Result<tonic::Response<super::Empty>, tonic::Status> {
61 tracing::info!("Restarting background tasks rpc called");
62 timed_request(
63 RESTART_BACKGROUND_TASKS_TIMEOUT,
64 "Restarting background tasks",
65 self.start_background_tasks(),
66 )
67 .await?;
68 tracing::info!("Restarting background tasks rpc completed");
69 Ok(Response::new(Empty {}))
70 }
71
72 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
73 async fn get_params(
74 &self,
75 _request: Request<Empty>,
76 ) -> Result<Response<Self::GetParamsStream>, Status> {
77 tracing::info!("Get params rpc called");
78 let operator = self.operator.clone();
79 let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
80 let out_stream: Self::GetParamsStream = ReceiverStream::new(rx);
81 let monitor_err_sender = tx.clone();
82
83 let (mut wpk_receiver, mut signature_receiver) = operator.get_params().await?;
84
85 let handle = tokio::spawn(async move {
86 let operator_config: OperatorParams = operator.clone().into();
87 tx.send(Ok(operator_config))
88 .await
89 .map_err(output_stream_ended_prematurely)?;
90
91 while let Some(winternitz_public_key) = wpk_receiver.recv().await {
92 let operator_winternitz_pubkey: OperatorParams = winternitz_public_key.into();
93 tx.send(Ok(operator_winternitz_pubkey))
94 .await
95 .map_err(output_stream_ended_prematurely)?;
96 }
97
98 while let Some(operator_sig) = signature_receiver.recv().await {
99 let unspent_kickoff_sig: OperatorParams = operator_sig.into();
100 tx.send(Ok(unspent_kickoff_sig))
101 .await
102 .map_err(output_stream_ended_prematurely)?;
103 }
104
105 Ok::<(), Status>(())
106 });
107 monitor_standalone_task(handle, "Operator get_params", monitor_err_sender);
108
109 Ok(Response::new(out_stream))
110 }
111
112 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
113 async fn deposit_sign(
114 &self,
115 request: Request<DepositSignSession>,
116 ) -> Result<Response<Self::DepositSignStream>, Status> {
117 tracing::info!("Deposit sign rpc called");
118 let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
119
120 let deposit_sign_session = request.into_inner();
121 let deposit_params: DepositParams = deposit_sign_session.try_into()?;
122 let deposit_data: DepositData = deposit_params.try_into()?;
123 tracing::info!(
124 "Parsed deposit sign rpc params, deposit data: {:?}",
125 deposit_data
126 );
127
128 let expected_sigs = self
129 .operator
130 .config
131 .get_num_required_operator_sigs(&deposit_data);
132
133 let mut deposit_signatures_rx = self.operator.deposit_sign(deposit_data).await?;
134 let monitor_err_sender = tx.clone();
135
136 let handle = tokio::spawn(async move {
137 let mut sent_sigs = 0;
138 while let Some(sig) = deposit_signatures_rx.recv().await {
139 let sig = sig?;
140 let operator_burn_sig = SchnorrSig {
141 schnorr_sig: sig.serialize().to_vec(),
142 };
143
144 tx.send(Ok(operator_burn_sig))
145 .inspect_ok(|_| {
146 sent_sigs += 1;
147 tracing::debug!(
148 "Sent signature {}/{} in deposit_sign()",
149 sent_sigs,
150 expected_sigs
151 );
152 })
153 .await
154 .wrap_err("Failed to send signature in operator rpc deposit sign")
155 .map_to_status()?;
156 }
157 Ok::<(), Status>(())
158 });
159
160 monitor_standalone_task(handle, "Operator deposit sign", monitor_err_sender);
161
162 Ok(Response::new(ReceiverStream::new(rx)))
163 }
164
165 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
166 async fn internal_withdraw(
167 &self,
168 request: Request<WithdrawParams>,
169 ) -> Result<Response<RawSignedTx>, Status> {
170 let (withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
171 parser::operator::parse_withdrawal_sig_params(request.into_inner())?;
172
173 tracing::warn!("Called internal_withdraw with withdrawal id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}", withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount);
174
175 let payout_tx = self
176 .operator
177 .withdraw(
178 withdrawal_id,
179 input_signature,
180 input_outpoint,
181 output_script_pubkey,
182 output_amount,
183 )
184 .await?;
185
186 Ok(Response::new(RawSignedTx::from(&payout_tx)))
187 }
188
189 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
190 async fn withdraw(
191 &self,
192 request: Request<WithdrawParamsWithSig>,
193 ) -> Result<Response<RawSignedTx>, Status> {
194 tracing::info!("Withdraw rpc called");
195 let params = request.into_inner();
196 let withdraw_params = params.withdrawal.ok_or(Status::invalid_argument(
197 "Withdrawal params not found for withdrawal",
198 ))?;
199 let (withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
200 parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
201
202 tracing::warn!(
203 "Parsed withdraw rpc params, withdrawal id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}, verification signature: {:?}", withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount, params.verification_signature
204 );
205
206 if let Some(address_in_config) = self.operator.config.aggregator_verification_address {
208 let verification_signature = params
209 .verification_signature
210 .map(|sig| {
211 PrimitiveSignature::from_str(&sig).map_err(|e| {
212 Status::invalid_argument(format!("Invalid verification signature: {e}"))
213 })
214 })
215 .transpose()?;
216 if let Some(verification_signature) = verification_signature {
218 let address_from_sig =
219 recover_address_from_ecdsa_signature::<OperatorWithdrawalMessage>(
220 withdrawal_id,
221 input_signature,
222 input_outpoint,
223 output_script_pubkey.clone(),
224 output_amount,
225 verification_signature,
226 )?;
227
228 if address_from_sig != address_in_config {
230 return Err(BridgeError::InvalidECDSAVerificationSignature).map_to_status();
231 }
232 } else {
233 return Err(BridgeError::ECDSAVerificationSignatureMissing).map_to_status();
235 }
236 }
237
238 let payout_tx = self
239 .operator
240 .withdraw(
241 withdrawal_id,
242 input_signature,
243 input_outpoint,
244 output_script_pubkey,
245 output_amount,
246 )
247 .await?;
248
249 tracing::info!(
250 "Withdraw rpc completed successfully for withdrawal id: {:?}",
251 withdrawal_id
252 );
253
254 Ok(Response::new(RawSignedTx::from(&payout_tx)))
255 }
256
257 #[tracing::instrument(skip(self, request), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
258 async fn internal_create_assert_commitment_txs(
259 &self,
260 request: Request<TransactionRequest>,
261 ) -> std::result::Result<tonic::Response<super::SignedTxsWithType>, tonic::Status> {
262 let tx_req = request.into_inner();
263 let tx_req_data: TransactionRequestData = tx_req.try_into()?;
264 tracing::warn!(
265 "Called internal_create_assert_commitment_txs with transaction request data: {:?}",
266 tx_req_data
267 );
268 let raw_txs = self
269 .operator
270 .create_assert_commitment_txs(
271 tx_req_data,
272 ClementineBitVMPublicKeys::get_assert_commit_data(
273 (
274 [[0u8; 32]; NUM_PUBS],
275 [[0u8; 32]; NUM_U256],
276 [[0u8; 16]; NUM_HASH],
277 ),
278 &[0u8; 20],
279 ),
280 None,
281 )
282 .await?;
283
284 Ok(Response::new(SignedTxsWithType {
285 signed_txs: raw_txs
286 .into_iter()
287 .map(|(tx_type, signed_tx)| SignedTxWithType {
288 transaction_type: Some(tx_type.into()),
289 raw_tx: bitcoin::consensus::serialize(&signed_tx),
290 })
291 .collect(),
292 }))
293 }
294
295 #[tracing::instrument(skip(self, request), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
296 async fn get_deposit_keys(
297 &self,
298 request: Request<DepositParams>,
299 ) -> Result<Response<OperatorKeys>, Status> {
300 let start = std::time::Instant::now();
301 let deposit_params = request.into_inner();
302 let deposit_data: DepositData = deposit_params.try_into()?;
303 tracing::warn!(
304 "Called get_deposit_keys with deposit data: {:?}",
305 deposit_data
306 );
307 let winternitz_keys = self
308 .operator
309 .generate_assert_winternitz_pubkeys(deposit_data.get_deposit_outpoint())?;
310 let hashes = self
311 .operator
312 .generate_challenge_ack_preimages_and_hashes(&deposit_data)?;
313 tracing::info!("Generated deposit keys in {:?}", start.elapsed());
314
315 Ok(Response::new(OperatorKeys {
316 winternitz_pubkeys: winternitz_keys
317 .into_iter()
318 .map(|pubkey| pubkey.into())
319 .collect(),
320 challenge_ack_digests: hashes
321 .into_iter()
322 .map(|hash| ChallengeAckDigest { hash: hash.into() })
323 .collect(),
324 }))
325 }
326
327 async fn internal_create_signed_txs(
328 &self,
329 request: tonic::Request<super::TransactionRequest>,
330 ) -> std::result::Result<tonic::Response<super::SignedTxsWithType>, tonic::Status> {
331 let transaction_request = request.into_inner();
332 let transaction_data: TransactionRequestData = transaction_request.try_into()?;
333 tracing::warn!(
334 "Called internal_create_signed_txs with transaction request data: {:?}",
335 transaction_data
336 );
337 let (_, deposit_data) = self
338 .operator
339 .db
340 .get_deposit_data(None, transaction_data.deposit_outpoint)
341 .await?
342 .ok_or(Status::invalid_argument("Deposit not found in database"))?;
343 let context = ContractContext::new_context_for_kickoff(
344 transaction_data.kickoff_data,
345 deposit_data,
346 self.operator.config.protocol_paramset(),
347 );
348 let raw_txs = create_and_sign_txs(
349 self.operator.db.clone(),
350 &self.operator.signer,
351 self.operator.config.clone(),
352 context,
353 Some([0u8; 20]), None,
355 )
356 .await?;
357
358 Ok(Response::new(SignedTxsWithType {
359 signed_txs: raw_txs
360 .into_iter()
361 .map(|(tx_type, signed_tx)| SignedTxWithType {
362 transaction_type: Some(tx_type.into()),
363 raw_tx: bitcoin::consensus::serialize(&signed_tx),
364 })
365 .collect(),
366 }))
367 }
368
369 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
370 async fn internal_finalized_payout(
371 &self,
372 request: Request<FinalizedPayoutParams>,
373 ) -> Result<Response<clementine::Txid>, Status> {
374 if !cfg!(test) {
375 return Err(Status::permission_denied(
376 "This method is only available in tests",
377 ));
378 }
379
380 tracing::info!(
381 "Internal finalized payout rpc called with finalized payout params: {:?}",
382 request.get_ref()
383 );
384
385 let payout_blockhash: [u8; 32] = request
386 .get_ref()
387 .payout_blockhash
388 .clone()
389 .try_into()
390 .map_err(|e| {
391 Status::invalid_argument(format!(
392 "Failed to convert payout blockhash to [u8; 32]: {e:?}"
393 ))
394 })?;
395 let deposit_outpoint: OutPoint = request
396 .get_ref()
397 .deposit_outpoint
398 .clone()
399 .ok_or(Status::invalid_argument("Failed to get deposit outpoint"))?
400 .try_into()?;
401
402 let mut dbtx = self.operator.db.begin_transaction().await?;
403 let kickoff_txid = self
404 .operator
405 .handle_finalized_payout(
406 &mut dbtx,
407 deposit_outpoint,
408 BlockHash::from_byte_array(payout_blockhash),
409 )
410 .await?;
411 dbtx.commit()
412 .await
413 .wrap_err("Failed to commit transaction")
414 .map_to_status()?;
415
416 Ok(Response::new(kickoff_txid.into()))
417 }
418
419 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
420 async fn internal_end_round(
421 &self,
422 _request: Request<Empty>,
423 ) -> Result<Response<Empty>, Status> {
424 tracing::warn!("Internal end round rpc called");
425 #[cfg(feature = "automation")]
426 {
427 use eyre::Context;
428
429 let mut dbtx = self.operator.db.begin_transaction().await?;
430
431 self.operator.end_round(&mut dbtx).await?;
432
433 dbtx.commit()
434 .await
435 .wrap_err("Failed to commit transaction")
436 .map_to_status()?;
437 Ok(Response::new(Empty {}))
438 }
439
440 #[cfg(not(feature = "automation"))]
441 Err(Status::unimplemented(
442 "Automation is not enabled. Operator does not manage its rounds",
443 ))
444 }
445
446 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
447 async fn get_x_only_public_key(
448 &self,
449 _request: Request<Empty>,
450 ) -> Result<Response<XOnlyPublicKeyRpc>, Status> {
451 tracing::info!("Get xonly public key rpc called");
452 let xonly_pk = self.operator.signer.xonly_public_key.serialize();
453 Ok(Response::new(XOnlyPublicKeyRpc {
454 xonly_public_key: xonly_pk.to_vec(),
455 }))
456 }
457
458 async fn get_current_status(
459 &self,
460 _request: Request<Empty>,
461 ) -> Result<Response<clementine::EntityStatus>, Status> {
462 tracing::info!("Get current status rpc called");
463 let status = self.get_current_status().await?;
464 tracing::info!("Get current status rpc completed successfully");
465 Ok(Response::new(status))
466 }
467
468 async fn get_reimbursement_txs(
469 &self,
470 request: Request<clementine::Outpoint>,
471 ) -> Result<Response<SignedTxsWithType>, Status> {
472 let deposit_outpoint: OutPoint = request.into_inner().try_into()?;
473 tracing::warn!(
474 "Get reimbursement txs rpc called with deposit outpoint: {:?}",
475 deposit_outpoint
476 );
477 let txs = self
478 .operator
479 .get_reimbursement_txs(deposit_outpoint)
480 .await?;
481 Ok(Response::new(txs.into()))
482 }
483
484 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
485 async fn transfer_to_btc_wallet(
486 &self,
487 request: Request<clementine::Outpoints>,
488 ) -> Result<Response<RawSignedTx>, Status> {
489 let outpoints: Vec<OutPoint> = request
490 .into_inner()
491 .outpoints
492 .into_iter()
493 .map(TryInto::try_into)
494 .collect::<Result<Vec<_>, _>>()
495 .map_err(|e| Status::invalid_argument(format!("Invalid outpoint: {e}")))?;
496
497 if outpoints.is_empty() {
498 return Err(Status::invalid_argument("No outpoints provided"));
499 }
500
501 let mut inputs = Vec::with_capacity(outpoints.len());
502
503 for outpoint in outpoints {
504 tracing::info!(
505 "TransferToBtcWallet rpc called with outpoint: {:?}",
506 outpoint
507 );
508
509 let txout = self
510 .operator
511 .rpc
512 .get_txout_from_outpoint(&outpoint)
513 .await
514 .map_err(|e| Status::internal(format!("Failed to get txout: {e}")))?;
515
516 if txout.script_pubkey != self.operator.signer.address.script_pubkey() {
517 return Err(Status::invalid_argument(format!(
518 "Outpoint script_pubkey does not match operator's address. Expected: {:?}, Got: {:?}",
519 self.operator.signer.address.script_pubkey(),
520 txout.script_pubkey
521 )));
522 }
523
524 inputs.push((outpoint, txout));
525 }
526
527 let signed_tx = self
528 .operator
529 .transfer_outpoints_to_wallet(inputs)
530 .await
531 .map_err(|e| Status::internal(format!("Failed to send outpoints to wallet: {e}")))?;
532
533 tracing::info!("Successfully created transaction sending outpoints to wallet");
534 Ok(Response::new(RawSignedTx::from(&signed_tx)))
535 }
536}