clementine_core/rpc/
operator.rs

1use super::clementine::clementine_operator_server::ClementineOperator;
2use super::clementine::{
3    self, ChallengeAckDigest, DepositParams, DepositSignSession, Empty, FinalizedPayoutParams,
4    OperatorKeys, OperatorParams, SchnorrSig, SignedTxWithType, SignedTxsWithType,
5    TransactionRequest, VergenResponse, WithdrawParams, XOnlyPublicKeyRpc,
6};
7use super::error::*;
8use crate::bitvm_client::ClementineBitVMPublicKeys;
9use crate::builder::transaction::sign::{create_and_sign_txs, TransactionRequestData};
10use crate::builder::transaction::ContractContext;
11use crate::citrea::CitreaClientT;
12use crate::compatibility::ActorWithConfig;
13use crate::constants::{DEFAULT_CHANNEL_SIZE, RESTART_BACKGROUND_TASKS_TIMEOUT};
14use crate::deposit::DepositData;
15use crate::operator::OperatorServer;
16use crate::rpc::clementine::{CompatibilityParamsRpc, RawSignedTx, WithdrawParamsWithSig};
17use crate::rpc::ecdsa_verification_sig::{
18    recover_address_from_ecdsa_signature, OperatorWithdrawalMessage,
19};
20use crate::rpc::parser;
21use crate::utils::{get_vergen_response, monitor_standalone_task, timed_request};
22use alloy::primitives::PrimitiveSignature;
23use bitcoin::hashes::Hash;
24use bitcoin::{BlockHash, OutPoint};
25use bitvm::chunk::api::{NUM_HASH, NUM_PUBS, NUM_U256};
26use clementine_errors::BridgeError;
27use clementine_errors::ResultExt;
28use eyre::Context;
29use futures::TryFutureExt;
30use std::convert::TryInto;
31use std::str::FromStr;
32use tokio::sync::mpsc;
33use tokio_stream::wrappers::ReceiverStream;
34use tonic::{async_trait, Request, Response, Status};
35
36#[async_trait]
37impl<C> ClementineOperator for OperatorServer<C>
38where
39    C: CitreaClientT,
40{
41    type DepositSignStream = ReceiverStream<Result<SchnorrSig, Status>>;
42    type GetParamsStream = ReceiverStream<Result<OperatorParams, Status>>;
43
44    async fn get_compatibility_params(
45        &self,
46        _request: Request<Empty>,
47    ) -> Result<Response<CompatibilityParamsRpc>, Status> {
48        let params = self.operator.get_compatibility_params()?;
49        Ok(Response::new(params.try_into().map_to_status()?))
50    }
51
52    async fn vergen(&self, _request: Request<Empty>) -> Result<Response<VergenResponse>, Status> {
53        tracing::info!("Vergen rpc called");
54        Ok(Response::new(get_vergen_response()))
55    }
56
57    async fn restart_background_tasks(
58        &self,
59        _request: tonic::Request<super::Empty>,
60    ) -> std::result::Result<tonic::Response<super::Empty>, tonic::Status> {
61        tracing::info!("Restarting background tasks rpc called");
62        timed_request(
63            RESTART_BACKGROUND_TASKS_TIMEOUT,
64            "Restarting background tasks",
65            self.start_background_tasks(),
66        )
67        .await?;
68        tracing::info!("Restarting background tasks rpc completed");
69        Ok(Response::new(Empty {}))
70    }
71
72    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
73    async fn get_params(
74        &self,
75        _request: Request<Empty>,
76    ) -> Result<Response<Self::GetParamsStream>, Status> {
77        tracing::info!("Get params rpc called");
78        let operator = self.operator.clone();
79        let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
80        let out_stream: Self::GetParamsStream = ReceiverStream::new(rx);
81        let monitor_err_sender = tx.clone();
82
83        let (mut wpk_receiver, mut signature_receiver) = operator.get_params().await?;
84
85        let handle = tokio::spawn(async move {
86            let operator_config: OperatorParams = operator.clone().into();
87            tx.send(Ok(operator_config))
88                .await
89                .map_err(output_stream_ended_prematurely)?;
90
91            while let Some(winternitz_public_key) = wpk_receiver.recv().await {
92                let operator_winternitz_pubkey: OperatorParams = winternitz_public_key.into();
93                tx.send(Ok(operator_winternitz_pubkey))
94                    .await
95                    .map_err(output_stream_ended_prematurely)?;
96            }
97
98            while let Some(operator_sig) = signature_receiver.recv().await {
99                let unspent_kickoff_sig: OperatorParams = operator_sig.into();
100                tx.send(Ok(unspent_kickoff_sig))
101                    .await
102                    .map_err(output_stream_ended_prematurely)?;
103            }
104
105            Ok::<(), Status>(())
106        });
107        monitor_standalone_task(handle, "Operator get_params", monitor_err_sender);
108
109        Ok(Response::new(out_stream))
110    }
111
112    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
113    async fn deposit_sign(
114        &self,
115        request: Request<DepositSignSession>,
116    ) -> Result<Response<Self::DepositSignStream>, Status> {
117        tracing::info!("Deposit sign rpc called");
118        let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
119
120        let deposit_sign_session = request.into_inner();
121        let deposit_params: DepositParams = deposit_sign_session.try_into()?;
122        let deposit_data: DepositData = deposit_params.try_into()?;
123        tracing::info!(
124            "Parsed deposit sign rpc params, deposit data: {:?}",
125            deposit_data
126        );
127
128        let expected_sigs = self
129            .operator
130            .config
131            .get_num_required_operator_sigs(&deposit_data);
132
133        let mut deposit_signatures_rx = self.operator.deposit_sign(deposit_data).await?;
134        let monitor_err_sender = tx.clone();
135
136        let handle = tokio::spawn(async move {
137            let mut sent_sigs = 0;
138            while let Some(sig) = deposit_signatures_rx.recv().await {
139                let sig = sig?;
140                let operator_burn_sig = SchnorrSig {
141                    schnorr_sig: sig.serialize().to_vec(),
142                };
143
144                tx.send(Ok(operator_burn_sig))
145                    .inspect_ok(|_| {
146                        sent_sigs += 1;
147                        tracing::debug!(
148                            "Sent signature {}/{} in deposit_sign()",
149                            sent_sigs,
150                            expected_sigs
151                        );
152                    })
153                    .await
154                    .wrap_err("Failed to send signature in operator rpc deposit sign")
155                    .map_to_status()?;
156            }
157            Ok::<(), Status>(())
158        });
159
160        monitor_standalone_task(handle, "Operator deposit sign", monitor_err_sender);
161
162        Ok(Response::new(ReceiverStream::new(rx)))
163    }
164
165    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
166    async fn internal_withdraw(
167        &self,
168        request: Request<WithdrawParams>,
169    ) -> Result<Response<RawSignedTx>, Status> {
170        let (withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
171            parser::operator::parse_withdrawal_sig_params(request.into_inner())?;
172
173        tracing::warn!("Called internal_withdraw with withdrawal id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}", withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount);
174
175        let payout_tx = self
176            .operator
177            .withdraw(
178                withdrawal_id,
179                input_signature,
180                input_outpoint,
181                output_script_pubkey,
182                output_amount,
183            )
184            .await?;
185
186        Ok(Response::new(RawSignedTx::from(&payout_tx)))
187    }
188
189    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
190    async fn withdraw(
191        &self,
192        request: Request<WithdrawParamsWithSig>,
193    ) -> Result<Response<RawSignedTx>, Status> {
194        tracing::info!("Withdraw rpc called");
195        let params = request.into_inner();
196        let withdraw_params = params.withdrawal.ok_or(Status::invalid_argument(
197            "Withdrawal params not found for withdrawal",
198        ))?;
199        let (withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
200            parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
201
202        tracing::warn!(
203            "Parsed withdraw rpc params, withdrawal id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}, verification signature: {:?}", withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount, params.verification_signature
204        );
205
206        // if verification address is set in config, check if verification signature is valid
207        if let Some(address_in_config) = self.operator.config.aggregator_verification_address {
208            let verification_signature = params
209                .verification_signature
210                .map(|sig| {
211                    PrimitiveSignature::from_str(&sig).map_err(|e| {
212                        Status::invalid_argument(format!("Invalid verification signature: {e}"))
213                    })
214                })
215                .transpose()?;
216            // check if verification signature is provided by aggregator
217            if let Some(verification_signature) = verification_signature {
218                let address_from_sig =
219                    recover_address_from_ecdsa_signature::<OperatorWithdrawalMessage>(
220                        withdrawal_id,
221                        input_signature,
222                        input_outpoint,
223                        output_script_pubkey.clone(),
224                        output_amount,
225                        verification_signature,
226                    )?;
227
228                // check if verification signature is signed by the address in config
229                if address_from_sig != address_in_config {
230                    return Err(BridgeError::InvalidECDSAVerificationSignature).map_to_status();
231                }
232            } else {
233                // if verification signature is not provided, but verification address is set in config, return error
234                return Err(BridgeError::ECDSAVerificationSignatureMissing).map_to_status();
235            }
236        }
237
238        let payout_tx = self
239            .operator
240            .withdraw(
241                withdrawal_id,
242                input_signature,
243                input_outpoint,
244                output_script_pubkey,
245                output_amount,
246            )
247            .await?;
248
249        tracing::info!(
250            "Withdraw rpc completed successfully for withdrawal id: {:?}",
251            withdrawal_id
252        );
253
254        Ok(Response::new(RawSignedTx::from(&payout_tx)))
255    }
256
257    #[tracing::instrument(skip(self, request), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
258    async fn internal_create_assert_commitment_txs(
259        &self,
260        request: Request<TransactionRequest>,
261    ) -> std::result::Result<tonic::Response<super::SignedTxsWithType>, tonic::Status> {
262        let tx_req = request.into_inner();
263        let tx_req_data: TransactionRequestData = tx_req.try_into()?;
264        tracing::warn!(
265            "Called internal_create_assert_commitment_txs with transaction request data: {:?}",
266            tx_req_data
267        );
268        let raw_txs = self
269            .operator
270            .create_assert_commitment_txs(
271                tx_req_data,
272                ClementineBitVMPublicKeys::get_assert_commit_data(
273                    (
274                        [[0u8; 32]; NUM_PUBS],
275                        [[0u8; 32]; NUM_U256],
276                        [[0u8; 16]; NUM_HASH],
277                    ),
278                    &[0u8; 20],
279                ),
280                None,
281            )
282            .await?;
283
284        Ok(Response::new(SignedTxsWithType {
285            signed_txs: raw_txs
286                .into_iter()
287                .map(|(tx_type, signed_tx)| SignedTxWithType {
288                    transaction_type: Some(tx_type.into()),
289                    raw_tx: bitcoin::consensus::serialize(&signed_tx),
290                })
291                .collect(),
292        }))
293    }
294
295    #[tracing::instrument(skip(self, request), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
296    async fn get_deposit_keys(
297        &self,
298        request: Request<DepositParams>,
299    ) -> Result<Response<OperatorKeys>, Status> {
300        let start = std::time::Instant::now();
301        let deposit_params = request.into_inner();
302        let deposit_data: DepositData = deposit_params.try_into()?;
303        tracing::warn!(
304            "Called get_deposit_keys with deposit data: {:?}",
305            deposit_data
306        );
307        let winternitz_keys = self
308            .operator
309            .generate_assert_winternitz_pubkeys(deposit_data.get_deposit_outpoint())?;
310        let hashes = self
311            .operator
312            .generate_challenge_ack_preimages_and_hashes(&deposit_data)?;
313        tracing::info!("Generated deposit keys in {:?}", start.elapsed());
314
315        Ok(Response::new(OperatorKeys {
316            winternitz_pubkeys: winternitz_keys
317                .into_iter()
318                .map(|pubkey| pubkey.into())
319                .collect(),
320            challenge_ack_digests: hashes
321                .into_iter()
322                .map(|hash| ChallengeAckDigest { hash: hash.into() })
323                .collect(),
324        }))
325    }
326
327    async fn internal_create_signed_txs(
328        &self,
329        request: tonic::Request<super::TransactionRequest>,
330    ) -> std::result::Result<tonic::Response<super::SignedTxsWithType>, tonic::Status> {
331        let transaction_request = request.into_inner();
332        let transaction_data: TransactionRequestData = transaction_request.try_into()?;
333        tracing::warn!(
334            "Called internal_create_signed_txs with transaction request data: {:?}",
335            transaction_data
336        );
337        let (_, deposit_data) = self
338            .operator
339            .db
340            .get_deposit_data(None, transaction_data.deposit_outpoint)
341            .await?
342            .ok_or(Status::invalid_argument("Deposit not found in database"))?;
343        let context = ContractContext::new_context_for_kickoff(
344            transaction_data.kickoff_data,
345            deposit_data,
346            self.operator.config.protocol_paramset(),
347        );
348        let raw_txs = create_and_sign_txs(
349            self.operator.db.clone(),
350            &self.operator.signer,
351            self.operator.config.clone(),
352            context,
353            Some([0u8; 20]), // dummy blockhash
354            None,
355        )
356        .await?;
357
358        Ok(Response::new(SignedTxsWithType {
359            signed_txs: raw_txs
360                .into_iter()
361                .map(|(tx_type, signed_tx)| SignedTxWithType {
362                    transaction_type: Some(tx_type.into()),
363                    raw_tx: bitcoin::consensus::serialize(&signed_tx),
364                })
365                .collect(),
366        }))
367    }
368
369    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
370    async fn internal_finalized_payout(
371        &self,
372        request: Request<FinalizedPayoutParams>,
373    ) -> Result<Response<clementine::Txid>, Status> {
374        if !cfg!(test) {
375            return Err(Status::permission_denied(
376                "This method is only available in tests",
377            ));
378        }
379
380        tracing::info!(
381            "Internal finalized payout rpc called with finalized payout params: {:?}",
382            request.get_ref()
383        );
384
385        let payout_blockhash: [u8; 32] = request
386            .get_ref()
387            .payout_blockhash
388            .clone()
389            .try_into()
390            .map_err(|e| {
391                Status::invalid_argument(format!(
392                    "Failed to convert payout blockhash to [u8; 32]: {e:?}"
393                ))
394            })?;
395        let deposit_outpoint: OutPoint = request
396            .get_ref()
397            .deposit_outpoint
398            .clone()
399            .ok_or(Status::invalid_argument("Failed to get deposit outpoint"))?
400            .try_into()?;
401
402        let mut dbtx = self.operator.db.begin_transaction().await?;
403        let kickoff_txid = self
404            .operator
405            .handle_finalized_payout(
406                &mut dbtx,
407                deposit_outpoint,
408                BlockHash::from_byte_array(payout_blockhash),
409            )
410            .await?;
411        dbtx.commit()
412            .await
413            .wrap_err("Failed to commit transaction")
414            .map_to_status()?;
415
416        Ok(Response::new(kickoff_txid.into()))
417    }
418
419    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
420    async fn internal_end_round(
421        &self,
422        _request: Request<Empty>,
423    ) -> Result<Response<Empty>, Status> {
424        tracing::warn!("Internal end round rpc called");
425        #[cfg(feature = "automation")]
426        {
427            use eyre::Context;
428
429            let mut dbtx = self.operator.db.begin_transaction().await?;
430
431            self.operator.end_round(&mut dbtx).await?;
432
433            dbtx.commit()
434                .await
435                .wrap_err("Failed to commit transaction")
436                .map_to_status()?;
437            Ok(Response::new(Empty {}))
438        }
439
440        #[cfg(not(feature = "automation"))]
441        Err(Status::unimplemented(
442            "Automation is not enabled. Operator does not manage its rounds",
443        ))
444    }
445
446    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
447    async fn get_x_only_public_key(
448        &self,
449        _request: Request<Empty>,
450    ) -> Result<Response<XOnlyPublicKeyRpc>, Status> {
451        tracing::info!("Get xonly public key rpc called");
452        let xonly_pk = self.operator.signer.xonly_public_key.serialize();
453        Ok(Response::new(XOnlyPublicKeyRpc {
454            xonly_public_key: xonly_pk.to_vec(),
455        }))
456    }
457
458    async fn get_current_status(
459        &self,
460        _request: Request<Empty>,
461    ) -> Result<Response<clementine::EntityStatus>, Status> {
462        tracing::info!("Get current status rpc called");
463        let status = self.get_current_status().await?;
464        tracing::info!("Get current status rpc completed successfully");
465        Ok(Response::new(status))
466    }
467
468    async fn get_reimbursement_txs(
469        &self,
470        request: Request<clementine::Outpoint>,
471    ) -> Result<Response<SignedTxsWithType>, Status> {
472        let deposit_outpoint: OutPoint = request.into_inner().try_into()?;
473        tracing::warn!(
474            "Get reimbursement txs rpc called with deposit outpoint: {:?}",
475            deposit_outpoint
476        );
477        let txs = self
478            .operator
479            .get_reimbursement_txs(deposit_outpoint)
480            .await?;
481        Ok(Response::new(txs.into()))
482    }
483
484    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
485    async fn transfer_to_btc_wallet(
486        &self,
487        request: Request<clementine::Outpoints>,
488    ) -> Result<Response<RawSignedTx>, Status> {
489        let outpoints: Vec<OutPoint> = request
490            .into_inner()
491            .outpoints
492            .into_iter()
493            .map(TryInto::try_into)
494            .collect::<Result<Vec<_>, _>>()
495            .map_err(|e| Status::invalid_argument(format!("Invalid outpoint: {e}")))?;
496
497        if outpoints.is_empty() {
498            return Err(Status::invalid_argument("No outpoints provided"));
499        }
500
501        let mut inputs = Vec::with_capacity(outpoints.len());
502
503        for outpoint in outpoints {
504            tracing::info!(
505                "TransferToBtcWallet rpc called with outpoint: {:?}",
506                outpoint
507            );
508
509            let txout = self
510                .operator
511                .rpc
512                .get_txout_from_outpoint(&outpoint)
513                .await
514                .map_err(|e| Status::internal(format!("Failed to get txout: {e}")))?;
515
516            if txout.script_pubkey != self.operator.signer.address.script_pubkey() {
517                return Err(Status::invalid_argument(format!(
518                    "Outpoint script_pubkey does not match operator's address. Expected: {:?}, Got: {:?}",
519                    self.operator.signer.address.script_pubkey(),
520                    txout.script_pubkey
521                )));
522            }
523
524            inputs.push((outpoint, txout));
525        }
526
527        let signed_tx = self
528            .operator
529            .transfer_outpoints_to_wallet(inputs)
530            .await
531            .map_err(|e| Status::internal(format!("Failed to send outpoints to wallet: {e}")))?;
532
533        tracing::info!("Successfully created transaction sending outpoints to wallet");
534        Ok(Response::new(RawSignedTx::from(&signed_tx)))
535    }
536}