clementine_core/rpc/
operator.rs

1use super::clementine::clementine_operator_server::ClementineOperator;
2use super::clementine::{
3    self, ChallengeAckDigest, DepositParams, DepositSignSession, Empty, FinalizedPayoutParams,
4    OperatorKeys, OperatorParams, SchnorrSig, SignedTxWithType, SignedTxsWithType,
5    TransactionRequest, VergenResponse, WithdrawParams, XOnlyPublicKeyRpc,
6};
7use super::error::*;
8use crate::bitvm_client::ClementineBitVMPublicKeys;
9use crate::builder::transaction::sign::{create_and_sign_txs, TransactionRequestData};
10use crate::builder::transaction::ContractContext;
11use crate::citrea::CitreaClientT;
12use crate::compatibility::ActorWithConfig;
13use crate::constants::{DEFAULT_CHANNEL_SIZE, RESTART_BACKGROUND_TASKS_TIMEOUT};
14use crate::deposit::DepositData;
15use crate::operator::OperatorServer;
16use crate::rpc::clementine::{CompatibilityParamsRpc, RawSignedTx, WithdrawParamsWithSig};
17use crate::rpc::ecdsa_verification_sig::{
18    recover_address_from_ecdsa_signature, OperatorWithdrawalMessage,
19};
20use crate::rpc::parser;
21use crate::utils::{get_vergen_response, monitor_standalone_task, timed_request};
22use alloy::primitives::PrimitiveSignature;
23use bitcoin::hashes::Hash;
24use bitcoin::{BlockHash, OutPoint};
25use bitvm::chunk::api::{NUM_HASH, NUM_PUBS, NUM_U256};
26use clementine_errors::BridgeError;
27use clementine_errors::ResultExt;
28use eyre::Context;
29use futures::TryFutureExt;
30use std::convert::TryInto;
31use std::str::FromStr;
32use tokio::sync::mpsc;
33use tokio_stream::wrappers::ReceiverStream;
34use tonic::{async_trait, Request, Response, Status};
35
36#[async_trait]
37impl<C> ClementineOperator for OperatorServer<C>
38where
39    C: CitreaClientT,
40{
41    type DepositSignStream = ReceiverStream<Result<SchnorrSig, Status>>;
42    type GetParamsStream = ReceiverStream<Result<OperatorParams, Status>>;
43
44    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
45    async fn get_compatibility_params(
46        &self,
47        _request: Request<Empty>,
48    ) -> Result<Response<CompatibilityParamsRpc>, Status> {
49        let params = self.operator.get_compatibility_params()?;
50        Ok(Response::new(params.try_into().map_to_status()?))
51    }
52
53    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
54    async fn vergen(&self, _request: Request<Empty>) -> Result<Response<VergenResponse>, Status> {
55        tracing::info!("Vergen rpc called");
56        Ok(Response::new(get_vergen_response()))
57    }
58
59    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
60    async fn restart_background_tasks(
61        &self,
62        _request: tonic::Request<super::Empty>,
63    ) -> std::result::Result<tonic::Response<super::Empty>, tonic::Status> {
64        tracing::info!("Restarting background tasks rpc called");
65        timed_request(
66            RESTART_BACKGROUND_TASKS_TIMEOUT,
67            "Restarting background tasks",
68            self.start_background_tasks(),
69        )
70        .await?;
71        tracing::info!("Restarting background tasks rpc completed");
72        Ok(Response::new(Empty {}))
73    }
74
75    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
76    async fn get_params(
77        &self,
78        _request: Request<Empty>,
79    ) -> Result<Response<Self::GetParamsStream>, Status> {
80        tracing::info!("Get params rpc called");
81        let operator = self.operator.clone();
82        let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
83        let out_stream: Self::GetParamsStream = ReceiverStream::new(rx);
84        let monitor_err_sender = tx.clone();
85
86        let (mut wpk_receiver, mut signature_receiver) = operator.get_params().await?;
87
88        let handle = tokio::spawn(async move {
89            let operator_config: OperatorParams = operator.clone().into();
90            tx.send(Ok(operator_config))
91                .await
92                .map_err(output_stream_ended_prematurely)?;
93
94            while let Some(winternitz_public_key) = wpk_receiver.recv().await {
95                let operator_winternitz_pubkey: OperatorParams = winternitz_public_key.into();
96                tx.send(Ok(operator_winternitz_pubkey))
97                    .await
98                    .map_err(output_stream_ended_prematurely)?;
99            }
100
101            while let Some(operator_sig) = signature_receiver.recv().await {
102                let unspent_kickoff_sig: OperatorParams = operator_sig.into();
103                tx.send(Ok(unspent_kickoff_sig))
104                    .await
105                    .map_err(output_stream_ended_prematurely)?;
106            }
107
108            Ok::<(), Status>(())
109        });
110        monitor_standalone_task(handle, "Operator get_params", monitor_err_sender);
111
112        Ok(Response::new(out_stream))
113    }
114
115    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
116    async fn deposit_sign(
117        &self,
118        request: Request<DepositSignSession>,
119    ) -> Result<Response<Self::DepositSignStream>, Status> {
120        tracing::info!("Deposit sign rpc called");
121        let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
122
123        let deposit_sign_session = request.into_inner();
124        let deposit_params: DepositParams = deposit_sign_session.try_into()?;
125        let deposit_data: DepositData = deposit_params.try_into()?;
126        tracing::info!(
127            "Parsed deposit sign rpc params, deposit data: {:?}",
128            deposit_data
129        );
130
131        let expected_sigs = self
132            .operator
133            .config
134            .get_num_required_operator_sigs(&deposit_data);
135
136        let mut deposit_signatures_rx = self.operator.deposit_sign(deposit_data).await?;
137        let monitor_err_sender = tx.clone();
138
139        let handle = tokio::spawn(async move {
140            let mut sent_sigs = 0;
141            while let Some(sig) = deposit_signatures_rx.recv().await {
142                let sig = sig?;
143                let operator_burn_sig = SchnorrSig {
144                    schnorr_sig: sig.serialize().to_vec(),
145                };
146
147                tx.send(Ok(operator_burn_sig))
148                    .inspect_ok(|_| {
149                        sent_sigs += 1;
150                        tracing::debug!(
151                            "Sent signature {}/{} in deposit_sign()",
152                            sent_sigs,
153                            expected_sigs
154                        );
155                    })
156                    .await
157                    .wrap_err("Failed to send signature in operator rpc deposit sign")
158                    .map_to_status()?;
159            }
160            Ok::<(), Status>(())
161        });
162
163        monitor_standalone_task(handle, "Operator deposit sign", monitor_err_sender);
164
165        Ok(Response::new(ReceiverStream::new(rx)))
166    }
167
168    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
169    async fn internal_withdraw(
170        &self,
171        request: Request<WithdrawParams>,
172    ) -> Result<Response<RawSignedTx>, Status> {
173        let (withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
174            parser::operator::parse_withdrawal_sig_params(request.into_inner())?;
175
176        tracing::warn!("Called internal_withdraw with withdrawal id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}", withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount);
177
178        let payout_tx = self
179            .operator
180            .withdraw(
181                withdrawal_id,
182                input_signature,
183                input_outpoint,
184                output_script_pubkey,
185                output_amount,
186            )
187            .await?;
188
189        Ok(Response::new(RawSignedTx::from(&payout_tx)))
190    }
191
192    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
193    async fn withdraw(
194        &self,
195        request: Request<WithdrawParamsWithSig>,
196    ) -> Result<Response<RawSignedTx>, Status> {
197        tracing::info!("Withdraw rpc called");
198        let params = request.into_inner();
199        let withdraw_params = params.withdrawal.ok_or(Status::invalid_argument(
200            "Withdrawal params not found for withdrawal",
201        ))?;
202        let (withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
203            parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
204
205        tracing::warn!(
206            "Parsed withdraw rpc params, withdrawal id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}, verification signature: {:?}", withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount, params.verification_signature
207        );
208
209        // if verification address is set in config, check if verification signature is valid
210        if let Some(address_in_config) = self.operator.config.aggregator_verification_address {
211            let verification_signature = params
212                .verification_signature
213                .map(|sig| {
214                    PrimitiveSignature::from_str(&sig).map_err(|e| {
215                        Status::invalid_argument(format!("Invalid verification signature: {e}"))
216                    })
217                })
218                .transpose()?;
219            // check if verification signature is provided by aggregator
220            if let Some(verification_signature) = verification_signature {
221                let address_from_sig =
222                    recover_address_from_ecdsa_signature::<OperatorWithdrawalMessage>(
223                        withdrawal_id,
224                        input_signature,
225                        input_outpoint,
226                        output_script_pubkey.clone(),
227                        output_amount,
228                        verification_signature,
229                    )?;
230
231                // check if verification signature is signed by the address in config
232                if address_from_sig != address_in_config {
233                    return Err(BridgeError::InvalidECDSAVerificationSignature).map_to_status();
234                }
235            } else {
236                // if verification signature is not provided, but verification address is set in config, return error
237                return Err(BridgeError::ECDSAVerificationSignatureMissing).map_to_status();
238            }
239        }
240
241        let payout_tx = self
242            .operator
243            .withdraw(
244                withdrawal_id,
245                input_signature,
246                input_outpoint,
247                output_script_pubkey,
248                output_amount,
249            )
250            .await?;
251
252        tracing::info!(
253            "Withdraw rpc completed successfully for withdrawal id: {:?}",
254            withdrawal_id
255        );
256
257        Ok(Response::new(RawSignedTx::from(&payout_tx)))
258    }
259
260    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
261    async fn internal_create_assert_commitment_txs(
262        &self,
263        request: Request<TransactionRequest>,
264    ) -> std::result::Result<tonic::Response<super::SignedTxsWithType>, tonic::Status> {
265        let tx_req = request.into_inner();
266        let tx_req_data: TransactionRequestData = tx_req.try_into()?;
267        tracing::warn!(
268            "Called internal_create_assert_commitment_txs with transaction request data: {:?}",
269            tx_req_data
270        );
271        let raw_txs = self
272            .operator
273            .create_assert_commitment_txs(
274                tx_req_data,
275                ClementineBitVMPublicKeys::get_assert_commit_data(
276                    (
277                        [[0u8; 32]; NUM_PUBS],
278                        [[0u8; 32]; NUM_U256],
279                        [[0u8; 16]; NUM_HASH],
280                    ),
281                    &[0u8; 20],
282                ),
283                None,
284            )
285            .await?;
286
287        Ok(Response::new(SignedTxsWithType {
288            signed_txs: raw_txs
289                .into_iter()
290                .map(|(tx_type, signed_tx)| SignedTxWithType {
291                    transaction_type: Some(tx_type.into()),
292                    raw_tx: bitcoin::consensus::serialize(&signed_tx),
293                })
294                .collect(),
295        }))
296    }
297
298    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
299    async fn get_deposit_keys(
300        &self,
301        request: Request<DepositParams>,
302    ) -> Result<Response<OperatorKeys>, Status> {
303        let start = std::time::Instant::now();
304        let deposit_params = request.into_inner();
305        let deposit_data: DepositData = deposit_params.try_into()?;
306        tracing::info!(
307            "Called get_deposit_keys with deposit data: {:?}",
308            deposit_data
309        );
310        let winternitz_keys = self
311            .operator
312            .generate_assert_winternitz_pubkeys(deposit_data.get_deposit_outpoint())?;
313        let hashes = self
314            .operator
315            .generate_challenge_ack_preimages_and_hashes(&deposit_data)?;
316        tracing::info!("Generated deposit keys in {:?}", start.elapsed());
317
318        Ok(Response::new(OperatorKeys {
319            winternitz_pubkeys: winternitz_keys
320                .into_iter()
321                .map(|pubkey| pubkey.into())
322                .collect(),
323            challenge_ack_digests: hashes
324                .into_iter()
325                .map(|hash| ChallengeAckDigest { hash: hash.into() })
326                .collect(),
327        }))
328    }
329
330    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
331    async fn internal_create_signed_txs(
332        &self,
333        request: tonic::Request<super::TransactionRequest>,
334    ) -> std::result::Result<tonic::Response<super::SignedTxsWithType>, tonic::Status> {
335        let transaction_request = request.into_inner();
336        let transaction_data: TransactionRequestData = transaction_request.try_into()?;
337        tracing::warn!(
338            "Called internal_create_signed_txs with transaction request data: {:?}",
339            transaction_data
340        );
341        let (_, deposit_data) = self
342            .operator
343            .db
344            .get_deposit_data(None, transaction_data.deposit_outpoint)
345            .await?
346            .ok_or(Status::invalid_argument("Deposit not found in database"))?;
347        let context = ContractContext::new_context_for_kickoff(
348            transaction_data.kickoff_data,
349            deposit_data,
350            self.operator.config.protocol_paramset(),
351        );
352        let raw_txs = create_and_sign_txs(
353            self.operator.db.clone(),
354            &self.operator.signer,
355            self.operator.config.clone(),
356            context,
357            Some([0u8; 20]), // dummy blockhash
358            None,
359        )
360        .await?;
361
362        Ok(Response::new(SignedTxsWithType {
363            signed_txs: raw_txs
364                .into_iter()
365                .map(|(tx_type, signed_tx)| SignedTxWithType {
366                    transaction_type: Some(tx_type.into()),
367                    raw_tx: bitcoin::consensus::serialize(&signed_tx),
368                })
369                .collect(),
370        }))
371    }
372
373    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
374    async fn internal_finalized_payout(
375        &self,
376        request: Request<FinalizedPayoutParams>,
377    ) -> Result<Response<clementine::Txid>, Status> {
378        if !cfg!(test) {
379            return Err(Status::permission_denied(
380                "This method is only available in tests",
381            ));
382        }
383
384        tracing::info!(
385            "Internal finalized payout rpc called with finalized payout params: {:?}",
386            request.get_ref()
387        );
388
389        let payout_blockhash: [u8; 32] = request
390            .get_ref()
391            .payout_blockhash
392            .clone()
393            .try_into()
394            .map_err(|e| {
395                Status::invalid_argument(format!(
396                    "Failed to convert payout blockhash to [u8; 32]: {e:?}"
397                ))
398            })?;
399        let deposit_outpoint: OutPoint = request
400            .get_ref()
401            .deposit_outpoint
402            .clone()
403            .ok_or(Status::invalid_argument("Failed to get deposit outpoint"))?
404            .try_into()?;
405
406        let mut dbtx = self.operator.db.begin_transaction().await?;
407        let kickoff_txid = self
408            .operator
409            .handle_finalized_payout(
410                &mut dbtx,
411                deposit_outpoint,
412                BlockHash::from_byte_array(payout_blockhash),
413            )
414            .await?;
415        dbtx.commit()
416            .await
417            .wrap_err("Failed to commit transaction")
418            .map_to_status()?;
419
420        Ok(Response::new(kickoff_txid.into()))
421    }
422
423    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
424    async fn internal_end_round(
425        &self,
426        _request: Request<Empty>,
427    ) -> Result<Response<Empty>, Status> {
428        tracing::warn!("Internal end round rpc called");
429        #[cfg(feature = "automation")]
430        {
431            use eyre::Context;
432
433            let mut dbtx = self.operator.db.begin_transaction().await?;
434
435            self.operator.end_round(&mut dbtx).await?;
436
437            dbtx.commit()
438                .await
439                .wrap_err("Failed to commit transaction")
440                .map_to_status()?;
441            Ok(Response::new(Empty {}))
442        }
443
444        #[cfg(not(feature = "automation"))]
445        Err(Status::unimplemented(
446            "Automation is not enabled. Operator does not manage its rounds",
447        ))
448    }
449
450    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
451    async fn get_x_only_public_key(
452        &self,
453        _request: Request<Empty>,
454    ) -> Result<Response<XOnlyPublicKeyRpc>, Status> {
455        tracing::info!("Get xonly public key rpc called");
456        let xonly_pk = self.operator.signer.xonly_public_key.serialize();
457        Ok(Response::new(XOnlyPublicKeyRpc {
458            xonly_public_key: xonly_pk.to_vec(),
459        }))
460    }
461
462    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
463    async fn get_current_status(
464        &self,
465        _request: Request<Empty>,
466    ) -> Result<Response<clementine::EntityStatus>, Status> {
467        tracing::debug!("Get current status rpc called");
468        let status = self.get_current_status().await?;
469        tracing::debug!("Get current status rpc completed successfully");
470        Ok(Response::new(status))
471    }
472
473    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
474    async fn get_reimbursement_txs(
475        &self,
476        request: Request<clementine::Outpoint>,
477    ) -> Result<Response<SignedTxsWithType>, Status> {
478        let deposit_outpoint: OutPoint = request.into_inner().try_into()?;
479        tracing::warn!(
480            "Get reimbursement txs rpc called with deposit outpoint: {:?}",
481            deposit_outpoint
482        );
483        let txs = self
484            .operator
485            .get_reimbursement_txs(deposit_outpoint)
486            .await?;
487        Ok(Response::new(txs.into()))
488    }
489
490    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
491    async fn transfer_to_btc_wallet(
492        &self,
493        request: Request<clementine::Outpoints>,
494    ) -> Result<Response<RawSignedTx>, Status> {
495        let outpoints: Vec<OutPoint> = request
496            .into_inner()
497            .outpoints
498            .into_iter()
499            .map(TryInto::try_into)
500            .collect::<Result<Vec<_>, _>>()
501            .map_err(|e| Status::invalid_argument(format!("Invalid outpoint: {e}")))?;
502
503        if outpoints.is_empty() {
504            return Err(Status::invalid_argument("No outpoints provided"));
505        }
506
507        let mut inputs = Vec::with_capacity(outpoints.len());
508
509        for outpoint in outpoints {
510            tracing::info!(
511                "TransferToBtcWallet rpc called with outpoint: {:?}",
512                outpoint
513            );
514
515            let txout = self
516                .operator
517                .rpc
518                .get_txout_from_outpoint(&outpoint)
519                .await
520                .map_err(|e| Status::internal(format!("Failed to get txout: {e}")))?;
521
522            if txout.script_pubkey != self.operator.signer.address.script_pubkey() {
523                return Err(Status::invalid_argument(format!(
524                    "Outpoint script_pubkey does not match operator's address. Expected: {:?}, Got: {:?}",
525                    self.operator.signer.address.script_pubkey(),
526                    txout.script_pubkey
527                )));
528            }
529
530            inputs.push((outpoint, txout));
531        }
532
533        let signed_tx = self
534            .operator
535            .transfer_outpoints_to_wallet(inputs)
536            .await
537            .map_err(|e| Status::internal(format!("Failed to send outpoints to wallet: {e}")))?;
538
539        tracing::info!("Successfully created transaction sending outpoints to wallet");
540        Ok(Response::new(RawSignedTx::from(&signed_tx)))
541    }
542}