clementine_core/
operator.rs

1use ark_ff::PrimeField;
2use circuits_lib::common::constants::FIRST_FIVE_OUTPUTS;
3
4use crate::actor::{Actor, TweakCache, WinternitzDerivationPath};
5use crate::bitvm_client::{ClementineBitVMPublicKeys, SECP};
6use crate::builder::address::create_taproot_address;
7use crate::builder::script::SpendPath;
8use crate::builder::sighash::{create_operator_sighash_stream, PartialSignatureInfo};
9use crate::builder::transaction::deposit_signature_owner::EntityType;
10use crate::builder::transaction::input::{SpendableTxIn, UtxoVout};
11use crate::builder::transaction::output::UnspentTxOut;
12use crate::builder::transaction::sign::{create_and_sign_txs, TransactionRequestData};
13use crate::builder::transaction::{
14    create_burn_unused_kickoff_connectors_txhandler, create_round_nth_txhandler,
15    create_round_txhandlers, ContractContext, KickoffWinternitzKeys, TxHandler, TxHandlerBuilder,
16    DEFAULT_SEQUENCE,
17};
18use crate::citrea::CitreaClientT;
19use crate::config::BridgeConfig;
20use crate::database::Database;
21use crate::database::DatabaseTransaction;
22use crate::deposit::{DepositData, KickoffData, OperatorData};
23use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc;
24use clementine_errors::BridgeError;
25use clementine_primitives::TransactionType;
26
27use crate::metrics::SyncStatusProvider;
28use crate::rpc::clementine::{EntityStatus, NormalSignatureKind, StoppedTasks};
29use crate::task::entity_metric_publisher::{
30    EntityMetricPublisher, ENTITY_METRIC_PUBLISHER_INTERVAL,
31};
32use crate::task::manager::BackgroundTaskManager;
33use crate::task::payout_checker::{PayoutCheckerTask, PAYOUT_CHECKER_POLL_DELAY};
34use crate::task::TaskExt;
35use crate::utils::{monitor_standalone_task, Last20Bytes, ScriptBufExt};
36use crate::utils::{NamedEntity, TxMetadata};
37use crate::{builder, constants};
38use bitcoin::hashes::Hash;
39use bitcoin::secp256k1::schnorr::Signature;
40use bitcoin::secp256k1::{schnorr, Message};
41use bitcoin::{taproot, Address, Amount, BlockHash, OutPoint, ScriptBuf, Transaction, TxOut, Txid};
42use bitcoincore_rpc::json::AddressType;
43use bitcoincore_rpc::RpcApi;
44use bitvm::signatures::winternitz;
45use clementine_primitives::UTXO;
46use clementine_primitives::{PublicHash, RoundIndex};
47
48use eyre::{eyre, Context, OptionExt};
49use std::collections::HashMap;
50use tokio::sync::mpsc;
51use tokio_stream::StreamExt;
52
53#[cfg(feature = "automation")]
54use {
55    crate::{
56        builder::script::extract_winternitz_commits,
57        header_chain_prover::HeaderChainProver,
58        states::StateManager,
59        task::IntoTask,
60        tx_sender::{ActivatedWithOutpoint, ActivatedWithTxid, TxSenderClient},
61        utils::FeePayingType,
62    },
63    bitcoin::Witness,
64    bitvm::chunk::api::generate_assertions,
65    bridge_circuit_host::{
66        bridge_circuit_host::{
67            create_spv, prove_bridge_circuit, MAINNET_BRIDGE_CIRCUIT_ELF,
68            REGTEST_BRIDGE_CIRCUIT_ELF, REGTEST_BRIDGE_CIRCUIT_ELF_TEST, SIGNET_BRIDGE_CIRCUIT_ELF,
69            SIGNET_BRIDGE_CIRCUIT_ELF_TEST, TESTNET4_BRIDGE_CIRCUIT_ELF,
70            TESTNET4_BRIDGE_CIRCUIT_ELF_TEST,
71        },
72        structs::{BridgeCircuitHostParams, WatchtowerContext},
73    },
74    circuits_lib::bridge_circuit::structs::LightClientProof,
75};
76
77#[cfg(feature = "automation")]
78use crate::tx_sender_queue::TxSenderClientQueueExt;
79
80pub struct OperatorServer<C: CitreaClientT> {
81    pub operator: Operator<C>,
82    background_tasks: BackgroundTaskManager,
83}
84
85#[derive(Debug, Clone)]
86pub struct Operator<C: CitreaClientT> {
87    pub rpc: ExtendedBitcoinRpc,
88    pub db: Database,
89    pub signer: Actor,
90    pub config: BridgeConfig,
91    pub collateral_funding_outpoint: OutPoint,
92    pub(crate) reimburse_addr: Address,
93    #[cfg(feature = "automation")]
94    pub tx_sender: TxSenderClient,
95    #[cfg(feature = "automation")]
96    pub header_chain_prover: HeaderChainProver,
97    pub citrea_client: C,
98}
99
100impl<C> OperatorServer<C>
101where
102    C: CitreaClientT,
103{
104    pub async fn new(config: BridgeConfig) -> Result<Self, BridgeError> {
105        let operator = Operator::new(config.clone()).await?;
106        let background_tasks = BackgroundTaskManager::default();
107
108        Ok(Self {
109            operator,
110            background_tasks,
111        })
112    }
113
114    /// Starts the background tasks for the operator.
115    /// If called multiple times, it will restart only the tasks that are not already running.
116    pub async fn start_background_tasks(&self) -> Result<(), BridgeError> {
117        // initialize and run state manager
118        #[cfg(feature = "automation")]
119        {
120            let state_manager = StateManager::new(
121                self.operator.db.clone(),
122                self.operator.clone(),
123                self.operator.rpc.clone(),
124                self.operator.config.clone(),
125            )
126            .await?;
127
128            let should_run_state_mgr = {
129                #[cfg(test)]
130                {
131                    self.operator.config.test_params.should_run_state_manager
132                }
133                #[cfg(not(test))]
134                {
135                    true
136                }
137            };
138
139            if should_run_state_mgr {
140                self.background_tasks
141                    .ensure_task_looping(state_manager.block_fetcher_task().await?)
142                    .await;
143                self.background_tasks
144                    .ensure_task_looping(state_manager.into_task())
145                    .await;
146            }
147        }
148
149        // run payout checker task
150        self.background_tasks
151            .ensure_task_looping(
152                PayoutCheckerTask::new(self.operator.db.clone(), self.operator.clone())
153                    .with_delay(PAYOUT_CHECKER_POLL_DELAY),
154            )
155            .await;
156
157        self.background_tasks
158            .ensure_task_looping(
159                EntityMetricPublisher::<Operator<C>, C>::new(
160                    self.operator.db.clone(),
161                    self.operator.rpc.clone(),
162                    self.operator.config.clone(),
163                    self.operator.citrea_client.clone(),
164                )
165                .with_delay(ENTITY_METRIC_PUBLISHER_INTERVAL),
166            )
167            .await;
168
169        tracing::info!("Payout checker task started");
170
171        // track the operator's round state
172        #[cfg(feature = "automation")]
173        {
174            // Will not start a new state machine if one for the operator already exists.
175            self.operator.track_rounds().await?;
176            tracing::info!("Operator round state tracked");
177        }
178
179        Ok(())
180    }
181
182    pub async fn get_current_status(&self) -> Result<EntityStatus, BridgeError> {
183        let stopped_tasks = match self.background_tasks.get_stopped_tasks().await {
184            Ok(stopped_tasks) => stopped_tasks,
185            Err(e) => {
186                tracing::error!("Failed to get stopped tasks: {:?}", e);
187                StoppedTasks {
188                    stopped_tasks: vec![format!("Stopped tasks fetch failed {:?}", e)],
189                }
190            }
191        };
192
193        // Determine if automation is enabled
194        let automation_enabled = cfg!(feature = "automation");
195
196        let sync_status = Operator::<C>::get_sync_status(
197            &self.operator.db,
198            &self.operator.rpc,
199            &self.operator.config,
200            &self.operator.citrea_client,
201        )
202        .await?;
203
204        Ok(EntityStatus {
205            automation: automation_enabled,
206            wallet_balance: sync_status
207                .wallet_balance
208                .map(|balance| format!("{} BTC", balance.to_btc())),
209            tx_sender_synced_height: sync_status.tx_sender_synced_height,
210            finalized_synced_height: sync_status.finalized_synced_height,
211            hcp_last_proven_height: sync_status.hcp_last_proven_height,
212            rpc_tip_height: sync_status.rpc_tip_height,
213            bitcoin_syncer_synced_height: sync_status.btc_syncer_synced_height,
214            stopped_tasks: Some(stopped_tasks),
215            state_manager_next_height: sync_status.state_manager_next_height,
216            btc_fee_rate_sat_vb: sync_status.bitcoin_fee_rate_sat_vb,
217            lcp_synced_height: sync_status.lcp_synced_height,
218            citrea_l2_block_height: sync_status.citrea_l2_block_height,
219        })
220    }
221}
222
223impl<C> Operator<C>
224where
225    C: CitreaClientT,
226{
227    /// Creates a new `Operator`.
228    pub async fn new(config: BridgeConfig) -> Result<Self, BridgeError> {
229        let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
230
231        let db = Database::new(&config).await?;
232        let rpc = ExtendedBitcoinRpc::connect(
233            config.bitcoin_rpc_url.clone(),
234            config.bitcoin_rpc_user.clone(),
235            config.bitcoin_rpc_password.clone(),
236            None,
237        )
238        .await?;
239
240        #[cfg(feature = "automation")]
241        let tx_sender =
242            TxSenderClient::new(clementine_tx_sender::TxSenderDb::from_pool(db.get_pool()));
243
244        if config.operator_withdrawal_fee_sats.is_none() {
245            return Err(eyre::eyre!("Operator withdrawal fee is not set").into());
246        }
247
248        // check if we store our collateral outpoint already in db
249        let mut dbtx = db.begin_transaction().await?;
250        let op_data = db
251            .get_operator(Some(&mut dbtx), signer.xonly_public_key)
252            .await?;
253        let (collateral_funding_outpoint, reimburse_addr) = match op_data {
254            Some(operator_data) => {
255                // Operator data is already set in db, we don't actually need to do anything.
256                // set_operator_checked will give error if the values set in config and db doesn't match.
257                (
258                    operator_data.collateral_funding_outpoint,
259                    operator_data.reimburse_addr,
260                )
261            }
262            None => {
263                if config.protocol_paramset().network == bitcoin::Network::Bitcoin
264                    && (config.operator_reimbursement_address.is_none()
265                        || config.operator_collateral_funding_outpoint.is_none())
266                {
267                    let wallet_address = rpc.get_new_wallet_address().await?;
268                    return Err(eyre::eyre!(
269                        "Operator collateral funding outpoint or reimbursement address is not set in the configuration. \
270                         To initialize the operator, please send {} BTC to the operator's address {}. \
271                         After funding, set OPERATOR_COLLATERAL_FUNDING_OUTPOINT in your configuration to the outpoint of the funded transaction, \
272                         and set OPERATOR_REIMBURSEMENT_ADDRESS to {}. \
273                         Use your operator's Bitcoin wallet address {} to fund the operator to pay withdrawals.",
274                        config.protocol_paramset().collateral_funding_amount.to_btc(),
275                        signer.address,
276                        signer.address,
277                        wallet_address
278                    ).into());
279                }
280                // Operator data is not set in db, then we check if any collateral outpoint and reimbursement address is set in config.
281                // If so we create a new operator using those data, otherwise we generate new collateral outpoint and reimbursement address.
282                let reimburse_addr = match &config.operator_reimbursement_address {
283                    Some(reimburse_addr) => {
284                        reimburse_addr
285                            .to_owned()
286                            .require_network(config.protocol_paramset().network)
287                            .wrap_err(format!("Invalid operator reimbursement address provided in config: {:?} for network: {:?}", reimburse_addr, config.protocol_paramset().network))?
288                    }
289                    None => {
290                        rpc
291                        .get_new_address(Some("OperatorReimbursement"), Some(AddressType::Bech32m))
292                        .await
293                        .wrap_err("Failed to get new address")?
294                        .require_network(config.protocol_paramset().network)
295                        .wrap_err(format!("Invalid operator reimbursement address generated for the network in config: {:?}
296                                Possibly the provided rpc's network and network given in config doesn't match", config.protocol_paramset().network))?
297                    }
298                };
299                let outpoint = match &config.operator_collateral_funding_outpoint {
300                    Some(outpoint) => {
301                        // check if outpoint exists on chain and has exactly collateral funding amount
302                        let collateral_tx = rpc
303                            .get_tx_of_txid(&outpoint.txid)
304                            .await
305                            .wrap_err("Failed to get collateral funding tx")?;
306                        let collateral_txout = collateral_tx
307                            .output
308                            .get(outpoint.vout as usize)
309                            .ok_or_eyre("Invalid vout index for collateral funding tx")?;
310                        if collateral_txout.script_pubkey != signer.address.script_pubkey() {
311                            return Err(eyre::eyre!("Operator collateral funding outpoint given in config has a different script pubkey than the pubkey matching to the operator's secret key. Script pubkey should correspond to taproot address with no scripts and internal key equal to the operator's xonly public key. Script pubkey in given outpoint: {:?}, Script pubkey should be: {:?}", collateral_txout.script_pubkey, signer.address.script_pubkey()).into());
312                        }
313                        if collateral_txout.value
314                            != config.protocol_paramset().collateral_funding_amount
315                        {
316                            return Err(eyre::eyre!("Operator collateral funding outpoint given in config has a different amount than the one specified in config..
317                                Bridge collateral funding amount: {:?}, Amount in given outpoint: {:?}", config.protocol_paramset().collateral_funding_amount, collateral_txout.value).into());
318                        }
319                        *outpoint
320                    }
321                    None => {
322                        // create a new outpoint that has collateral funding amount
323                        rpc.send_to_address(
324                            &signer.address,
325                            config.protocol_paramset().collateral_funding_amount,
326                        )
327                        .await?
328                    }
329                };
330                (outpoint, reimburse_addr)
331            }
332        };
333
334        db.insert_operator_if_not_exists(
335            Some(&mut dbtx),
336            signer.xonly_public_key,
337            &reimburse_addr,
338            collateral_funding_outpoint,
339        )
340        .await?;
341        dbtx.commit().await?;
342        let citrea_client = C::new(
343            config.citrea_rpc_url.clone(),
344            config.citrea_light_client_prover_url.clone(),
345            config.citrea_chain_id,
346            None,
347            config.citrea_request_timeout,
348        )
349        .await?;
350
351        tracing::info!(
352            "Operator xonly pk: {:?}, db created with name: {:?}",
353            signer.xonly_public_key,
354            config.db_name
355        );
356
357        #[cfg(feature = "automation")]
358        let header_chain_prover = HeaderChainProver::new(&config, rpc.clone()).await?;
359
360        Ok(Operator {
361            rpc,
362            db: db.clone(),
363            signer,
364            config,
365            collateral_funding_outpoint,
366            #[cfg(feature = "automation")]
367            tx_sender,
368            citrea_client,
369            #[cfg(feature = "automation")]
370            header_chain_prover,
371            reimburse_addr,
372        })
373    }
374
375    /// Returns an operator's winternitz public keys and challenge ackpreimages
376    /// & hashes.
377    ///
378    /// # Returns
379    ///
380    /// - [`mpsc::Receiver`]: A [`tokio`] data channel with a type of
381    ///   [`winternitz::PublicKey`] and size of operator's winternitz public
382    ///   keys count
383    /// - [`mpsc::Receiver`]: A [`tokio`] data channel with a type of
384    ///   [`PublicHash`] and size of operator's challenge ack preimages & hashes
385    ///   count
386    ///
387    pub async fn get_params(
388        &self,
389    ) -> Result<
390        (
391            mpsc::Receiver<winternitz::PublicKey>,
392            mpsc::Receiver<schnorr::Signature>,
393        ),
394        BridgeError,
395    > {
396        tracing::info!("Generating operator params");
397        tracing::info!("Generating kickoff winternitz pubkeys");
398        let wpks = self.generate_kickoff_winternitz_pubkeys()?;
399        tracing::info!("Kickoff winternitz pubkeys generated");
400        let (wpk_tx, wpk_rx) = mpsc::channel(wpks.len());
401        let kickoff_wpks = KickoffWinternitzKeys::new(
402            wpks,
403            self.config.protocol_paramset().num_kickoffs_per_round,
404            self.config.protocol_paramset().num_round_txs,
405        )?;
406        tracing::info!("Starting to generate unspent kickoff signatures");
407        let kickoff_sigs = self.generate_unspent_kickoff_sigs(&kickoff_wpks)?;
408        tracing::info!("Unspent kickoff signatures generated");
409        let wpks = kickoff_wpks.get_all_keys();
410        let (sig_tx, sig_rx) = mpsc::channel(kickoff_sigs.len());
411
412        tokio::spawn(async move {
413            for wpk in wpks {
414                wpk_tx
415                    .send(wpk)
416                    .await
417                    .wrap_err("Failed to send winternitz public key")?;
418            }
419
420            for sig in kickoff_sigs {
421                sig_tx
422                    .send(sig)
423                    .await
424                    .wrap_err("Failed to send kickoff signature")?;
425            }
426
427            Ok::<(), BridgeError>(())
428        });
429
430        Ok((wpk_rx, sig_rx))
431    }
432
433    pub async fn deposit_sign(
434        &self,
435        mut deposit_data: DepositData,
436    ) -> Result<mpsc::Receiver<Result<schnorr::Signature, BridgeError>>, BridgeError> {
437        self.citrea_client
438            .check_nofn_correctness(deposit_data.get_nofn_xonly_pk()?)
439            .await?;
440
441        let mut tweak_cache = TweakCache::default();
442        let (sig_tx, sig_rx) = mpsc::channel(constants::DEFAULT_CHANNEL_SIZE);
443        let monitor_err_sender = sig_tx.clone();
444
445        let deposit_blockhash = self
446            .rpc
447            .get_blockhash_of_tx(&deposit_data.get_deposit_outpoint().txid)
448            .await?;
449
450        let mut sighash_stream = Box::pin(create_operator_sighash_stream(
451            self.db.clone(),
452            self.signer.xonly_public_key,
453            self.config.clone(),
454            deposit_data,
455            deposit_blockhash,
456        ));
457
458        let signer = self.signer.clone();
459        let handle = tokio::spawn(async move {
460            while let Some(sighash) = sighash_stream.next().await {
461                // None because utxos that operators need to sign do not have scripts
462                let (sighash, sig_info) = sighash?;
463                let sig = signer.sign_with_tweak_data(
464                    sighash,
465                    sig_info.tweak_data,
466                    Some(&mut tweak_cache),
467                )?;
468
469                sig_tx
470                    .send(Ok(sig))
471                    .await
472                    .wrap_err("Failed to send signature in operator deposit sign")?;
473            }
474
475            Ok::<(), BridgeError>(())
476        });
477        monitor_standalone_task(handle, "Operator deposit sign", monitor_err_sender);
478
479        Ok(sig_rx)
480    }
481
482    /// Creates the round state machine by adding a system event to the database
483    #[cfg(feature = "automation")]
484    pub async fn track_rounds(&self) -> Result<(), BridgeError> {
485        let mut dbtx = self.db.begin_transaction().await?;
486        // set operators own kickoff winternitz public keys before creating the round state machine
487        // as round machine needs kickoff keys to create the first round tx
488        self.db
489            .insert_operator_kickoff_winternitz_public_keys_if_not_exist(
490                Some(&mut dbtx),
491                self.signer.xonly_public_key,
492                self.generate_kickoff_winternitz_pubkeys()?,
493            )
494            .await?;
495
496        StateManager::<Operator<C>>::dispatch_new_round_machine(&self.db, &mut dbtx, self.data())
497            .await?;
498        dbtx.commit().await?;
499        Ok(())
500    }
501
502    /// Checks if the withdrawal amount is within the acceptable range.
503    fn is_profitable(
504        input_amount: Amount,
505        withdrawal_amount: Amount,
506        bridge_amount_sats: Amount,
507        operator_withdrawal_fee_sats: Amount,
508    ) -> bool {
509        // Use checked_sub to safely handle potential underflow
510        let withdrawal_diff = match withdrawal_amount
511            .to_sat()
512            .checked_sub(input_amount.to_sat())
513        {
514            Some(diff) => Amount::from_sat(diff),
515            None => {
516                // input amount is greater than withdrawal amount, so it's profitable but doesn't make sense
517                tracing::warn!(
518                    "Some user gave more amount than the withdrawal amount as input for withdrawal"
519                );
520                return true;
521            }
522        };
523
524        if withdrawal_diff > bridge_amount_sats {
525            return false;
526        }
527
528        // Calculate net profit after the withdrawal using checked_sub to prevent panic
529        let net_profit = match bridge_amount_sats.checked_sub(withdrawal_diff) {
530            Some(profit) => profit,
531            None => return false, // If underflow occurs, it's not profitable
532        };
533
534        // Net profit must be bigger than withdrawal fee.
535        // net profit doesn't take into account the fees, but operator_withdrawal_fee_sats should
536        net_profit >= operator_withdrawal_fee_sats
537    }
538
539    /// Prepares a withdrawal by:
540    ///
541    /// 1. Checking if the withdrawal has been made on Citrea
542    /// 2. Verifying the given signature
543    /// 3. Checking if the withdrawal is profitable or not
544    /// 4. Funding the withdrawal transaction using TxSender RBF option
545    ///
546    /// # Parameters
547    ///
548    /// - `withdrawal_idx`: Citrea withdrawal UTXO index
549    /// - `in_signature`: User's signature that is going to be used for signing
550    ///   withdrawal transaction input
551    /// - `in_outpoint`: User's input for the payout transaction
552    /// - `out_script_pubkey`: User's script pubkey which will be used
553    ///   in the payout transaction's output
554    /// - `out_amount`: Payout transaction output's value
555    ///
556    /// # Returns
557    ///
558    /// - Ok(()) if the withdrawal checks are successful and a payout transaction is added to the TxSender
559    /// - Err(BridgeError) if the withdrawal checks fail
560    pub async fn withdraw(
561        &self,
562        withdrawal_index: u32,
563        in_signature: taproot::Signature,
564        in_outpoint: OutPoint,
565        out_script_pubkey: ScriptBuf,
566        out_amount: Amount,
567    ) -> Result<Transaction, BridgeError> {
568        tracing::info!(
569            "Withdrawing with index: {}, in_signature: {:?}, in_outpoint: {:?}, out_script_pubkey: {}, out_amount: {}",
570            withdrawal_index,
571            in_signature,
572            in_outpoint,
573            out_script_pubkey,
574            out_amount
575        );
576
577        // Prepare input and output of the payout transaction.
578        let input_prevout = self.rpc.get_txout_from_outpoint(&in_outpoint).await?;
579        let input_utxo = UTXO {
580            outpoint: in_outpoint,
581            txout: input_prevout,
582        };
583        let output_txout = TxOut {
584            value: out_amount,
585            script_pubkey: out_script_pubkey,
586        };
587
588        // Check Citrea for the withdrawal state.
589        let withdrawal_utxo = self
590            .db
591            .get_withdrawal_utxo_from_citrea_withdrawal(None, withdrawal_index)
592            .await?;
593
594        if withdrawal_utxo != input_utxo.outpoint {
595            return Err(eyre::eyre!("Input UTXO does not match withdrawal UTXO from Citrea: Input Outpoint: {0}, Withdrawal Outpoint (from Citrea): {1}", input_utxo.outpoint, withdrawal_utxo).into());
596        }
597
598        let operator_withdrawal_fee_sats =
599            self.config
600                .operator_withdrawal_fee_sats
601                .ok_or(BridgeError::ConfigError(
602                    "Operator withdrawal fee sats is not specified in configuration file"
603                        .to_string(),
604                ))?;
605        if !Self::is_profitable(
606            input_utxo.txout.value,
607            output_txout.value,
608            self.config.protocol_paramset().bridge_amount,
609            operator_withdrawal_fee_sats,
610        ) {
611            return Err(eyre::eyre!("Not enough fee for operator").into());
612        }
613
614        let user_xonly_pk = &input_utxo
615            .txout
616            .script_pubkey
617            .try_get_taproot_pk()
618            .wrap_err("Input utxo script pubkey is not a valid taproot script")?;
619
620        let payout_txhandler = builder::transaction::create_payout_txhandler(
621            input_utxo,
622            output_txout,
623            self.signer.xonly_public_key,
624            in_signature,
625            self.config.protocol_paramset().network,
626        )?;
627
628        // tracing::info!("Payout txhandler: {:?}", hex::encode(bitcoin::consensus::serialize(&payout_txhandler.get_cached_tx())));
629
630        let sighash = payout_txhandler.calculate_sighash_txin(0, in_signature.sighash_type)?;
631
632        SECP.verify_schnorr(
633            &in_signature.signature,
634            &Message::from_digest(*sighash.as_byte_array()),
635            user_xonly_pk,
636        )
637        .wrap_err("Failed to verify signature received from user for payout txin. Ensure the signature uses SinglePlusAnyoneCanPay sighash type.")?;
638
639        let fee_rate = self
640            .rpc
641            .get_fee_rate_kvb(
642                self.config.protocol_paramset.network,
643                &self.config.mempool_api_host,
644                &self.config.mempool_api_endpoint,
645                self.config.tx_sender_limits.mempool_fee_rate_multiplier,
646                self.config.tx_sender_limits.mempool_fee_rate_offset_sat_kvb,
647                self.config.tx_sender_limits.fee_rate_hard_cap,
648            )
649            .await?;
650
651        // send payout tx using RBF
652        let funded_tx = self
653            .rpc
654            .fund_raw_transaction(
655                payout_txhandler.get_cached_tx(),
656                Some(&bitcoincore_rpc::json::FundRawTransactionOptions {
657                    add_inputs: Some(true),
658                    include_unsafe: Some(false),
659                    change_address: None,
660                    change_position: Some(1),
661                    change_type: None,
662                    include_watching: None,
663                    lock_unspents: Some(false),
664                    fee_rate: Some(Amount::from_sat(fee_rate.to_sat_per_kvb())),
665                    subtract_fee_from_outputs: None,
666                    replaceable: None,
667                    conf_target: None,
668                    estimate_mode: None,
669                }),
670                None,
671            )
672            .await
673            .wrap_err("Failed to fund raw transaction")?
674            .hex;
675
676        let signed_tx = self
677            .rpc
678            .sign_raw_transaction_with_wallet(&funded_tx, None, None)
679            .await
680            .wrap_err("Failed to sign withdrawal transaction")?
681            .hex;
682
683        let signed_tx: Transaction = bitcoin::consensus::deserialize(&signed_tx)
684            .wrap_err("Failed to deserialize signed withdrawal transaction")?;
685
686        self.rpc
687            .send_raw_transaction(&signed_tx)
688            .await
689            .wrap_err("Failed to send withdrawal transaction")?;
690
691        Ok(signed_tx)
692    }
693
694    /// Generates Winternitz public keys for every  BitVM assert tx for a deposit.
695    ///
696    /// # Returns
697    ///
698    /// - [`Vec<Vec<winternitz::PublicKey>>`]: Winternitz public keys for
699    ///   `watchtower index` row and `BitVM assert tx index` column.
700    pub fn generate_assert_winternitz_pubkeys(
701        &self,
702        deposit_outpoint: bitcoin::OutPoint,
703    ) -> Result<Vec<winternitz::PublicKey>, BridgeError> {
704        tracing::debug!("Generating assert winternitz pubkeys");
705        let bitvm_pks = self
706            .signer
707            .generate_bitvm_pks_for_deposit(deposit_outpoint, self.config.protocol_paramset())?;
708
709        let flattened_wpks = bitvm_pks.to_flattened_vec();
710
711        Ok(flattened_wpks)
712    }
713    /// Generates Winternitz public keys for every blockhash commit to be used in kickoff utxos.
714    /// Unique for each kickoff utxo of operator.
715    ///
716    /// # Returns
717    ///
718    /// - [`Vec<Vec<winternitz::PublicKey>>`]: Winternitz public keys for
719    ///   `round_index` row and `kickoff_idx` column.
720    pub fn generate_kickoff_winternitz_pubkeys(
721        &self,
722    ) -> Result<Vec<winternitz::PublicKey>, BridgeError> {
723        let mut winternitz_pubkeys =
724            Vec::with_capacity(self.config.get_num_kickoff_winternitz_pks());
725
726        // we need num_round_txs + 1 because the last round includes reimburse generators of previous round
727        for round_idx in RoundIndex::iter_rounds(self.config.protocol_paramset().num_round_txs + 1)
728        {
729            for kickoff_idx in 0..self.config.protocol_paramset().num_kickoffs_per_round {
730                let path = WinternitzDerivationPath::Kickoff(
731                    round_idx,
732                    kickoff_idx as u32,
733                    self.config.protocol_paramset(),
734                );
735                winternitz_pubkeys.push(self.signer.derive_winternitz_pk(path)?);
736            }
737        }
738
739        if winternitz_pubkeys.len() != self.config.get_num_kickoff_winternitz_pks() {
740            return Err(eyre::eyre!(
741                "Expected {} number of kickoff winternitz pubkeys, but got {}",
742                self.config.get_num_kickoff_winternitz_pks(),
743                winternitz_pubkeys.len()
744            )
745            .into());
746        }
747
748        Ok(winternitz_pubkeys)
749    }
750
751    pub fn generate_unspent_kickoff_sigs(
752        &self,
753        kickoff_wpks: &KickoffWinternitzKeys,
754    ) -> Result<Vec<Signature>, BridgeError> {
755        let mut tweak_cache = TweakCache::default();
756        let mut sigs: Vec<Signature> =
757            Vec::with_capacity(self.config.get_num_unspent_kickoff_sigs());
758        let mut prev_ready_to_reimburse: Option<TxHandler> = None;
759        let operator_data = OperatorData {
760            xonly_pk: self.signer.xonly_public_key,
761            collateral_funding_outpoint: self.collateral_funding_outpoint,
762            reimburse_addr: self.reimburse_addr.clone(),
763        };
764        for round_idx in RoundIndex::iter_rounds(self.config.protocol_paramset().num_round_txs) {
765            let txhandlers = create_round_txhandlers(
766                self.config.protocol_paramset(),
767                round_idx,
768                &operator_data,
769                kickoff_wpks,
770                prev_ready_to_reimburse.as_ref(),
771            )?;
772            for txhandler in txhandlers {
773                if let TransactionType::UnspentKickoff(kickoff_idx) =
774                    txhandler.get_transaction_type()
775                {
776                    let partial = PartialSignatureInfo {
777                        operator_idx: 0, // dummy value
778                        round_idx,
779                        kickoff_utxo_idx: kickoff_idx,
780                    };
781                    let sighashes = txhandler
782                        .calculate_shared_txins_sighash(EntityType::OperatorSetup, partial)?;
783                    let signed_sigs: Result<Vec<_>, _> = sighashes
784                        .into_iter()
785                        .map(|(sighash, sig_info)| {
786                            self.signer.sign_with_tweak_data(
787                                sighash,
788                                sig_info.tweak_data,
789                                Some(&mut tweak_cache),
790                            )
791                        })
792                        .collect();
793                    sigs.extend(signed_sigs?);
794                }
795                if let TransactionType::ReadyToReimburse = txhandler.get_transaction_type() {
796                    prev_ready_to_reimburse = Some(txhandler);
797                }
798            }
799        }
800        if sigs.len() != self.config.get_num_unspent_kickoff_sigs() {
801            return Err(eyre::eyre!(
802                "Expected {} number of unspent kickoff sigs, but got {}",
803                self.config.get_num_unspent_kickoff_sigs(),
804                sigs.len()
805            )
806            .into());
807        }
808        Ok(sigs)
809    }
810
811    pub fn generate_challenge_ack_preimages_and_hashes(
812        &self,
813        deposit_data: &DepositData,
814    ) -> Result<Vec<PublicHash>, BridgeError> {
815        let mut hashes = Vec::with_capacity(self.config.get_num_challenge_ack_hashes(deposit_data));
816
817        for watchtower_idx in 0..deposit_data.get_num_watchtowers() {
818            let path = WinternitzDerivationPath::ChallengeAckHash(
819                watchtower_idx as u32,
820                deposit_data.get_deposit_outpoint(),
821                self.config.protocol_paramset(),
822            );
823            let hash = self.signer.generate_public_hash_from_path(path)?;
824            hashes.push(hash);
825        }
826
827        if hashes.len() != self.config.get_num_challenge_ack_hashes(deposit_data) {
828            return Err(eyre::eyre!(
829                "Expected {} number of challenge ack hashes, but got {}",
830                self.config.get_num_challenge_ack_hashes(deposit_data),
831                hashes.len()
832            )
833            .into());
834        }
835
836        Ok(hashes)
837    }
838
839    pub async fn handle_finalized_payout<'a>(
840        &'a self,
841        dbtx: DatabaseTransaction<'a>,
842        deposit_outpoint: OutPoint,
843        payout_tx_blockhash: BlockHash,
844    ) -> Result<bitcoin::Txid, BridgeError> {
845        let (deposit_id, deposit_data) = self
846            .db
847            .get_deposit_data(Some(dbtx), deposit_outpoint)
848            .await?
849            .ok_or(BridgeError::DatabaseError(sqlx::Error::RowNotFound))?;
850
851        // get unused kickoff connector
852        let (round_idx, kickoff_idx) = self
853            .db
854            .get_unused_and_signed_kickoff_connector(
855                Some(dbtx),
856                deposit_id,
857                self.signer.xonly_public_key,
858            )
859            .await?
860            .ok_or(BridgeError::DatabaseError(sqlx::Error::RowNotFound))?;
861
862        let current_round_index = self.db.get_current_round_index(Some(dbtx)).await?;
863        tracing::info!(
864            "Operator: Current round index: {}, round idx for kickoff: {}",
865            current_round_index,
866            round_idx
867        );
868        #[cfg(feature = "automation")]
869        if current_round_index != round_idx {
870            // we currently have no free kickoff connectors in the current round, so we need to end round first
871            // if current_round_index should only be smaller than round_idx, and should not be smaller by more than 1
872            // so sanity check:
873            if current_round_index.next_round() != round_idx {
874                return Err(eyre::eyre!(
875                    "Internal error: Expected the current round ({:?}) to be equal to or 1 less than the round of the first available kickoff for deposit reimbursement ({:?}) for deposit {:?}. If the round is less than the current round, there is an issue with the logic of the fn that gets the first available kickoff. If the round is greater, that means the next round do not have any kickoff connectors available for reimbursement, which should not be possible.",
876                    current_round_index, round_idx, deposit_outpoint
877                ).into());
878            }
879            tracing::info!(
880                "Operator: Starting next round to be able to get reimbursement for the payout"
881            );
882            // start the next round to be able to get reimbursement for the payout
883            self.end_round(dbtx).await?;
884        }
885
886        // get signed txs,
887        let kickoff_data = KickoffData {
888            operator_xonly_pk: self.signer.xonly_public_key,
889            round_idx,
890            kickoff_idx,
891        };
892
893        let payout_tx_blockhash = payout_tx_blockhash.as_byte_array().last_20_bytes();
894
895        #[cfg(test)]
896        let payout_tx_blockhash = self
897            .config
898            .test_params
899            .maybe_disrupt_payout_tx_block_hash_commit(payout_tx_blockhash);
900
901        let context = ContractContext::new_context_for_kickoff(
902            kickoff_data,
903            deposit_data,
904            self.config.protocol_paramset(),
905        );
906
907        let signed_txs = create_and_sign_txs(
908            self.db.clone(),
909            &self.signer,
910            self.config.clone(),
911            context,
912            Some(payout_tx_blockhash),
913            Some(dbtx),
914        )
915        .await?;
916
917        let tx_metadata = Some(TxMetadata {
918            tx_type: TransactionType::Dummy, // will be replaced in add_tx_to_queue
919            operator_xonly_pk: Some(self.signer.xonly_public_key),
920            round_idx: Some(round_idx),
921            kickoff_idx: Some(kickoff_idx),
922            deposit_outpoint: Some(deposit_outpoint),
923        });
924
925        // try to send them
926        for (tx_type, signed_tx) in &signed_txs {
927            match *tx_type {
928                TransactionType::Kickoff
929                | TransactionType::OperatorChallengeAck(_)
930                | TransactionType::WatchtowerChallengeTimeout(_)
931                | TransactionType::ChallengeTimeout
932                | TransactionType::DisproveTimeout
933                | TransactionType::Reimburse => {
934                    #[cfg(feature = "automation")]
935                    self.tx_sender
936                        .add_tx_to_queue(
937                            dbtx,
938                            *tx_type,
939                            signed_tx,
940                            &signed_txs,
941                            tx_metadata,
942                            self.config.protocol_paramset(),
943                            None,
944                        )
945                        .await?;
946                }
947                _ => {}
948            }
949        }
950
951        let kickoff_txid = signed_txs
952            .iter()
953            .find_map(|(tx_type, tx)| {
954                if let TransactionType::Kickoff = tx_type {
955                    Some(tx.compute_txid())
956                } else {
957                    None
958                }
959            })
960            .ok_or(eyre::eyre!(
961                "Couldn't find kickoff tx in signed_txs".to_string(),
962            ))?;
963
964        // mark the kickoff connector as used
965        self.db
966            .mark_kickoff_connector_as_used(Some(dbtx), round_idx, kickoff_idx, Some(kickoff_txid))
967            .await?;
968
969        Ok(kickoff_txid)
970    }
971
/// Signs the round tx of `RoundIndex::Round(0)`, hands it to the tx sender and stores
/// `RoundIndex::Round(0)` as the current round index.
///
/// # Parameters
///
/// - `dbtx`: Database transaction used for the tx sender insert and the round index update.
/// - `kickoff_wpks`: Kickoff Winternitz keys used to build the round tx handler.
///
/// # Errors
///
/// Fails if the tx handler cannot be created or signed, or if a database operation fails.
#[cfg(feature = "automation")]
async fn start_first_round(
    &self,
    dbtx: DatabaseTransaction<'_>,
    kickoff_wpks: KickoffWinternitzKeys,
) -> Result<(), BridgeError> {
    let first_round = RoundIndex::Round(0);

    // Only the round tx handler is needed; the second handler of the pair is discarded.
    let (mut round_txhandler, _) = create_round_nth_txhandler(
        self.signer.xonly_public_key,
        self.collateral_funding_outpoint,
        self.config.protocol_paramset().collateral_funding_amount,
        first_round,
        &kickoff_wpks,
        self.config.protocol_paramset(),
    )?;

    self.signer
        .tx_sign_and_fill_sigs(&mut round_txhandler, &[], None)
        .wrap_err("Failed to sign first round tx")?;

    let metadata = TxMetadata {
        tx_type: TransactionType::Round,
        operator_xonly_pk: None,
        round_idx: Some(first_round),
        kickoff_idx: None,
        deposit_outpoint: None,
    };

    // Queue the signed tx with CPFP fee paying; every dependency list is left empty.
    self.tx_sender
        .insert_try_to_send(
            dbtx,
            Some(metadata),
            round_txhandler.get_cached_tx(),
            FeePayingType::CPFP,
            None,
            &[],
            &[],
            &[],
            &[],
        )
        .await?;

    // Persist `RoundIndex::Round(0)` as the current round.
    self.db
        .update_current_round_index(Some(dbtx), first_round)
        .await?;

    Ok(())
}
1019
/// Ends the current round and queues the transactions that start the next one.
///
/// If the current round index is `RoundIndex::Collateral`, this only starts the first
/// round. Otherwise it:
/// 1. builds and signs the ready-to-reimburse tx of the current round and the round tx of
///    the next round,
/// 2. marks every kickoff connector of the current round without a kickoff txid as used
///    and collects its index for the burn tx,
/// 3. queues the burn-unused-kickoff-connectors tx, the ready-to-reimburse tx (with one
///    outpoint prerequisite per kickoff connector) and the next round tx (activated by
///    the ready-to-reimburse txid with `operator_reimburse_timelock`),
/// 4. stores the next round index as the current round index.
///
/// # Errors
///
/// Fails if any database operation, tx handler creation, signing or queue insert fails.
#[cfg(feature = "automation")]
pub async fn end_round<'a>(
    &'a self,
    mut dbtx: DatabaseTransaction<'a>,
) -> Result<(), BridgeError> {
    let current_round_index = self.db.get_current_round_index(Some(&mut dbtx)).await?;

    // NOTE(review): this lookup passes `None` instead of the transaction handle — confirm
    // that reading the keys outside of `dbtx` is intended.
    let kickoff_wpks = KickoffWinternitzKeys::new(
        self.db
            .get_operator_kickoff_winternitz_public_keys(None, self.signer.xonly_public_key)
            .await?,
        self.config.protocol_paramset().num_kickoffs_per_round,
        self.config.protocol_paramset().num_round_txs,
    )?;

    // At the collateral stage there is no round to end yet; start the first round instead.
    if current_round_index == RoundIndex::Collateral {
        return self.start_first_round(dbtx, kickoff_wpks).await;
    }

    let operator_xonly_pk = self.signer.xonly_public_key;

    let (current_round_txhandler, mut ready_to_reimburse_txhandler) =
        create_round_nth_txhandler(
            operator_xonly_pk,
            self.collateral_funding_outpoint,
            self.config.protocol_paramset().collateral_funding_amount,
            current_round_index,
            &kickoff_wpks,
            self.config.protocol_paramset(),
        )?;

    let next_round_index = current_round_index.next_round();
    let (mut next_round_txhandler, _) = create_round_nth_txhandler(
        operator_xonly_pk,
        self.collateral_funding_outpoint,
        self.config.protocol_paramset().collateral_funding_amount,
        next_round_index,
        &kickoff_wpks,
        self.config.protocol_paramset(),
    )?;

    // One tweak cache is shared by all signing calls of this function.
    let mut tweak_cache = TweakCache::default();

    self.signer
        .tx_sign_and_fill_sigs(
            &mut ready_to_reimburse_txhandler,
            &[],
            Some(&mut tweak_cache),
        )
        .wrap_err("Failed to sign ready to reimburse tx")?;

    self.signer
        .tx_sign_and_fill_sigs(&mut next_round_txhandler, &[], Some(&mut tweak_cache))
        .wrap_err("Failed to sign next round tx")?;

    let current_round_txid = current_round_txhandler.get_cached_tx().compute_txid();
    let ready_to_reimburse_tx = ready_to_reimburse_txhandler.get_cached_tx();
    let next_round_tx = next_round_txhandler.get_cached_tx();
    let ready_to_reimburse_txid = ready_to_reimburse_tx.compute_txid();

    // Outpoints the ready-to-reimburse tx is registered to depend on, one per connector.
    let mut activation_prerequisites = Vec::new();
    // Indices of the kickoff connectors that have no kickoff txid recorded.
    let mut unspent_kickoff_connector_indices = Vec::new();

    let kickoff_connector_count =
        self.config.protocol_paramset().num_kickoffs_per_round as u32;
    for kickoff_connector_idx in 0..kickoff_connector_count {
        let used_by_kickoff = self
            .db
            .get_kickoff_txid_for_used_kickoff_connector(
                Some(&mut dbtx),
                current_round_index,
                kickoff_connector_idx,
            )
            .await?;

        let prerequisite_outpoint = match used_by_kickoff {
            // Used connector: depend on the kickoff finalizer output of that kickoff tx.
            Some(kickoff_txid) => OutPoint {
                txid: kickoff_txid,
                vout: UtxoVout::KickoffFinalizer.get_vout(),
            },
            // Unused connector: remember it for the burn tx, mark it as used without a
            // kickoff txid, and depend on the connector output of the round tx itself.
            None => {
                let connector_outpoint = OutPoint {
                    txid: current_round_txid,
                    vout: UtxoVout::Kickoff(kickoff_connector_idx as usize).get_vout(),
                };
                unspent_kickoff_connector_indices.push(kickoff_connector_idx as usize);
                self.db
                    .mark_kickoff_connector_as_used(
                        Some(&mut dbtx),
                        current_round_index,
                        kickoff_connector_idx,
                        None,
                    )
                    .await?;
                connector_outpoint
            }
        };

        activation_prerequisites.push(ActivatedWithOutpoint {
            outpoint: prerequisite_outpoint,
            relative_block_height: self.config.protocol_paramset().finality_depth - 1,
        });
    }

    // NOTE(review): the burn tx is built even when the index list is empty — confirm the
    // builder and the tx sender handle that case.
    let mut burn_unspent_kickoff_connectors_tx =
        create_burn_unused_kickoff_connectors_txhandler(
            &current_round_txhandler,
            &unspent_kickoff_connector_indices,
            &self.signer.address,
            self.config.protocol_paramset(),
        )?;

    self.signer
        .tx_sign_and_fill_sigs(
            &mut burn_unspent_kickoff_connectors_tx,
            &[],
            Some(&mut tweak_cache),
        )
        .wrap_err("Failed to sign burn unused kickoff connectors tx")?;

    // Queue order: burn tx, ready-to-reimburse tx, next round tx.
    let burn_metadata = TxMetadata {
        tx_type: TransactionType::BurnUnusedKickoffConnectors,
        operator_xonly_pk: Some(operator_xonly_pk),
        round_idx: Some(current_round_index),
        kickoff_idx: None,
        deposit_outpoint: None,
    };
    self.tx_sender
        .insert_try_to_send(
            dbtx,
            Some(burn_metadata),
            burn_unspent_kickoff_connectors_tx.get_cached_tx(),
            FeePayingType::CPFP,
            None,
            &[],
            &[],
            &[],
            &[],
        )
        .await?;

    let ready_to_reimburse_metadata = TxMetadata {
        tx_type: TransactionType::ReadyToReimburse,
        operator_xonly_pk: Some(operator_xonly_pk),
        round_idx: Some(current_round_index),
        kickoff_idx: None,
        deposit_outpoint: None,
    };
    self.tx_sender
        .insert_try_to_send(
            dbtx,
            Some(ready_to_reimburse_metadata),
            ready_to_reimburse_tx,
            FeePayingType::CPFP,
            None,
            &[],
            &[],
            &[],
            &activation_prerequisites,
        )
        .await?;

    // The next round tx is registered as activated by the ready-to-reimburse txid with
    // the operator reimburse timelock as relative block height.
    let next_round_activation = ActivatedWithTxid {
        txid: ready_to_reimburse_txid,
        relative_block_height: self
            .config
            .protocol_paramset()
            .operator_reimburse_timelock as u32,
    };
    let next_round_metadata = TxMetadata {
        tx_type: TransactionType::Round,
        operator_xonly_pk: Some(operator_xonly_pk),
        round_idx: Some(next_round_index),
        kickoff_idx: None,
        deposit_outpoint: None,
    };
    self.tx_sender
        .insert_try_to_send(
            dbtx,
            Some(next_round_metadata),
            next_round_tx,
            FeePayingType::CPFP,
            None,
            &[],
            &[],
            &[next_round_activation],
            &[],
        )
        .await?;

    // Persist the next round as the current round.
    self.db
        .update_current_round_index(Some(dbtx), next_round_index)
        .await?;

    Ok(())
}
1226
1227    #[cfg(feature = "automation")]
1228    async fn send_asserts(
1229        &self,
1230        mut dbtx: DatabaseTransaction<'_>,
1231        kickoff_data: KickoffData,
1232        deposit_data: DepositData,
1233        watchtower_challenges: HashMap<usize, Transaction>,
1234        _payout_blockhash: Witness,
1235        latest_blockhash: Witness,
1236    ) -> Result<(), BridgeError> {
1237        use bridge_circuit_host::utils::{get_verifying_key, is_dev_mode};
1238        use citrea_sov_rollup_interface::zk::light_client_proof::output::LightClientCircuitOutput;
1239
1240        let context = ContractContext::new_context_for_kickoff(
1241            kickoff_data,
1242            deposit_data.clone(),
1243            self.config.protocol_paramset(),
1244        );
1245        let mut db_cache = crate::builder::transaction::ReimburseDbCache::from_context(
1246            self.db.clone(),
1247            &context,
1248            Some(&mut dbtx),
1249        );
1250        let txhandlers = builder::transaction::create_txhandlers(
1251            TransactionType::Kickoff,
1252            context,
1253            &mut crate::builder::transaction::TxHandlerCache::new(),
1254            &mut db_cache,
1255        )
1256        .await?;
1257        let move_txid = txhandlers
1258            .get(&TransactionType::MoveToVault)
1259            .ok_or(eyre::eyre!(
1260                "Move to vault txhandler not found in send_asserts"
1261            ))?
1262            .get_cached_tx()
1263            .compute_txid();
1264        let kickoff_tx = txhandlers
1265            .get(&TransactionType::Kickoff)
1266            .ok_or(eyre::eyre!("Kickoff txhandler not found in send_asserts"))?
1267            .get_cached_tx();
1268
1269        #[cfg(test)]
1270        self.config
1271            .test_params
1272            .maybe_save_kickoff_and_wtc_txs(kickoff_tx, &watchtower_challenges, 1, &self.rpc)
1273            .await?;
1274
1275        let (payout_op_xonly_pk_opt, payout_block_hash, payout_txid, deposit_idx) = self
1276            .db
1277            .get_payout_info_from_move_txid(Some(&mut dbtx), move_txid)
1278            .await
1279            .wrap_err("Failed to get payout info from db during sending asserts.")?
1280            .ok_or_eyre(format!(
1281                "Payout info not found in db while sending asserts for move txid: {move_txid}"
1282            ))?;
1283
1284        let payout_op_xonly_pk = payout_op_xonly_pk_opt.ok_or_eyre(format!(
1285            "Payout operator xonly pk not found in payout info DB while sending asserts for deposit move txid: {move_txid}"
1286        ))?;
1287
1288        tracing::info!("Sending asserts for deposit_idx: {deposit_idx:?}");
1289
1290        if payout_op_xonly_pk != kickoff_data.operator_xonly_pk {
1291            return Err(eyre::eyre!(
1292                "Payout operator xonly pk does not match kickoff operator xonly pk in send_asserts"
1293            )
1294            .into());
1295        }
1296
1297        let (payout_block_height, payout_block) = self
1298            .db
1299            .get_full_block_from_hash(Some(&mut dbtx), payout_block_hash)
1300            .await?
1301            .ok_or_eyre(format!(
1302                "Payout block {payout_op_xonly_pk:?} {payout_block_hash:?} not found in db",
1303            ))?;
1304
1305        let payout_tx_index = payout_block
1306            .txdata
1307            .iter()
1308            .position(|tx| tx.compute_txid() == payout_txid)
1309            .ok_or_eyre(format!(
1310                "Payout txid {payout_txid:?} not found in block {payout_op_xonly_pk:?} {payout_block_hash:?}"
1311            ))?;
1312        let payout_tx = &payout_block.txdata[payout_tx_index];
1313        tracing::debug!("Calculated payout tx in send_asserts: {:?}", payout_tx);
1314
1315        let lcp_receipt = self
1316            .citrea_client
1317            .fetch_validate_and_store_lcp(
1318                payout_block_height as u64,
1319                deposit_idx as u32,
1320                &self.db,
1321                Some(&mut dbtx),
1322                self.config.protocol_paramset(),
1323            )
1324            .await?;
1325        let proof_output: LightClientCircuitOutput = borsh::from_slice(&lcp_receipt.journal.bytes)
1326            .wrap_err("Failed to deserialize light client circuit output")?;
1327        let l2_height = proof_output.last_l2_height;
1328        let light_client_proof = LightClientProof {
1329            lc_journal: lcp_receipt.journal.bytes.clone(),
1330        };
1331
1332        tracing::info!("Got light client proof in send_asserts");
1333
1334        let storage_proof = self
1335            .citrea_client
1336            .get_storage_proof(l2_height, deposit_idx as u32)
1337            .await
1338            .wrap_err(format!(
1339                "Failed to get storage proof for move txid {move_txid:?}, l2 height {l2_height}, deposit_idx {deposit_idx}",
1340            ))?;
1341
1342        tracing::debug!("Got storage proof in send_asserts {storage_proof:?}");
1343
1344        // get committed latest blockhash
1345        let wt_derive_path = ClementineBitVMPublicKeys::get_latest_blockhash_derivation(
1346            deposit_data.get_deposit_outpoint(),
1347            self.config.protocol_paramset(),
1348        );
1349        let commits = extract_winternitz_commits(
1350            latest_blockhash,
1351            &[wt_derive_path],
1352            self.config.protocol_paramset(),
1353        )?;
1354
1355        let latest_blockhash_last_20: [u8; 20] = commits
1356            .first()
1357            .ok_or_eyre("Failed to get latest blockhash in send_asserts")?
1358            .to_owned()
1359            .try_into()
1360            .map_err(|_| eyre::eyre!("Committed latest blockhash is not 20 bytes long"))?;
1361
1362        #[cfg(test)]
1363        let latest_blockhash_last_20 = self
1364            .config
1365            .test_params
1366            .maybe_disrupt_latest_block_hash_commit(latest_blockhash_last_20);
1367
1368        let rpc_current_finalized_height = self
1369            .rpc
1370            .get_current_chain_height()
1371            .await?
1372            .saturating_sub(self.config.protocol_paramset().finality_depth - 1);
1373
1374        // update headers in case the sync (state machine handle_finalized_block) is behind
1375        self.db
1376            .fetch_and_save_missing_blocks(
1377                Some(&mut dbtx),
1378                &self.rpc,
1379                self.config.protocol_paramset().genesis_height,
1380                rpc_current_finalized_height + 1,
1381            )
1382            .await?;
1383
1384        let current_height = self
1385            .db
1386            .get_latest_finalized_block_height(Some(&mut dbtx))
1387            .await?
1388            .ok_or_eyre("Failed to get current finalized block height")?;
1389
1390        let block_hashes = self
1391            .db
1392            .get_block_info_from_range(
1393                Some(&mut dbtx),
1394                self.config.protocol_paramset().genesis_height as u64,
1395                current_height,
1396            )
1397            .await?;
1398
1399        // find out which blockhash is latest_blockhash (only last 20 bytes is committed to Witness)
1400        let latest_blockhash_index = block_hashes
1401            .iter()
1402            .position(|(block_hash, _)| {
1403                block_hash.as_byte_array().last_20_bytes() == latest_blockhash_last_20
1404            })
1405            .ok_or_eyre("Failed to find latest blockhash in send_asserts")?;
1406
1407        let latest_blockhash = block_hashes[latest_blockhash_index].0;
1408
1409        let (current_hcp, _hcp_height) = self
1410            .header_chain_prover
1411            .prove_till_hash(latest_blockhash)
1412            .await?;
1413
1414        #[cfg(test)]
1415        let mut total_works: Vec<[u8; 16]> = Vec::with_capacity(watchtower_challenges.len());
1416
1417        #[cfg(test)]
1418        {
1419            use bridge_circuit_host::utils::total_work_from_wt_tx_test_util;
1420            for (_, tx) in watchtower_challenges.iter() {
1421                let total_work = total_work_from_wt_tx_test_util(tx);
1422                total_works.push(total_work);
1423            }
1424            tracing::debug!("Total works: {:?}", total_works);
1425        }
1426
1427        #[cfg(test)]
1428        let current_hcp = self
1429            .config
1430            .test_params
1431            .maybe_override_current_hcp(
1432                current_hcp,
1433                payout_block_hash,
1434                &block_hashes,
1435                &self.header_chain_prover,
1436                total_works.clone(),
1437            )
1438            .await?;
1439
1440        tracing::info!("Got header chain proof in send_asserts");
1441
1442        let blockhashes_serialized: Vec<[u8; 32]> = block_hashes
1443            .iter()
1444            .take(latest_blockhash_index + 1)
1445            .map(|(h, _)| h.to_byte_array())
1446            .collect();
1447
1448        #[cfg(test)]
1449        let blockhashes_serialized = self
1450            .config
1451            .test_params
1452            .maybe_override_blockhashes_serialized(
1453                blockhashes_serialized,
1454                payout_block_height,
1455                self.config.protocol_paramset().genesis_height,
1456                total_works,
1457            );
1458
1459        tracing::debug!(
1460            "Genesis height - Before SPV: {},",
1461            self.config.protocol_paramset().genesis_height
1462        );
1463
1464        let spv = create_spv(
1465            payout_tx.clone(),
1466            &blockhashes_serialized,
1467            payout_block.clone(),
1468            payout_block_height,
1469            self.config.protocol_paramset().genesis_height,
1470            payout_tx_index as u32,
1471        )?;
1472        tracing::info!("Calculated spv proof in send_asserts");
1473
1474        let mut wt_contexts = Vec::new();
1475        for (_, tx) in watchtower_challenges.iter() {
1476            wt_contexts.push(WatchtowerContext {
1477                watchtower_tx: tx.clone(),
1478                prevout_txs: self.rpc.get_prevout_txs(tx).await?,
1479            });
1480        }
1481
1482        #[cfg(test)]
1483        {
1484            if self.config.test_params.operator_forgot_watchtower_challenge {
1485                tracing::info!("Disrupting watchtower challenges in send_asserts");
1486                wt_contexts.pop();
1487            }
1488        }
1489
1490        let watchtower_challenge_connector_start_idx =
1491            (FIRST_FIVE_OUTPUTS + ClementineBitVMPublicKeys::number_of_assert_txs()) as u32;
1492
1493        let bridge_circuit_host_params = BridgeCircuitHostParams::new_with_wt_tx(
1494            kickoff_tx.clone(),
1495            spv,
1496            current_hcp,
1497            light_client_proof,
1498            lcp_receipt,
1499            storage_proof,
1500            self.config.protocol_paramset().network,
1501            &wt_contexts,
1502            watchtower_challenge_connector_start_idx,
1503        )
1504        .wrap_err("Failed to create bridge circuit host params in send_asserts")?;
1505
1506        let bridge_circuit_elf = match self.config.protocol_paramset().network {
1507            bitcoin::Network::Bitcoin => MAINNET_BRIDGE_CIRCUIT_ELF,
1508            bitcoin::Network::Testnet4 => {
1509                if is_dev_mode() {
1510                    TESTNET4_BRIDGE_CIRCUIT_ELF_TEST
1511                } else {
1512                    TESTNET4_BRIDGE_CIRCUIT_ELF
1513                }
1514            }
1515            bitcoin::Network::Signet => {
1516                if is_dev_mode() {
1517                    SIGNET_BRIDGE_CIRCUIT_ELF_TEST
1518                } else {
1519                    SIGNET_BRIDGE_CIRCUIT_ELF
1520                }
1521            }
1522            bitcoin::Network::Regtest => {
1523                if is_dev_mode() {
1524                    REGTEST_BRIDGE_CIRCUIT_ELF_TEST
1525                } else {
1526                    REGTEST_BRIDGE_CIRCUIT_ELF
1527                }
1528            }
1529            _ => {
1530                return Err(eyre::eyre!(
1531                    "Unsupported network {:?} in send_asserts",
1532                    self.config.protocol_paramset().network
1533                )
1534                .into())
1535            }
1536        };
1537        tracing::info!("Starting proving bridge circuit to send asserts");
1538
1539        #[cfg(test)]
1540        self.config
1541            .test_params
1542            .maybe_dump_bridge_circuit_params_to_file(&bridge_circuit_host_params)?;
1543
1544        let (g16_proof, g16_output, public_inputs) = tokio::task::spawn_blocking(move || {
1545            prove_bridge_circuit(bridge_circuit_host_params, bridge_circuit_elf)
1546        })
1547        .await
1548        .wrap_err("Failed to join the prove_bridge_circuit task")?
1549        .wrap_err("Failed to prove bridge circuit")?;
1550
1551        tracing::info!("Proved bridge circuit in send_asserts");
1552        let public_input_scalar = ark_bn254::Fr::from_be_bytes_mod_order(&g16_output);
1553
1554        #[cfg(test)]
1555        let mut public_inputs = public_inputs;
1556
1557        #[cfg(test)]
1558        {
1559            if self
1560                .config
1561                .test_params
1562                .disrupt_challenge_sending_watchtowers_commit
1563            {
1564                tracing::info!("Disrupting challenge sending watchtowers commit in send_asserts");
1565                public_inputs.challenge_sending_watchtowers[0] ^= 0x01;
1566                tracing::info!(
1567                    "Disrupted challenge sending watchtowers commit: {:?}",
1568                    public_inputs.challenge_sending_watchtowers
1569                );
1570            }
1571        }
1572
1573        tracing::info!(
1574            "Challenge sending watchtowers commit: {:?}",
1575            public_inputs.challenge_sending_watchtowers
1576        );
1577
1578        let asserts = tokio::task::spawn_blocking(move || {
1579            let vk = get_verifying_key();
1580
1581            generate_assertions(g16_proof, vec![public_input_scalar], &vk).map_err(|e| {
1582                eyre::eyre!(
1583                    "Failed to generate {}assertions: {}",
1584                    if is_dev_mode() { "dev mode " } else { "" },
1585                    e
1586                )
1587            })
1588        })
1589        .await
1590        .wrap_err("Generate assertions thread failed with error")??;
1591
1592        tracing::info!("Generated assertions in send_asserts");
1593
1594        #[cfg(test)]
1595        let asserts = self.config.test_params.maybe_corrupt_asserts(asserts);
1596
1597        tracing::debug!(target: "ci", "Assert commitment data: {:?}", asserts);
1598
1599        let assert_txs = self
1600            .create_assert_commitment_txs(
1601                TransactionRequestData {
1602                    kickoff_data,
1603                    deposit_outpoint: deposit_data.get_deposit_outpoint(),
1604                },
1605                ClementineBitVMPublicKeys::get_assert_commit_data(
1606                    asserts,
1607                    &public_inputs.challenge_sending_watchtowers,
1608                ),
1609                Some(&mut dbtx),
1610            )
1611            .await?;
1612
1613        for (tx_type, tx) in assert_txs {
1614            self.tx_sender
1615                .add_tx_to_queue(
1616                    dbtx,
1617                    tx_type,
1618                    &tx,
1619                    &[],
1620                    Some(TxMetadata {
1621                        tx_type,
1622                        operator_xonly_pk: Some(self.signer.xonly_public_key),
1623                        round_idx: Some(kickoff_data.round_idx),
1624                        kickoff_idx: Some(kickoff_data.kickoff_idx),
1625                        deposit_outpoint: Some(deposit_data.get_deposit_outpoint()),
1626                    }),
1627                    self.config.protocol_paramset(),
1628                    None,
1629                )
1630                .await?;
1631        }
1632        Ok(())
1633    }
1634
1635    fn data(&self) -> OperatorData {
1636        OperatorData {
1637            xonly_pk: self.signer.xonly_public_key,
1638            collateral_funding_outpoint: self.collateral_funding_outpoint,
1639            reimburse_addr: self.reimburse_addr.clone(),
1640        }
1641    }
1642
1643    #[cfg(feature = "automation")]
1644    async fn send_latest_blockhash(
1645        &self,
1646        mut dbtx: DatabaseTransaction<'_>,
1647        kickoff_data: KickoffData,
1648        deposit_data: DepositData,
1649        latest_blockhash: BlockHash,
1650    ) -> Result<(), BridgeError> {
1651        tracing::info!("Operator sending latest blockhash");
1652        let deposit_outpoint = deposit_data.get_deposit_outpoint();
1653        let (tx_type, tx) = self
1654            .create_latest_blockhash_tx(
1655                TransactionRequestData {
1656                    deposit_outpoint,
1657                    kickoff_data,
1658                },
1659                latest_blockhash,
1660                Some(&mut dbtx),
1661            )
1662            .await?;
1663        if tx_type != TransactionType::LatestBlockhash {
1664            return Err(eyre::eyre!("Latest blockhash tx type is not LatestBlockhash").into());
1665        }
1666        self.tx_sender
1667            .add_tx_to_queue(
1668                dbtx,
1669                tx_type,
1670                &tx,
1671                &[],
1672                Some(TxMetadata {
1673                    tx_type,
1674                    operator_xonly_pk: Some(self.signer.xonly_public_key),
1675                    round_idx: Some(kickoff_data.round_idx),
1676                    kickoff_idx: Some(kickoff_data.kickoff_idx),
1677                    deposit_outpoint: Some(deposit_outpoint),
1678                }),
1679                self.config.protocol_paramset(),
1680                None,
1681            )
1682            .await?;
1683        Ok(())
1684    }
1685
1686    /// For a deposit_id checks that the payer for that deposit is the operator, and the payout blockhash and kickoff txid are set.
1687    async fn validate_payer_is_operator(
1688        &self,
1689        dbtx: Option<DatabaseTransaction<'_>>,
1690        deposit_id: u32,
1691    ) -> Result<(BlockHash, Txid), BridgeError> {
1692        let (payer_xonly_pk, payout_blockhash, kickoff_txid) = self
1693            .db
1694            .get_payer_xonly_pk_blockhash_and_kickoff_txid_from_deposit_id(dbtx, deposit_id)
1695            .await?;
1696
1697        tracing::info!(
1698            "Payer xonly pk and kickoff txid found for the requested deposit, payer xonly pk: {:?}, kickoff txid: {:?}",
1699            payer_xonly_pk,
1700            kickoff_txid
1701        );
1702
1703        // first check if the payer is the operator, and the kickoff is handled
1704        // by the PayoutCheckerTask, meaning kickoff_txid is set
1705        let (payout_blockhash, kickoff_txid) = match (
1706            payer_xonly_pk,
1707            payout_blockhash,
1708            kickoff_txid,
1709        ) {
1710            (Some(payer_xonly_pk), Some(payout_blockhash), Some(kickoff_txid)) => {
1711                if payer_xonly_pk != self.signer.xonly_public_key {
1712                    return Err(eyre::eyre!(
1713                        "Payer is not own operator for deposit, payer xonly pk: {:?}, operator xonly pk: {:?}",
1714                        payer_xonly_pk,
1715                        self.signer.xonly_public_key
1716                    )
1717                    .into());
1718                }
1719                (payout_blockhash, kickoff_txid)
1720            }
1721            _ => {
1722                return Err(eyre::eyre!(
1723                    "Payer info not found for deposit, payout blockhash: {:?}, kickoff txid: {:?}",
1724                    payout_blockhash,
1725                    kickoff_txid
1726                )
1727                .into());
1728            }
1729        };
1730
1731        tracing::info!(
1732            "Payer xonly pk, payout blockhash and kickoff txid found and valid for own operator for the requested deposit id: {}, payer xonly pk: {:?}, payout blockhash: {:?}, kickoff txid: {:?}",
1733            deposit_id,
1734            payer_xonly_pk,
1735            payout_blockhash,
1736            kickoff_txid
1737        );
1738
1739        Ok((payout_blockhash, kickoff_txid))
1740    }
1741
1742    async fn get_next_txs_to_send(
1743        &self,
1744        mut dbtx: Option<DatabaseTransaction<'_>>,
1745        deposit_data: &mut DepositData,
1746        payout_blockhash: BlockHash,
1747        kickoff_txid: Txid,
1748        current_round_idx: RoundIndex,
1749    ) -> Result<Vec<(TransactionType, Transaction)>, BridgeError> {
1750        let mut txs_to_send = Vec::new();
1751
1752        // get used kickoff connector for the kickoff txid
1753        let (kickoff_round_idx, kickoff_connector_idx) = self
1754            .db
1755            .get_kickoff_connector_for_kickoff_txid(dbtx.as_deref_mut(), kickoff_txid)
1756            .await?;
1757
1758        let context = ContractContext::new_context_for_kickoff(
1759            KickoffData {
1760                operator_xonly_pk: self.signer.xonly_public_key,
1761                round_idx: kickoff_round_idx,
1762                kickoff_idx: kickoff_connector_idx,
1763            },
1764            deposit_data.clone(),
1765            self.config.protocol_paramset(),
1766        );
1767
1768        // get txs for the kickoff
1769        let kickoff_txs = create_and_sign_txs(
1770            self.db.clone(),
1771            &self.signer,
1772            self.config.clone(),
1773            context,
1774            Some(payout_blockhash.to_byte_array().last_20_bytes()),
1775            dbtx.as_deref_mut(),
1776        )
1777        .await?;
1778
1779        // check the current round status compared to the round of the assigned kickoff tx
1780        match current_round_idx
1781            .to_index()
1782            .cmp(&kickoff_round_idx.to_index())
1783        {
1784            std::cmp::Ordering::Less => {
1785                // We need to advance the round manually to be able to start the kickoff
1786                tracing::info!("We need to advance the round manually to be able to start the kickoff, current round idx: {:?}, kickoff round idx: {:?}", current_round_idx, kickoff_round_idx);
1787                let txs = self.advance_round_manually(dbtx, current_round_idx).await?;
1788                txs_to_send.extend(txs);
1789            }
1790            std::cmp::Ordering::Greater => {
1791                tracing::info!("We are at least on the next round, meaning we can get the reimbursement as reimbursement utxos are in the next round, current round idx: {:?}, kickoff round idx: {:?}", current_round_idx, kickoff_round_idx);
1792                // we are at least on the next round, meaning we can get the reimbursement as reimbursement utxos are in the next round
1793                let reimbursement_tx = kickoff_txs
1794                    .iter()
1795                    .find(|(tx_type, _)| tx_type == &TransactionType::Reimburse)
1796                    .ok_or(eyre::eyre!("Reimburse tx not found in kickoff txs"))?;
1797                txs_to_send.push(reimbursement_tx.clone());
1798            }
1799            std::cmp::Ordering::Equal => {
1800                // first check if the kickoff is in chain
1801                if !self.rpc.is_tx_on_chain(&kickoff_txid).await? {
1802                    tracing::info!(
1803                        "Kickoff tx is not on chain, can send it, kickoff txid: {:?}",
1804                        kickoff_txid
1805                    );
1806                    let kickoff_tx = kickoff_txs
1807                        .iter()
1808                        .find(|(tx_type, _)| tx_type == &TransactionType::Kickoff)
1809                        .ok_or(eyre::eyre!("Kickoff tx not found in kickoff txs"))?;
1810
1811                    // fetch and save the LCP for if we get challenged and need to provide proof of payout later
1812                    let (_, payout_block_height) = self
1813                        .db
1814                        .get_block_info_from_hash(dbtx.as_deref_mut(), payout_blockhash)
1815                        .await?
1816                        .ok_or_eyre("Couldn't find payout blockhash in bitcoin sync")?;
1817
1818                    let move_txid = deposit_data.get_move_txid(self.config.protocol_paramset())?;
1819
1820                    let (_, _, _, citrea_idx) = self
1821                        .db
1822                        .get_payout_info_from_move_txid(dbtx.as_deref_mut(), move_txid)
1823                        .await?
1824                        .ok_or_eyre("Couldn't find payout info from move txid")?;
1825
1826                    let _ = self
1827                        .citrea_client
1828                        .fetch_validate_and_store_lcp(
1829                            payout_block_height as u64,
1830                            citrea_idx as u32,
1831                            &self.db,
1832                            dbtx.as_deref_mut(),
1833                            self.config.protocol_paramset(),
1834                        )
1835                        .await?;
1836
1837                    // sanity check
1838                    if kickoff_tx.1.compute_txid() != kickoff_txid {
1839                        return Err(eyre::eyre!("Kickoff txid mismatch for deposit outpoint: {}, kickoff txid: {:?}, computed txid: {:?}",
1840                        deposit_data.get_deposit_outpoint(), kickoff_txid, kickoff_tx.1.compute_txid()).into());
1841                    }
1842                    txs_to_send.push(kickoff_tx.clone());
1843                }
1844                // kickoff tx is on chain, check if kickoff finalizer is spent
1845                else if !self
1846                    .rpc
1847                    .is_utxo_spent(&OutPoint {
1848                        txid: kickoff_txid,
1849                        vout: UtxoVout::KickoffFinalizer.get_vout(),
1850                    })
1851                    .await?
1852                {
1853                    // kickoff finalizer is not spent, we need to send challenge timeout
1854                    tracing::info!(
1855                        "Kickoff finalizer is not spent, can send challenge timeout, kickoff txid: {:?}",
1856                        kickoff_txid
1857                    );
1858                    // first check if challenge tx was sent, then we need automation enabled to be able to answer the challenge
1859                    if self
1860                        .rpc
1861                        .is_utxo_spent(&OutPoint {
1862                            txid: kickoff_txid,
1863                            vout: UtxoVout::Challenge.get_vout(),
1864                        })
1865                        .await?
1866                    {
1867                        // challenge tx was sent, we need automation enabled to be able to answer the challenge
1868                        tracing::warn!(
1869                            "Challenge tx was sent for deposit outpoint: {:?}, but automation is not enabled, enable automation!",
1870                            deposit_data.get_deposit_outpoint()
1871                        );
1872                        return Err(eyre::eyre!("WARNING: Challenge tx was sent to kickoff connector {:?}, but automation is not enabled, enable automation!", kickoff_txid).into());
1873                    }
1874                    let challenge_timeout_tx = kickoff_txs
1875                        .iter()
1876                        .find(|(tx_type, _)| tx_type == &TransactionType::ChallengeTimeout)
1877                        .ok_or(eyre::eyre!("Challenge timeout tx not found in kickoff txs"))?;
1878                    txs_to_send.push(challenge_timeout_tx.clone());
1879                } else {
1880                    // if kickoff finalizer is spent, it is time to get the reimbursement
1881                    tracing::info!(
1882                        "Kickoff finalizer is spent, can advance the round manually to get the reimbursement, current round idx: {:?}, kickoff round idx: {:?}",
1883                        current_round_idx,
1884                        kickoff_round_idx
1885                    );
1886                    let txs = self.advance_round_manually(dbtx, current_round_idx).await?;
1887                    txs_to_send.extend(txs);
1888                }
1889            }
1890        }
1891        Ok(txs_to_send)
1892    }
1893
1894    /// Transfers the given outpoints to the operator's btc wallet address.
1895    /// The outpoints must belong to the operator's taproot address (xonly key, no merkle root).
1896    /// The function also checks if any outpoint is the collateral of the operator, and returns an error if so.
1897    /// # Arguments
1898    /// - inputs: A vector of tuples, each containing an outpoint and the corresponding txout.
1899    /// # Returns
1900    /// - The signed transaction that sends the given outpoints to the operator's btc wallet address.
1901    pub async fn transfer_outpoints_to_wallet(
1902        &self,
1903        inputs: Vec<(OutPoint, TxOut)>,
1904    ) -> Result<Transaction, BridgeError> {
1905        if inputs.is_empty() {
1906            return Err(eyre!("No outpoints provided for transfer").into());
1907        }
1908
1909        // check if any outpoint is a collateral outpoint
1910        let collateral_outpoints = self
1911            .get_all_collateral_outpoints()
1912            .await
1913            .wrap_err("Failed to get all collateral outpoints")?;
1914        for (outpoint, _) in inputs.iter() {
1915            if collateral_outpoints.contains_key(outpoint) {
1916                let (round_idx, tx_type) = collateral_outpoints
1917                    .get(outpoint)
1918                    .expect("Collateral outpoint should be found in the map");
1919                return Err(
1920                    eyre!("Cannot transfer collateral outpoint {outpoint} belonging to {round_idx:?} {tx_type:?} to wallet").into(),
1921                );
1922            }
1923        }
1924
1925        let destination_script = self
1926            .rpc
1927            .get_new_address(None, Some(AddressType::Bech32m))
1928            .await
1929            .wrap_err("Failed to get new wallet address for transfer")?
1930            .require_network(self.config.protocol_paramset().network)
1931            .wrap_err("Failed to get new address, bitcoin rpc might not match the network")?
1932            .script_pubkey();
1933
1934        let (_, spendinfo) = create_taproot_address(
1935            &[],
1936            Some(self.signer.xonly_public_key),
1937            self.config.protocol_paramset().network,
1938        );
1939
1940        let total_input_value = inputs.iter().try_fold(Amount::ZERO, |acc, (_, txout)| {
1941            acc.checked_add(txout.value)
1942                .ok_or_else(|| eyre!("Input values overflowed while summing"))
1943        })?;
1944
1945        let mut output_txout = TxOut {
1946            value: total_input_value,
1947            script_pubkey: destination_script,
1948        };
1949
1950        let mut builder = TxHandlerBuilder::new(TransactionType::Dummy)
1951            .with_version(bitcoin::transaction::Version::TWO);
1952
1953        for (outpoint, txout) in inputs.iter() {
1954            builder = builder.add_input(
1955                NormalSignatureKind::OperatorSighashDefault,
1956                SpendableTxIn::new(*outpoint, txout.clone(), vec![], Some(spendinfo.clone())),
1957                SpendPath::KeySpend,
1958                DEFAULT_SEQUENCE,
1959            );
1960        }
1961
1962        builder = builder.add_output(UnspentTxOut::from_partial(output_txout.clone()));
1963
1964        let mut txhandler = builder.finalize();
1965
1966        let fee_rate = self
1967            .rpc
1968            .get_fee_rate_kvb(
1969                self.config.protocol_paramset().network,
1970                &self.config.mempool_api_host,
1971                &self.config.mempool_api_endpoint,
1972                self.config.tx_sender_limits.mempool_fee_rate_multiplier,
1973                self.config.tx_sender_limits.mempool_fee_rate_offset_sat_kvb,
1974                self.config.tx_sender_limits.fee_rate_hard_cap,
1975            )
1976            .await
1977            .wrap_err("Failed to get fee rate for transfer to wallet tx")?;
1978
1979        // Sign to account for witness weight when calculating fees.
1980        self.signer
1981            .tx_sign_and_fill_sigs(&mut txhandler, &[], None)?;
1982
1983        let tx_weight = txhandler.get_cached_tx().weight();
1984        let fee = fee_rate.fee_wu(tx_weight).ok_or_eyre(format!(
1985            "Fee calculation overflow in transfer to wallet, current feerate: {fee_rate}"
1986        ))?;
1987
1988        output_txout.value = output_txout
1989            .value
1990            .checked_sub(fee)
1991            .ok_or_else(|| eyre!("Calculated fee exceeds total input value"))?;
1992
1993        let mut builder = TxHandlerBuilder::new(TransactionType::Dummy)
1994            .with_version(bitcoin::transaction::Version::TWO);
1995
1996        for (outpoint, txout) in inputs.iter() {
1997            builder = builder.add_input(
1998                NormalSignatureKind::OperatorSighashDefault,
1999                SpendableTxIn::new(*outpoint, txout.clone(), vec![], Some(spendinfo.clone())),
2000                SpendPath::KeySpend,
2001                DEFAULT_SEQUENCE,
2002            );
2003        }
2004
2005        builder = builder.add_output(UnspentTxOut::from_partial(output_txout.clone()));
2006
2007        let mut txhandler = builder.finalize();
2008
2009        self.signer
2010            .tx_sign_and_fill_sigs(&mut txhandler, &[], None)?;
2011
2012        let signed_tx = txhandler.get_cached_tx().clone();
2013
2014        self.rpc
2015            .send_raw_transaction(&signed_tx)
2016            .await
2017            .wrap_err("Failed to send from operator's address to btc wallet address")?;
2018
2019        Ok(signed_tx)
2020    }
2021
2022    /// Gets all collateral outpoints for the operator.
2023    /// Returns a map of outpoint to the round index it belongs to.
2024    async fn get_all_collateral_outpoints(
2025        &self,
2026    ) -> Result<HashMap<OutPoint, (RoundIndex, TransactionType)>, BridgeError> {
2027        let mut outpoints = HashMap::new();
2028        outpoints.insert(
2029            self.collateral_funding_outpoint,
2030            (RoundIndex::Collateral, TransactionType::Round),
2031        );
2032
2033        // Fetch operator kickoff winternitz public keys to build round txs
2034        let operator_winternitz_public_keys = self
2035            .db
2036            .get_operator_kickoff_winternitz_public_keys(None, self.signer.xonly_public_key)
2037            .await?;
2038        let kickoff_wpks = KickoffWinternitzKeys::new(
2039            operator_winternitz_public_keys,
2040            self.config.protocol_paramset().num_kickoffs_per_round,
2041            self.config.protocol_paramset().num_round_txs,
2042        )?;
2043        let operator_data = self.data();
2044
2045        let mut prev_ready_to_reimburse: Option<TxHandler> = None;
2046
2047        // Collect collateral outpoints for each round
2048        for round_idx in RoundIndex::iter_rounds(self.config.protocol_paramset().num_round_txs) {
2049            let txhandlers = create_round_txhandlers(
2050                self.config.protocol_paramset(),
2051                round_idx,
2052                &operator_data,
2053                &kickoff_wpks,
2054                prev_ready_to_reimburse.as_ref(),
2055            )?;
2056
2057            let round_tx = txhandlers
2058                .iter()
2059                .find(|txhandler| txhandler.get_transaction_type() == TransactionType::Round)
2060                .ok_or(eyre::eyre!("Round tx not found in txhandlers"))?;
2061            let collateral_outpoint = OutPoint {
2062                txid: *round_tx.get_txid(),
2063                vout: UtxoVout::CollateralInRound.get_vout(),
2064            };
2065            outpoints.insert(collateral_outpoint, (round_idx, TransactionType::Round));
2066
2067            let ready_to_reimburse_tx = txhandlers
2068                .iter()
2069                .find(|txhandler| {
2070                    txhandler.get_transaction_type() == TransactionType::ReadyToReimburse
2071                })
2072                .ok_or(eyre::eyre!("Ready to reimburse tx not found in txhandlers"))?;
2073            let ready_to_reimburse_collateral_outpoint = OutPoint {
2074                txid: *ready_to_reimburse_tx.get_txid(),
2075                vout: UtxoVout::CollateralInReadyToReimburse.get_vout(),
2076            };
2077            outpoints.insert(
2078                ready_to_reimburse_collateral_outpoint,
2079                (round_idx, TransactionType::ReadyToReimburse),
2080            );
2081            prev_ready_to_reimburse = Some(ready_to_reimburse_tx.clone());
2082        }
2083
2084        Ok(outpoints)
2085    }
2086
2087    /// For a given deposit outpoint, get the txs that are needed to reimburse the deposit.
2088    /// To avoid operator getting slashed, this function only returns the next tx that needs to be sent
2089    /// This fn can track and enable sending of these transactions during a normal reimbursement process.
2090    ///
2091    /// - First, if the current round is less than the round of the kickoff assigned to the deposit by PayoutCheckerTask, it returns the Round TX.
2092    /// - After Round tx is sent, it returns the Kickoff tx.
2093    /// - After Kickoff tx is sent, it returns the challenge timeout tx.
2094    /// - After challenge timeout tx is sent, it returns BurnUnusedKickoffConnectors tx. If challenge timeout tx is not sent, and but challenge utxo was spent, it means the kickoff was challenged, thus the fn returns an error as it cannot handle the challenge process. Automation is required to answer the challenge.
2095    /// - After all kickoff utxos are spent, and for any live kickoff, all kickoff finalizers are spent, it returns the ReadyToReimburse tx.
2096    /// - After ReadyToReimburse tx is sent, it returns the next Round tx to generate reimbursement utxos.
2097    /// - Finally, after the next round tx is sent, it returns the Reimburse tx.
2098    pub async fn get_reimbursement_txs(
2099        &self,
2100        deposit_outpoint: OutPoint,
2101    ) -> Result<Vec<(TransactionType, Transaction)>, BridgeError> {
2102        let mut dbtx = self.db.begin_transaction().await?;
2103        // first check if the deposit is in the database
2104        let (deposit_id, mut deposit_data) = self
2105            .db
2106            .get_deposit_data(Some(&mut dbtx), deposit_outpoint)
2107            .await?
2108            .ok_or_eyre(format!(
2109                "Deposit data not found for the requested deposit outpoint: {deposit_outpoint:?}, make sure you send the deposit outpoint, not the move txid."
2110            ))?;
2111
2112        tracing::info!(
2113            "Deposit data found for the requested deposit outpoint: {deposit_outpoint:?}, deposit id: {deposit_id:?}",
2114        );
2115
2116        // validate payer is operator and get payer xonly pk, payout blockhash and kickoff txid
2117        let (payout_blockhash, kickoff_txid) = self
2118            .validate_payer_is_operator(Some(&mut dbtx), deposit_id)
2119            .await?;
2120
2121        let mut current_round_idx = self.db.get_current_round_index(Some(&mut dbtx)).await?;
2122
2123        let mut txs_to_send: Vec<(TransactionType, Transaction)>;
2124
2125        loop {
2126            txs_to_send = self
2127                .get_next_txs_to_send(
2128                    Some(&mut dbtx),
2129                    &mut deposit_data,
2130                    payout_blockhash,
2131                    kickoff_txid,
2132                    current_round_idx,
2133                )
2134                .await?;
2135            if txs_to_send.is_empty() {
2136                // if no txs were returned, and we advanced the round in the db, ask for the next txs again
2137                // with the new round index
2138                let round_idx_after_operations =
2139                    self.db.get_current_round_index(Some(&mut dbtx)).await?;
2140                if round_idx_after_operations != current_round_idx {
2141                    current_round_idx = round_idx_after_operations;
2142                    continue;
2143                }
2144            }
2145            break;
2146        }
2147
2148        dbtx.commit().await?;
2149        Ok(txs_to_send)
2150    }
2151
2152    /// Checks the current round status, and returns the next txs that are safe to send to be
2153    /// able to advance to the next round.
2154    async fn advance_round_manually(
2155        &self,
2156        mut dbtx: Option<DatabaseTransaction<'_>>,
2157        round_idx: RoundIndex,
2158    ) -> Result<Vec<(TransactionType, Transaction)>, BridgeError> {
2159        if round_idx == RoundIndex::Collateral {
2160            // if current round is collateral, nothing to do except send the first round tx
2161            return self.send_next_round_tx(dbtx, round_idx).await;
2162        }
2163
2164        // get round txhandlers
2165        let context = ContractContext::new_context_for_round(
2166            self.signer.xonly_public_key,
2167            round_idx,
2168            self.config.protocol_paramset(),
2169        );
2170
2171        let txs = create_and_sign_txs(
2172            self.db.clone(),
2173            &self.signer,
2174            self.config.clone(),
2175            context,
2176            None,
2177            dbtx.as_deref_mut(),
2178        )
2179        .await?;
2180
2181        let round_tx = txs
2182            .iter()
2183            .find(|(tx_type, _)| tx_type == &TransactionType::Round)
2184            .ok_or(eyre::eyre!("Round tx not found in txs"))?;
2185
2186        if !self.rpc.is_tx_on_chain(&round_tx.1.compute_txid()).await? {
2187            return Err(eyre::eyre!("Round tx for round {:?} is not on chain, but the database shows we are on this round, error", round_idx).into());
2188        }
2189
2190        // check if ready to reimburse tx was sent
2191        let ready_to_reimburse_tx = txs
2192            .iter()
2193            .find(|(tx_type, _)| tx_type == &TransactionType::ReadyToReimburse)
2194            .ok_or(eyre::eyre!("Ready to reimburse tx not found in txs"))?;
2195
2196        let mut txs_to_send = Vec::new();
2197
2198        // to be able to send ready to reimburse tx, we need to make sure, all kickoff utxos are spent, and for all kickoffs, all kickoff finalizers are spent
2199        if !self
2200            .rpc
2201            .is_tx_on_chain(&ready_to_reimburse_tx.1.compute_txid())
2202            .await?
2203        {
2204            tracing::info!("Ready to reimburse tx for round {:?} is not on chain, checking prerequisites to see if we are able to send it
2205            Prerequisites:
2206            - all kickoff utxos are spent
2207            - for all kickoffs, all kickoff finalizers are spent
2208            ", round_idx);
2209            // get max height saved in bitcoin syncer
2210            let current_chain_height = self
2211                .db
2212                .get_max_height(dbtx.as_deref_mut())
2213                .await?
2214                .ok_or_eyre("Max block height is not found in the btc syncer database")?;
2215
2216            let round_txid = round_tx.1.compute_txid();
2217            let (unspent_kickoff_utxos, are_all_utxos_spent_finalized) = self
2218                .find_and_mark_unspent_kickoff_utxos(
2219                    dbtx.as_deref_mut(),
2220                    round_idx,
2221                    round_txid,
2222                    current_chain_height,
2223                )
2224                .await?;
2225
2226            if !unspent_kickoff_utxos.is_empty() {
2227                let burn_txs = self
2228                    .create_burn_unused_kickoff_connectors_tx(round_idx, &unspent_kickoff_utxos)
2229                    .await?;
2230                txs_to_send.extend(burn_txs);
2231            } else if !are_all_utxos_spent_finalized {
2232                // if some utxos are not spent, we need to wait until they are spent
2233                return Err(eyre::eyre!(format!(
2234                    "The transactions that spend the kickoff utxos are not yet finalized, wait until they are finalized. Finality depth: {}
2235                    If they are actually finalized, but this error is returned, it means internal bitcoin syncer is slow or stopped.",
2236                    self.config.protocol_paramset().finality_depth
2237                ))
2238                .into());
2239            } else {
2240                // every kickoff utxo is spent, but we need to check if all kickoff finalizers are spent
2241                // if not, we return and error and wait until they are spent
2242                // if all finalizers are spent, it is safe to send ready to reimburse tx
2243                self.validate_all_kickoff_finalizers_spent(
2244                    dbtx.as_deref_mut(),
2245                    round_idx,
2246                    current_chain_height,
2247                )
2248                .await?;
2249                // all finalizers and kickoff utxos are spent, it is safe to send ready to reimburse tx
2250                txs_to_send.push(ready_to_reimburse_tx.clone());
2251            }
2252        } else {
2253            // ready to reimburse tx is on chain, we need to wait for the timelock to send the next round tx
2254            // first check if next round tx is already sent, that means we can update the database
2255            txs_to_send.extend(self.send_next_round_tx(dbtx, round_idx).await?);
2256        }
2257
2258        Ok(txs_to_send)
2259    }
2260
2261    /// Finds unspent kickoff UTXOs and marks spent ones as used in the database.
2262    /// Returns the unspent kickoff utxos (doesn't matter if finalized or unfinalized) and a boolean to mark if all utxos are spent and finalized
2263    async fn find_and_mark_unspent_kickoff_utxos(
2264        &self,
2265        mut dbtx: Option<DatabaseTransaction<'_>>,
2266        round_idx: RoundIndex,
2267        round_txid: Txid,
2268        current_chain_height: u32,
2269    ) -> Result<(Vec<usize>, bool), BridgeError> {
2270        // check and collect all kickoff utxos that are not spent
2271        let mut unspent_kickoff_utxos = Vec::new();
2272        // a variable to mark if any any kickoff utxo is spent, but still not finalized
2273        let mut fully_finalized_spent = true;
2274        for kickoff_idx in 0..self.config.protocol_paramset().num_kickoffs_per_round {
2275            let kickoff_utxo = OutPoint {
2276                txid: round_txid,
2277                vout: UtxoVout::Kickoff(kickoff_idx).get_vout(),
2278            };
2279            if !self.rpc.is_utxo_spent(&kickoff_utxo).await? {
2280                unspent_kickoff_utxos.push(kickoff_idx);
2281            } else {
2282                // set the kickoff connector as used (it will do nothing if the utxo is already in db, so it won't overwrite the kickoff txid)
2283                // mark so that we don't try to use this utxo anymore
2284                self.db
2285                    .mark_kickoff_connector_as_used(
2286                        dbtx.as_deref_mut(),
2287                        round_idx,
2288                        kickoff_idx as u32,
2289                        None,
2290                    )
2291                    .await?;
2292                // check if the tx that spent the kickoff utxo is finalized
2293                // use btc syncer for this
2294                fully_finalized_spent &= self
2295                    .db
2296                    .check_if_utxo_spending_tx_is_finalized(
2297                        dbtx.as_deref_mut(),
2298                        kickoff_utxo,
2299                        current_chain_height,
2300                        self.config.protocol_paramset(),
2301                    )
2302                    .await?;
2303            }
2304        }
2305        Ok((unspent_kickoff_utxos, fully_finalized_spent))
2306    }
2307
2308    /// Creates a transaction that burns unused kickoff connectors.
2309    async fn create_burn_unused_kickoff_connectors_tx(
2310        &self,
2311        round_idx: RoundIndex,
2312        unspent_kickoff_utxos: &[usize],
2313    ) -> Result<Vec<(TransactionType, Transaction)>, BridgeError> {
2314        tracing::info!(
2315            "There are unspent kickoff utxos {:?}, creating a tx that spends them",
2316            unspent_kickoff_utxos
2317        );
2318        let operator_winternitz_public_keys = self.generate_kickoff_winternitz_pubkeys()?;
2319        let kickoff_wpks = KickoffWinternitzKeys::new(
2320            operator_winternitz_public_keys,
2321            self.config.protocol_paramset().num_kickoffs_per_round,
2322            self.config.protocol_paramset().num_round_txs,
2323        )?;
2324        // if there are unspent kickoff utxos, create a tx that spends them
2325        let (round_txhandler, _ready_to_reimburse_txhandler) = create_round_nth_txhandler(
2326            self.signer.xonly_public_key,
2327            self.collateral_funding_outpoint,
2328            self.config.protocol_paramset().collateral_funding_amount,
2329            round_idx,
2330            &kickoff_wpks,
2331            self.config.protocol_paramset(),
2332        )?;
2333        let mut burn_unused_kickoff_connectors_txhandler =
2334            create_burn_unused_kickoff_connectors_txhandler(
2335                &round_txhandler,
2336                unspent_kickoff_utxos,
2337                &self.reimburse_addr,
2338                self.config.protocol_paramset(),
2339            )?;
2340
2341        // sign burn unused kickoff connectors tx
2342        self.signer
2343            .tx_sign_and_fill_sigs(&mut burn_unused_kickoff_connectors_txhandler, &[], None)
2344            .wrap_err("Failed to sign burn unused kickoff connectors tx")?;
2345
2346        let burn_unused_kickoff_connectors_txhandler =
2347            burn_unused_kickoff_connectors_txhandler.promote()?;
2348        Ok(vec![(
2349            TransactionType::BurnUnusedKickoffConnectors,
2350            burn_unused_kickoff_connectors_txhandler
2351                .get_cached_tx()
2352                .clone(),
2353        )])
2354    }
2355
2356    /// Validates that all kickoff finalizers are spent for the given round.
2357    async fn validate_all_kickoff_finalizers_spent(
2358        &self,
2359        mut dbtx: Option<DatabaseTransaction<'_>>,
2360        round_idx: RoundIndex,
2361        current_chain_height: u32,
2362    ) -> Result<(), BridgeError> {
2363        // we need to check if all finalizers are spent
2364        for kickoff_idx in 0..self.config.protocol_paramset().num_kickoffs_per_round {
2365            let kickoff_txid = self
2366                .db
2367                .get_kickoff_txid_for_used_kickoff_connector(
2368                    dbtx.as_deref_mut(),
2369                    round_idx,
2370                    kickoff_idx as u32,
2371                )
2372                .await?;
2373            if let Some(kickoff_txid) = kickoff_txid {
2374                let deposit_outpoint = self
2375                    .db
2376                    .get_deposit_outpoint_for_kickoff_txid(dbtx.as_deref_mut(), kickoff_txid)
2377                    .await?;
2378                let kickoff_finalizer_utxo = OutPoint {
2379                    txid: kickoff_txid,
2380                    vout: UtxoVout::KickoffFinalizer.get_vout(),
2381                };
2382                if !self.rpc.is_tx_on_chain(&kickoff_txid).await? {
2383                    return Err(eyre::eyre!(
2384                        "For round {:?} and kickoff utxo {:?}, the kickoff tx {:?} is not on chain,
2385                    reimburse the deposit {:?} corresponding to this kickoff first. ",
2386                        round_idx,
2387                        kickoff_idx,
2388                        kickoff_txid,
2389                        deposit_outpoint
2390                    )
2391                    .into());
2392                } else if !self.rpc.is_utxo_spent(&kickoff_finalizer_utxo).await? {
2393                    return Err(eyre::eyre!("For round {:?} and kickoff utxo {:?}, the kickoff finalizer {:?} is not spent,
2394                    send the challenge timeout tx for the deposit {:?} first", round_idx, kickoff_idx, kickoff_txid, deposit_outpoint).into());
2395                } else if !self
2396                    .db
2397                    .check_if_utxo_spending_tx_is_finalized(
2398                        dbtx.as_deref_mut(),
2399                        kickoff_finalizer_utxo,
2400                        current_chain_height,
2401                        self.config.protocol_paramset(),
2402                    )
2403                    .await?
2404                {
2405                    return Err(eyre::eyre!("For round {:?} and kickoff utxo {:?}, the kickoff finalizer utxo {:?} is spent, but not yet finalized, wait until it is finalized. Finality depth: {}
2406                    If the transaction is actually finalized, but this error is returned, it means internal bitcoin syncer is slow or stopped.", round_idx, kickoff_idx, kickoff_finalizer_utxo, self.config.protocol_paramset().finality_depth).into());
2407                }
2408            }
2409        }
2410        Ok(())
2411    }
2412
2413    /// Checks if the next round tx is on chain, if it is, updates the database, otherwise returns the round tx that needs to be sent.
2414    async fn send_next_round_tx(
2415        &self,
2416        mut dbtx: Option<DatabaseTransaction<'_>>,
2417        round_idx: RoundIndex,
2418    ) -> Result<Vec<(TransactionType, Transaction)>, BridgeError> {
2419        let next_round_context = ContractContext::new_context_for_round(
2420            self.signer.xonly_public_key,
2421            round_idx.next_round(),
2422            self.config.protocol_paramset(),
2423        );
2424        let next_round_txs = create_and_sign_txs(
2425            self.db.clone(),
2426            &self.signer,
2427            self.config.clone(),
2428            next_round_context,
2429            None,
2430            dbtx.as_deref_mut(),
2431        )
2432        .await?;
2433        let next_round_tx = next_round_txs
2434            .iter()
2435            .find(|(tx_type, _)| tx_type == &TransactionType::Round)
2436            .ok_or(eyre::eyre!("Next round tx not found in txs"))?;
2437        let next_round_txid = next_round_tx.1.compute_txid();
2438
2439        if !self.rpc.is_tx_on_chain(&next_round_txid).await? {
2440            // if next round tx is not on chain, we need to wait for the timelock to send it
2441            Ok(vec![next_round_tx.clone()])
2442        } else {
2443            // if next round tx is on chain, we need to update the database
2444            self.db
2445                .update_current_round_index(dbtx, round_idx.next_round())
2446                .await?;
2447            Ok(vec![])
2448        }
2449    }
2450
2451    /// Adds relevant transactions to tx_sender queue for a kickoff.
2452    /// Used during resync to ensure all necessary txs are queued.
2453    #[cfg(feature = "automation")]
2454    async fn queue_relevant_txs_for_new_kickoff(
2455        &self,
2456        dbtx: DatabaseTransaction<'_>,
2457        kickoff_data: KickoffData,
2458        deposit_data: DepositData,
2459    ) -> Result<(), BridgeError> {
2460        let context = ContractContext::new_context_for_kickoff(
2461            kickoff_data,
2462            deposit_data.clone(),
2463            self.config.protocol_paramset(),
2464        );
2465        let signed_txs = create_and_sign_txs(
2466            self.db.clone(),
2467            &self.signer,
2468            self.config.clone(),
2469            context,
2470            // dummy blockhash, as kickoff is already sent and payout blockhash does not affect the txid, we can use a dummy blockhash
2471            Some([0u8; 20]),
2472            Some(dbtx),
2473        )
2474        .await?;
2475        let tx_metadata = Some(TxMetadata {
2476            tx_type: TransactionType::Dummy,
2477            operator_xonly_pk: Some(self.signer.xonly_public_key),
2478            round_idx: Some(kickoff_data.round_idx),
2479            kickoff_idx: Some(kickoff_data.kickoff_idx),
2480            deposit_outpoint: Some(deposit_data.get_deposit_outpoint()),
2481        });
2482        for (tx_type, signed_tx) in &signed_txs {
2483            match *tx_type {
2484                TransactionType::OperatorChallengeAck(_)
2485                | TransactionType::WatchtowerChallengeTimeout(_)
2486                | TransactionType::ChallengeTimeout
2487                | TransactionType::DisproveTimeout
2488                | TransactionType::Reimburse => {
2489                    self.tx_sender
2490                        .add_tx_to_queue(
2491                            dbtx,
2492                            *tx_type,
2493                            signed_tx,
2494                            &signed_txs,
2495                            tx_metadata,
2496                            self.config.protocol_paramset(),
2497                            None,
2498                        )
2499                        .await?;
2500                }
2501                _ => {}
2502            }
2503        }
2504        Ok(())
2505    }
2506}
2507
2508impl<C> NamedEntity for Operator<C>
2509where
2510    C: CitreaClientT,
2511{
2512    const ENTITY_NAME: &'static str = "operator";
2513    const FINALIZED_BLOCK_CONSUMER_ID_AUTOMATION: &'static str =
2514        "operator_finalized_block_fetcher_automation";
2515    // operator uses verifier's lcp syncer
2516    const LCP_SYNCER_CONSUMER_ID: &'static str = "verifier_lcp_syncer";
2517}
2518
/// State machine integration of the operator, only compiled with the `automation` feature.
#[cfg(feature = "automation")]
mod states {

    use super::*;
    use crate::builder::transaction::{
        create_txhandlers, ContractContext, ReimburseDbCache, TxHandler, TxHandlerCache,
    };
    use crate::states::context::DutyResult;
    use crate::states::{Duty, Owner, StateManager};
    use clementine_primitives::TransactionType;
    use std::collections::BTreeMap;

    #[tonic::async_trait]
    impl<C> Owner for Operator<C>
    where
        C: CitreaClientT,
    {
        /// Performs the operator's part of a duty emitted by the state machines.
        ///
        /// Every duty ends up as `DutyResult::Handled`; duties the operator has nothing to
        /// do for are accepted without any action. Errors of the underlying operations are
        /// propagated.
        async fn handle_duty(
            &self,
            dbtx: DatabaseTransaction<'_>,
            duty: Duty,
        ) -> Result<DutyResult, BridgeError> {
            let own_xonly_pk = self.signer.xonly_public_key;

            match duty {
                Duty::NewReadyToReimburse {
                    round_idx,
                    operator_xonly_pk,
                    used_kickoffs,
                } => {
                    // nothing to do besides logging
                    tracing::info!("Operator {:?} called new ready to reimburse with round_idx: {:?}, operator_xonly_pk: {:?}, used_kickoffs: {:?}",
                    own_xonly_pk, round_idx, operator_xonly_pk, used_kickoffs);
                }
                Duty::AddNecessaryTxsForKickoff {
                    kickoff_data,
                    deposit_data,
                } => {
                    // These txs are already added when the kickoff is created in
                    // handle_finalized_payout, but they have to be queued again here if the
                    // kickoff was created while the operator had no automation and automation
                    // is enabled now, or if the operator lost its db and is resyncing.
                    tracing::info!("Operator {:?} called add necessary txs for kickoff with kickoff_data: {:?}, deposit_data: {:?}",
                    own_xonly_pk, kickoff_data, deposit_data);
                    self.queue_relevant_txs_for_new_kickoff(dbtx, kickoff_data, deposit_data)
                        .await?;
                }
                Duty::AddRelevantTxsToTxSenderIfChallenged {
                    kickoff_data,
                    deposit_data,
                } => {
                    tracing::info!("Operator {:?} called add relevant txs to tx sender with kickoff_data: {:?}, deposit_data: {:?}",
                    own_xonly_pk, kickoff_data, deposit_data);
                    self.queue_relevant_txs_for_new_kickoff(dbtx, kickoff_data, deposit_data)
                        .await?;
                }
                Duty::SendOperatorAsserts {
                    kickoff_data,
                    deposit_data,
                    watchtower_challenges,
                    payout_blockhash,
                    latest_blockhash,
                } => {
                    tracing::info!("Operator {:?} called send operator asserts with kickoff_data: {:?}, deposit_data: {:?}, number of watchtower_challenges: {}",
                    own_xonly_pk, kickoff_data, deposit_data, watchtower_challenges.len());
                    self.send_asserts(
                        dbtx,
                        kickoff_data,
                        deposit_data,
                        watchtower_challenges,
                        payout_blockhash,
                        latest_blockhash,
                    )
                    .await?;
                }
                Duty::SendLatestBlockhash {
                    kickoff_data,
                    deposit_data,
                    latest_blockhash,
                } => {
                    tracing::info!("Operator {:?} called send latest blockhash with kickoff_id: {:?}, deposit_data: {:?}, latest_blockhash: {:?}", own_xonly_pk, kickoff_data, deposit_data, latest_blockhash);
                    self.send_latest_blockhash(dbtx, kickoff_data, deposit_data, latest_blockhash)
                        .await?;
                }
                Duty::CheckIfKickoff {
                    txid,
                    block_height,
                    witness,
                } => {
                    tracing::debug!(
                        "Operator {:?} called check if kickoff with txid: {:?}, block_height: {:?}",
                        own_xonly_pk,
                        txid,
                        block_height,
                    );

                    // The txid is a kickoff of ours only if the database knows its deposit.
                    let known_kickoff = self
                        .db
                        .get_deposit_data_with_kickoff_txid(Some(dbtx), txid)
                        .await?;
                    if let Some((deposit_data, kickoff_data)) = known_kickoff {
                        StateManager::<Self>::dispatch_new_kickoff_machine(
                            &self.db,
                            dbtx,
                            kickoff_data,
                            block_height,
                            deposit_data.clone(),
                            witness,
                        )
                        .await?;

                        // resend relevant txs
                        self.queue_relevant_txs_for_new_kickoff(dbtx, kickoff_data, deposit_data)
                            .await?;
                    }
                }
                // Duties without any operator-side action; in particular, operators do not
                // check if kickoffs are malicious.
                Duty::WatchtowerChallenge { .. }
                | Duty::VerifierDisprove { .. }
                | Duty::CheckIfKickoffMalicious { .. } => {}
            }

            Ok(DutyResult::Handled)
        }

        /// Creates the txhandlers for `tx_type` in the given contract context, using a fresh
        /// txhandler cache and a database cache built from the context.
        async fn create_txhandlers(
            &self,
            dbtx: DatabaseTransaction<'_>,
            tx_type: TransactionType,
            contract_context: ContractContext,
        ) -> Result<BTreeMap<TransactionType, TxHandler>, BridgeError> {
            let mut reimburse_db_cache =
                ReimburseDbCache::from_context(self.db.clone(), &contract_context, Some(dbtx));
            let mut txhandler_cache = TxHandlerCache::new();

            Ok(create_txhandlers(
                tx_type,
                contract_context,
                &mut txhandler_cache,
                &mut reimburse_db_cache,
            )
            .await?)
        }

        /// A kickoff is relevant for this operator only if it carries the operator's own
        /// x-only public key.
        fn is_kickoff_relevant_for_owner(&self, kickoff_data: &KickoffData) -> bool {
            self.signer.xonly_public_key == kickoff_data.operator_xonly_pk
        }
    }
}
2668
#[cfg(test)]
mod tests {
    use crate::operator::Operator;
    use crate::test::common::citrea::MockCitreaClient;
    use crate::test::common::*;
    use bitcoin::hashes::Hash;
    use bitcoin::{OutPoint, Txid};

    /// The number of assert Winternitz public keys generated for a deposit is expected to be
    /// `num_round_txs * num_kickoffs_per_round`. Currently ignored.
    #[tokio::test]
    #[ignore = "Design changes in progress"]
    async fn get_winternitz_public_keys() {
        let mut config = create_test_config_with_thread_name().await;
        let _regtest = create_regtest_rpc(&mut config).await;

        let operator = Operator::<MockCitreaClient>::new(config.clone())
            .await
            .unwrap();

        let deposit_outpoint = OutPoint {
            txid: Txid::all_zeros(),
            vout: 2,
        };
        let assert_wpks = operator
            .generate_assert_winternitz_pubkeys(deposit_outpoint)
            .unwrap();

        let expected_len = config.protocol_paramset().num_round_txs
            * config.protocol_paramset().num_kickoffs_per_round;
        assert_eq!(assert_wpks.len(), expected_len);
    }

    /// The keys streamed by `get_params` must equal, in order and in number, the kickoff
    /// Winternitz public keys generated directly.
    #[tokio::test]
    async fn operator_get_params() {
        let mut config = create_test_config_with_thread_name().await;
        let _regtest = create_regtest_rpc(&mut config).await;

        let operator = Operator::<MockCitreaClient>::new(config.clone())
            .await
            .unwrap();
        let expected_wpks = operator.generate_kickoff_winternitz_pubkeys().unwrap();

        let (mut wpk_rx, _) = operator.get_params().await.unwrap();
        let mut received = 0;
        while let Some(streamed_wpk) = wpk_rx.recv().await {
            assert_eq!(expected_wpks[received], streamed_wpk);
            received += 1;
        }
        // the stream must not end early
        assert_eq!(received, expected_wpks.len());
    }
}