clementine_core/rpc/
operator.rs

1use super::clementine::clementine_operator_server::ClementineOperator;
2use super::clementine::{
3    self, ChallengeAckDigest, DepositParams, DepositSignSession, Empty, FinalizedPayoutParams,
4    OperatorKeys, OperatorParams, SchnorrSig, SignedTxWithType, SignedTxsWithType,
5    TransactionRequest, VergenResponse, WithdrawParams, XOnlyPublicKeyRpc,
6};
7use super::error::*;
8use crate::bitvm_client::ClementineBitVMPublicKeys;
9use crate::builder::transaction::sign::{create_and_sign_txs, TransactionRequestData};
10use crate::builder::transaction::ContractContext;
11use crate::citrea::CitreaClientT;
12use crate::compatibility::ActorWithConfig;
13use crate::constants::{DEFAULT_CHANNEL_SIZE, RESTART_BACKGROUND_TASKS_TIMEOUT};
14use crate::deposit::DepositData;
15use crate::errors::BridgeError;
16use crate::errors::ResultExt;
17use crate::operator::OperatorServer;
18use crate::rpc::clementine::{CompatibilityParamsRpc, RawSignedTx, WithdrawParamsWithSig};
19use crate::rpc::ecdsa_verification_sig::{
20    recover_address_from_ecdsa_signature, OperatorWithdrawalMessage,
21};
22use crate::rpc::parser;
23use crate::utils::{get_vergen_response, monitor_standalone_task, timed_request};
24use alloy::primitives::PrimitiveSignature;
25use bitcoin::hashes::Hash;
26use bitcoin::{BlockHash, OutPoint};
27use bitvm::chunk::api::{NUM_HASH, NUM_PUBS, NUM_U256};
28use eyre::Context;
29use futures::TryFutureExt;
30use std::str::FromStr;
31use tokio::sync::mpsc;
32use tokio_stream::wrappers::ReceiverStream;
33use tonic::{async_trait, Request, Response, Status};
34
35#[async_trait]
36impl<C> ClementineOperator for OperatorServer<C>
37where
38    C: CitreaClientT,
39{
40    type DepositSignStream = ReceiverStream<Result<SchnorrSig, Status>>;
41    type GetParamsStream = ReceiverStream<Result<OperatorParams, Status>>;
42
43    async fn get_compatibility_params(
44        &self,
45        _request: Request<Empty>,
46    ) -> Result<Response<CompatibilityParamsRpc>, Status> {
47        let params = self.operator.get_compatibility_params()?;
48        Ok(Response::new(params.try_into().map_to_status()?))
49    }
50
51    async fn vergen(&self, _request: Request<Empty>) -> Result<Response<VergenResponse>, Status> {
52        tracing::info!("Vergen rpc called");
53        Ok(Response::new(get_vergen_response()))
54    }
55
56    async fn restart_background_tasks(
57        &self,
58        _request: tonic::Request<super::Empty>,
59    ) -> std::result::Result<tonic::Response<super::Empty>, tonic::Status> {
60        tracing::info!("Restarting background tasks rpc called");
61        timed_request(
62            RESTART_BACKGROUND_TASKS_TIMEOUT,
63            "Restarting background tasks",
64            self.start_background_tasks(),
65        )
66        .await?;
67        tracing::info!("Restarting background tasks rpc completed");
68        Ok(Response::new(Empty {}))
69    }
70
71    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
72    async fn get_params(
73        &self,
74        _request: Request<Empty>,
75    ) -> Result<Response<Self::GetParamsStream>, Status> {
76        tracing::info!("Get params rpc called");
77        let operator = self.operator.clone();
78        let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
79        let out_stream: Self::GetParamsStream = ReceiverStream::new(rx);
80        let monitor_err_sender = tx.clone();
81
82        let (mut wpk_receiver, mut signature_receiver) = operator.get_params().await?;
83
84        let handle = tokio::spawn(async move {
85            let operator_config: OperatorParams = operator.clone().into();
86            tx.send(Ok(operator_config))
87                .await
88                .map_err(output_stream_ended_prematurely)?;
89
90            while let Some(winternitz_public_key) = wpk_receiver.recv().await {
91                let operator_winternitz_pubkey: OperatorParams = winternitz_public_key.into();
92                tx.send(Ok(operator_winternitz_pubkey))
93                    .await
94                    .map_err(output_stream_ended_prematurely)?;
95            }
96
97            while let Some(operator_sig) = signature_receiver.recv().await {
98                let unspent_kickoff_sig: OperatorParams = operator_sig.into();
99                tx.send(Ok(unspent_kickoff_sig))
100                    .await
101                    .map_err(output_stream_ended_prematurely)?;
102            }
103
104            Ok::<(), Status>(())
105        });
106        monitor_standalone_task(handle, "Operator get_params", monitor_err_sender);
107
108        Ok(Response::new(out_stream))
109    }
110
111    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
112    async fn deposit_sign(
113        &self,
114        request: Request<DepositSignSession>,
115    ) -> Result<Response<Self::DepositSignStream>, Status> {
116        tracing::info!("Deposit sign rpc called");
117        let (tx, rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE);
118
119        let deposit_sign_session = request.into_inner();
120        let deposit_params: DepositParams = deposit_sign_session.try_into()?;
121        let deposit_data: DepositData = deposit_params.try_into()?;
122        tracing::info!(
123            "Parsed deposit sign rpc params, deposit data: {:?}",
124            deposit_data
125        );
126
127        let expected_sigs = self
128            .operator
129            .config
130            .get_num_required_operator_sigs(&deposit_data);
131
132        let mut deposit_signatures_rx = self.operator.deposit_sign(deposit_data).await?;
133        let monitor_err_sender = tx.clone();
134
135        let handle = tokio::spawn(async move {
136            let mut sent_sigs = 0;
137            while let Some(sig) = deposit_signatures_rx.recv().await {
138                let sig = sig?;
139                let operator_burn_sig = SchnorrSig {
140                    schnorr_sig: sig.serialize().to_vec(),
141                };
142
143                tx.send(Ok(operator_burn_sig))
144                    .inspect_ok(|_| {
145                        sent_sigs += 1;
146                        tracing::debug!(
147                            "Sent signature {}/{} in deposit_sign()",
148                            sent_sigs,
149                            expected_sigs
150                        );
151                    })
152                    .await
153                    .wrap_err("Failed to send signature in operator rpc deposit sign")
154                    .map_to_status()?;
155            }
156            Ok::<(), Status>(())
157        });
158
159        monitor_standalone_task(handle, "Operator deposit sign", monitor_err_sender);
160
161        Ok(Response::new(ReceiverStream::new(rx)))
162    }
163
164    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
165    async fn internal_withdraw(
166        &self,
167        request: Request<WithdrawParams>,
168    ) -> Result<Response<RawSignedTx>, Status> {
169        let (withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
170            parser::operator::parse_withdrawal_sig_params(request.into_inner())?;
171
172        tracing::warn!("Called internal_withdraw with withdrawal id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}", withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount);
173
174        let payout_tx = self
175            .operator
176            .withdraw(
177                withdrawal_id,
178                input_signature,
179                input_outpoint,
180                output_script_pubkey,
181                output_amount,
182            )
183            .await?;
184
185        Ok(Response::new(RawSignedTx::from(&payout_tx)))
186    }
187
188    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
189    async fn withdraw(
190        &self,
191        request: Request<WithdrawParamsWithSig>,
192    ) -> Result<Response<RawSignedTx>, Status> {
193        tracing::info!("Withdraw rpc called");
194        let params = request.into_inner();
195        let withdraw_params = params.withdrawal.ok_or(Status::invalid_argument(
196            "Withdrawal params not found for withdrawal",
197        ))?;
198        let (withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
199            parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
200
201        tracing::warn!(
202            "Parsed withdraw rpc params, withdrawal id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}, verification signature: {:?}", withdrawal_id, input_signature, input_outpoint, output_script_pubkey, output_amount, params.verification_signature
203        );
204
205        // if verification address is set in config, check if verification signature is valid
206        if let Some(address_in_config) = self.operator.config.aggregator_verification_address {
207            let verification_signature = params
208                .verification_signature
209                .map(|sig| {
210                    PrimitiveSignature::from_str(&sig).map_err(|e| {
211                        Status::invalid_argument(format!("Invalid verification signature: {e}"))
212                    })
213                })
214                .transpose()?;
215            // check if verification signature is provided by aggregator
216            if let Some(verification_signature) = verification_signature {
217                let address_from_sig =
218                    recover_address_from_ecdsa_signature::<OperatorWithdrawalMessage>(
219                        withdrawal_id,
220                        input_signature,
221                        input_outpoint,
222                        output_script_pubkey.clone(),
223                        output_amount,
224                        verification_signature,
225                    )?;
226
227                // check if verification signature is signed by the address in config
228                if address_from_sig != address_in_config {
229                    return Err(BridgeError::InvalidECDSAVerificationSignature).map_to_status();
230                }
231            } else {
232                // if verification signature is not provided, but verification address is set in config, return error
233                return Err(BridgeError::ECDSAVerificationSignatureMissing).map_to_status();
234            }
235        }
236
237        let payout_tx = self
238            .operator
239            .withdraw(
240                withdrawal_id,
241                input_signature,
242                input_outpoint,
243                output_script_pubkey,
244                output_amount,
245            )
246            .await?;
247
248        tracing::info!(
249            "Withdraw rpc completed successfully for withdrawal id: {:?}",
250            withdrawal_id
251        );
252
253        Ok(Response::new(RawSignedTx::from(&payout_tx)))
254    }
255
256    #[tracing::instrument(skip(self, request), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
257    async fn internal_create_assert_commitment_txs(
258        &self,
259        request: Request<TransactionRequest>,
260    ) -> std::result::Result<tonic::Response<super::SignedTxsWithType>, tonic::Status> {
261        let tx_req = request.into_inner();
262        let tx_req_data: TransactionRequestData = tx_req.try_into()?;
263        tracing::warn!(
264            "Called internal_create_assert_commitment_txs with transaction request data: {:?}",
265            tx_req_data
266        );
267        let raw_txs = self
268            .operator
269            .create_assert_commitment_txs(
270                tx_req_data,
271                ClementineBitVMPublicKeys::get_assert_commit_data(
272                    (
273                        [[0u8; 32]; NUM_PUBS],
274                        [[0u8; 32]; NUM_U256],
275                        [[0u8; 16]; NUM_HASH],
276                    ),
277                    &[0u8; 20],
278                ),
279                None,
280            )
281            .await?;
282
283        Ok(Response::new(SignedTxsWithType {
284            signed_txs: raw_txs
285                .into_iter()
286                .map(|(tx_type, signed_tx)| SignedTxWithType {
287                    transaction_type: Some(tx_type.into()),
288                    raw_tx: bitcoin::consensus::serialize(&signed_tx),
289                })
290                .collect(),
291        }))
292    }
293
294    #[tracing::instrument(skip(self, request), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
295    async fn get_deposit_keys(
296        &self,
297        request: Request<DepositParams>,
298    ) -> Result<Response<OperatorKeys>, Status> {
299        let start = std::time::Instant::now();
300        let deposit_params = request.into_inner();
301        let deposit_data: DepositData = deposit_params.try_into()?;
302        tracing::warn!(
303            "Called get_deposit_keys with deposit data: {:?}",
304            deposit_data
305        );
306        let winternitz_keys = self
307            .operator
308            .generate_assert_winternitz_pubkeys(deposit_data.get_deposit_outpoint())?;
309        let hashes = self
310            .operator
311            .generate_challenge_ack_preimages_and_hashes(&deposit_data)?;
312        tracing::info!("Generated deposit keys in {:?}", start.elapsed());
313
314        Ok(Response::new(OperatorKeys {
315            winternitz_pubkeys: winternitz_keys
316                .into_iter()
317                .map(|pubkey| pubkey.into())
318                .collect(),
319            challenge_ack_digests: hashes
320                .into_iter()
321                .map(|hash| ChallengeAckDigest { hash: hash.into() })
322                .collect(),
323        }))
324    }
325
326    async fn internal_create_signed_txs(
327        &self,
328        request: tonic::Request<super::TransactionRequest>,
329    ) -> std::result::Result<tonic::Response<super::SignedTxsWithType>, tonic::Status> {
330        let transaction_request = request.into_inner();
331        let transaction_data: TransactionRequestData = transaction_request.try_into()?;
332        tracing::warn!(
333            "Called internal_create_signed_txs with transaction request data: {:?}",
334            transaction_data
335        );
336        let (_, deposit_data) = self
337            .operator
338            .db
339            .get_deposit_data(None, transaction_data.deposit_outpoint)
340            .await?
341            .ok_or(Status::invalid_argument("Deposit not found in database"))?;
342        let context = ContractContext::new_context_for_kickoff(
343            transaction_data.kickoff_data,
344            deposit_data,
345            self.operator.config.protocol_paramset(),
346        );
347        let raw_txs = create_and_sign_txs(
348            self.operator.db.clone(),
349            &self.operator.signer,
350            self.operator.config.clone(),
351            context,
352            Some([0u8; 20]), // dummy blockhash
353            None,
354        )
355        .await?;
356
357        Ok(Response::new(SignedTxsWithType {
358            signed_txs: raw_txs
359                .into_iter()
360                .map(|(tx_type, signed_tx)| SignedTxWithType {
361                    transaction_type: Some(tx_type.into()),
362                    raw_tx: bitcoin::consensus::serialize(&signed_tx),
363                })
364                .collect(),
365        }))
366    }
367
368    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
369    async fn internal_finalized_payout(
370        &self,
371        request: Request<FinalizedPayoutParams>,
372    ) -> Result<Response<clementine::Txid>, Status> {
373        if !cfg!(test) {
374            return Err(Status::permission_denied(
375                "This method is only available in tests",
376            ));
377        }
378
379        tracing::info!(
380            "Internal finalized payout rpc called with finalized payout params: {:?}",
381            request.get_ref()
382        );
383
384        let payout_blockhash: [u8; 32] = request
385            .get_ref()
386            .payout_blockhash
387            .clone()
388            .try_into()
389            .map_err(|e| {
390                Status::invalid_argument(format!(
391                    "Failed to convert payout blockhash to [u8; 32]: {e:?}"
392                ))
393            })?;
394        let deposit_outpoint: OutPoint = request
395            .get_ref()
396            .deposit_outpoint
397            .clone()
398            .ok_or(Status::invalid_argument("Failed to get deposit outpoint"))?
399            .try_into()?;
400
401        let mut dbtx = self.operator.db.begin_transaction().await?;
402        let kickoff_txid = self
403            .operator
404            .handle_finalized_payout(
405                &mut dbtx,
406                deposit_outpoint,
407                BlockHash::from_byte_array(payout_blockhash),
408            )
409            .await?;
410        dbtx.commit()
411            .await
412            .wrap_err("Failed to commit transaction")
413            .map_to_status()?;
414
415        Ok(Response::new(kickoff_txid.into()))
416    }
417
418    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
419    async fn internal_end_round(
420        &self,
421        _request: Request<Empty>,
422    ) -> Result<Response<Empty>, Status> {
423        tracing::warn!("Internal end round rpc called");
424        #[cfg(feature = "automation")]
425        {
426            use eyre::Context;
427
428            let mut dbtx = self.operator.db.begin_transaction().await?;
429
430            self.operator.end_round(&mut dbtx).await?;
431
432            dbtx.commit()
433                .await
434                .wrap_err("Failed to commit transaction")
435                .map_to_status()?;
436            Ok(Response::new(Empty {}))
437        }
438
439        #[cfg(not(feature = "automation"))]
440        Err(Status::unimplemented(
441            "Automation is not enabled. Operator does not manage its rounds",
442        ))
443    }
444
445    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
446    async fn get_x_only_public_key(
447        &self,
448        _request: Request<Empty>,
449    ) -> Result<Response<XOnlyPublicKeyRpc>, Status> {
450        tracing::info!("Get xonly public key rpc called");
451        let xonly_pk = self.operator.signer.xonly_public_key.serialize();
452        Ok(Response::new(XOnlyPublicKeyRpc {
453            xonly_public_key: xonly_pk.to_vec(),
454        }))
455    }
456
457    async fn get_current_status(
458        &self,
459        _request: Request<Empty>,
460    ) -> Result<Response<clementine::EntityStatus>, Status> {
461        tracing::info!("Get current status rpc called");
462        let status = self.get_current_status().await?;
463        tracing::info!("Get current status rpc completed successfully");
464        Ok(Response::new(status))
465    }
466
467    async fn get_reimbursement_txs(
468        &self,
469        request: Request<clementine::Outpoint>,
470    ) -> Result<Response<SignedTxsWithType>, Status> {
471        let deposit_outpoint: OutPoint = request.into_inner().try_into()?;
472        tracing::warn!(
473            "Get reimbursement txs rpc called with deposit outpoint: {:?}",
474            deposit_outpoint
475        );
476        let txs = self
477            .operator
478            .get_reimbursement_txs(deposit_outpoint)
479            .await?;
480        Ok(Response::new(txs.into()))
481    }
482}