clementine_core/rpc/
aggregator.rs

1use super::clementine::{
2    clementine_aggregator_server::ClementineAggregator, verifier_deposit_finalize_params,
3    DepositParams, Empty, VerifierDepositFinalizeParams,
4};
5use super::clementine::{
6    AggregatorWithdrawResponse, Deposit, EntityStatuses, GetEntityStatusesRequest,
7    OptimisticPayoutParams, RawSignedTx, VergenResponse, VerifierPublicKeys,
8};
9use crate::aggregator::{
10    AggregatorServer, CompatibilityCheckScope, OperatorId, ParticipatingOperators,
11    ParticipatingVerifiers, VerifierId,
12};
13use crate::bitvm_client::SECP;
14use crate::builder::sighash::SignatureInfo;
15use crate::builder::transaction::{
16    create_emergency_stop_txhandler, create_move_to_vault_txhandler,
17    create_optimistic_payout_txhandler, Signed, TxHandler,
18};
19use crate::compatibility::ActorWithConfig;
20use crate::config::BridgeConfig;
21use crate::constants::{
22    DEPOSIT_FINALIZATION_TIMEOUT, DEPOSIT_FINALIZE_STREAM_CREATION_TIMEOUT,
23    KEY_DISTRIBUTION_TIMEOUT, NONCE_STREAM_CREATION_TIMEOUT, OPERATOR_SIGS_STREAM_CREATION_TIMEOUT,
24    OPERATOR_SIGS_TIMEOUT, OPTIMISTIC_PAYOUT_TIMEOUT, OVERALL_DEPOSIT_TIMEOUT,
25    PARTIAL_SIG_STREAM_CREATION_TIMEOUT, PIPELINE_COMPLETION_TIMEOUT, SEND_OPERATOR_SIGS_TIMEOUT,
26    SETUP_COMPLETION_TIMEOUT, WITHDRAWAL_TIMEOUT,
27};
28use crate::deposit::{Actors, DepositData, DepositInfo};
29use crate::musig2::AggregateFromPublicKeys;
30use crate::rpc::clementine::{
31    operator_withrawal_response, AggregatorWithdrawalInput, CompatibilityParamsRpc,
32    EntitiesCompatibilityData, OperatorWithrawalResponse, VerifierDepositSignParams,
33};
34use crate::rpc::parser;
35#[cfg(feature = "automation")]
36use crate::tx_sender_queue::TxSenderClientQueueExt;
37use crate::utils::{
38    flatten_join_named_results, get_vergen_response, timed_request, timed_try_join_all,
39    try_join_all_combine_errors, ScriptBufExt,
40};
41use crate::utils::{FeePayingType, TxMetadata};
42use crate::{
43    aggregator::Aggregator,
44    builder::sighash::create_nofn_sighash_stream,
45    musig2::aggregate_nonces,
46    rpc::clementine::{self, DepositSignSession},
47};
48use bitcoin::hashes::Hash;
49use bitcoin::secp256k1::schnorr::Signature;
50use bitcoin::secp256k1::{Message, PublicKey};
51use bitcoin::{TapSighash, TxOut, Txid, XOnlyPublicKey};
52use clementine_errors::BridgeError;
53use clementine_errors::TransactionType;
54use clementine_errors::{ErrorExt, ResultExt};
55use clementine_primitives::UTXO;
56use eyre::{Context, OptionExt};
57use futures::future::join_all;
58use futures::{
59    stream::{BoxStream, TryStreamExt},
60    FutureExt, Stream, StreamExt, TryFutureExt,
61};
62use secp256k1::musig::{AggregatedNonce, PartialSignature, PublicNonce};
63use std::future::Future;
64use tokio::sync::mpsc::{channel, Receiver, Sender};
65use tonic::{async_trait, Request, Response, Status, Streaming};
66struct AggNonceQueueItem {
67    agg_nonce: AggregatedNonce,
68    sighash: TapSighash,
69}
70
71struct FinalSigQueueItem {
72    final_sig: Vec<u8>,
73}
74
75use clementine_errors::AggregatorError;
76
77async fn get_next_pub_nonces(
78    nonce_streams: &mut [impl Stream<Item = Result<PublicNonce, BridgeError>>
79              + Unpin
80              + Send
81              + 'static],
82    verifiers_ids: &[VerifierId],
83) -> Result<Vec<PublicNonce>, BridgeError> {
84    try_join_all_combine_errors(nonce_streams.iter_mut().zip(verifiers_ids).map(
85        |(s, id)| async move {
86            s.next()
87                .await
88                .transpose()
89                .wrap_err(format!("Failed to get nonce from {id}"))? // Return the inner error if it exists
90                .ok_or_else(|| -> eyre::Report {
91                    AggregatorError::InputStreamEndedEarlyUnknownSize {
92                        // Return an early end error if the stream is empty
93                        stream_name: format!("Nonce stream {id}"),
94                    }
95                    .into()
96                })
97        },
98    ))
99    .await
100}
101
102/// For each expected sighash, we collect a batch of public nonces from all verifiers. We aggregate and send aggregated nonce and all public nonces (needed for partial signature verification) to the agg_nonce_sender. Then repeat for the next sighash.
103async fn nonce_aggregator(
104    mut nonce_streams: Vec<
105        impl Stream<Item = Result<PublicNonce, BridgeError>> + Unpin + Send + 'static,
106    >,
107    mut sighash_stream: impl Stream<Item = Result<(TapSighash, SignatureInfo), BridgeError>>
108        + Unpin
109        + Send
110        + 'static,
111    agg_nonce_sender: Sender<(AggNonceQueueItem, Vec<PublicNonce>)>,
112    needed_nofn_sigs: usize,
113    verifiers_ids: Vec<VerifierId>,
114) -> Result<
115    (
116        (AggregatedNonce, Vec<PublicNonce>),
117        (AggregatedNonce, Vec<PublicNonce>),
118    ),
119    BridgeError,
120> {
121    let mut total_sigs = 0;
122
123    tracing::info!("Starting nonce aggregation (expecting {needed_nofn_sigs} nonces)");
124
125    // sanity check
126    if verifiers_ids.len() != nonce_streams.len() {
127        return Err(
128            eyre::eyre!("Number of verifiers ids and nonce streams must be the same").into(),
129        );
130    }
131
132    while let Some(msg) = sighash_stream.next().await {
133        let (sighash, siginfo) = msg.wrap_err("Sighash stream failed")?;
134
135        total_sigs += 1;
136
137        let pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
138            .await
139            .wrap_err_with(|| {
140                format!("Failed to get nonces from verifiers for sighash #{total_sigs} with siginfo: {siginfo:?}")
141            })?;
142
143        tracing::trace!(
144            "Received nonces for signature id {:?} in nonce_aggregator",
145            siginfo.signature_id
146        );
147
148        let agg_nonce = aggregate_nonces(pub_nonces.iter().collect::<Vec<_>>().as_slice())?;
149
150        agg_nonce_sender
151            .send((AggNonceQueueItem { agg_nonce, sighash }, pub_nonces))
152            .await
153            .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
154                stream_name: "nonce_aggregator".to_string(),
155            })?;
156
157        tracing::trace!(
158            "Sent nonces for signature id {:?} in nonce_aggregator",
159            siginfo.signature_id
160        );
161    }
162    tracing::trace!(tmp_debug = 1, "Sent {total_sigs} to agg_nonce stream");
163
164    // Sanity check, should never happen
165    if total_sigs != needed_nofn_sigs {
166        let err_msg = format!(
167            "Expected {needed_nofn_sigs} nofn signatures, got {total_sigs} from sighash stream",
168        );
169        tracing::error!("{err_msg}");
170        return Err(eyre::eyre!(err_msg).into());
171    }
172    // aggregate nonces for the movetx signature
173    let movetx_pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
174        .await
175        .wrap_err("Failed to get movetx public nonces from verifiers")?;
176
177    tracing::trace!("Received nonces for movetx in nonce_aggregator");
178
179    let move_tx_agg_nonce =
180        aggregate_nonces(movetx_pub_nonces.iter().collect::<Vec<_>>().as_slice())
181            .wrap_err("Failed to aggregate movetx nonces")?;
182
183    let emergency_stop_pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
184        .await
185        .wrap_err("Failed to get emergency stop tx public nonces from verifiers")?;
186
187    let emergency_stop_agg_nonce = aggregate_nonces(
188        emergency_stop_pub_nonces
189            .iter()
190            .collect::<Vec<_>>()
191            .as_slice(),
192    )
193    .wrap_err("Failed to aggregate emergency stop tx nonces")?;
194
195    Ok((
196        (move_tx_agg_nonce, movetx_pub_nonces),
197        (emergency_stop_agg_nonce, emergency_stop_pub_nonces),
198    ))
199}
200
201/// Reroutes aggregated nonces and public nonces for each aggregated nonce to the signature aggregator.
202async fn nonce_distributor(
203    mut agg_nonce_receiver: Receiver<(AggNonceQueueItem, Vec<PublicNonce>)>,
204    partial_sig_streams: Vec<(
205        Streaming<clementine::PartialSig>,
206        Sender<clementine::VerifierDepositSignParams>,
207    )>,
208    partial_sig_sender: Sender<(Vec<(PartialSignature, PublicNonce)>, AggNonceQueueItem)>,
209    needed_nofn_sigs: usize,
210    verifiers_ids: Vec<VerifierId>,
211) -> Result<(), BridgeError> {
212    let mut nonce_count = 0;
213    let mut sig_count = 0;
214    let (mut partial_sig_rx, mut partial_sig_tx): (Vec<_>, Vec<_>) =
215        partial_sig_streams.into_iter().unzip();
216
217    let (queue_tx, mut queue_rx) = channel(crate::constants::DEFAULT_CHANNEL_SIZE);
218
219    // sanity check
220    if verifiers_ids.len() != partial_sig_rx.len() {
221        return Err(eyre::eyre!(
222            "Number of verifiers ids and partial sig streams must be the same"
223        )
224        .into());
225    }
226    let verifiers_ids_clone = verifiers_ids.clone();
227    let handle_1 = tokio::spawn(async move {
228        while let Some((queue_item, pub_nonces)) = agg_nonce_receiver.recv().await {
229            nonce_count += 1;
230
231            tracing::trace!(
232                "Received aggregated nonce {} in nonce_distributor",
233                nonce_count
234            );
235
236            let agg_nonce_wrapped = clementine::VerifierDepositSignParams {
237                params: Some(clementine::verifier_deposit_sign_params::Params::AggNonce(
238                    queue_item.agg_nonce.serialize().to_vec(),
239                )),
240            };
241
242            // Broadcast aggregated nonce to all streams
243            try_join_all_combine_errors(
244                partial_sig_tx
245                    .iter_mut()
246                    .zip(verifiers_ids_clone.iter())
247                    .map(|(tx, id)| {
248                        let agg_nonce_wrapped = agg_nonce_wrapped.clone();
249                        async move {
250                            tx.send(agg_nonce_wrapped)
251                                .await
252                                .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
253                                    stream_name: format!("Partial sig {id}"),
254                                })
255                                .inspect_err(|e| {
256                                    tracing::error!(
257                                        "Failed to send aggregated nonce to {id}: {:?}",
258                                        e
259                                    );
260                                })
261                        }
262                    }),
263            )
264            .await
265            .wrap_err("Failed to send aggregated nonces to verifiers")?;
266
267            queue_tx
268                .send((queue_item, pub_nonces))
269                .await
270                .wrap_err("Other end of channel closed")?;
271
272            tracing::trace!(
273                "Sent aggregated nonce {} to verifiers in nonce_distributor",
274                nonce_count
275            );
276            if nonce_count == needed_nofn_sigs {
277                break;
278            }
279        }
280        if nonce_count != needed_nofn_sigs {
281            let err_msg = format!("Expected {needed_nofn_sigs} aggregated nonces in nonce_distributor, got {nonce_count}",);
282            tracing::error!("{err_msg}");
283            return Err(eyre::eyre!(err_msg).into());
284        }
285
286        tracing::trace!(
287            tmp_debug = 1,
288            "Broadcasted {nonce_count} agg_nonces to verifiers and to the queue"
289        );
290        Ok::<(), BridgeError>(())
291    });
292
293    let handle_2 = tokio::spawn(async move {
294        while let Some((queue_item, pub_nonces)) = queue_rx.recv().await {
295            let pub_nonces_ref = pub_nonces.as_slice();
296            if pub_nonces_ref.len() != partial_sig_rx.len() {
297                return Err(eyre::eyre!(
298                    "Number of public nonces {} and partial sig streams {} must be the same",
299                    pub_nonces_ref.len(),
300                    partial_sig_rx.len()
301                )
302                .into());
303            }
304            let partial_sigs = try_join_all_combine_errors(partial_sig_rx.iter_mut().zip(pub_nonces_ref.iter()).zip(verifiers_ids.iter()).map(
305                |((stream, pub_nonce), id)| async move {
306                    let partial_sig = stream
307                        .message()
308                        .await
309                        .wrap_err_with(|| AggregatorError::RequestFailed {
310                            request_name: format!("Partial sig {sig_count} from {id}"),
311                        })
312                        .inspect_err(|e| {
313                            tracing::error!(
314                                "Failed to receive partial signature {sig_count} from {id}, an error was sent: {:?}",
315                                e
316                            );
317                        })?
318                        .ok_or_eyre(AggregatorError::InputStreamEndedEarlyUnknownSize {
319                            stream_name: format!("Partial sig {sig_count} from {id} closed"),
320                        }).inspect_err(|e| {
321                            tracing::error!(
322                                "Failed to receive partial signature {sig_count} from {id}, the stream was closed: {:?}",
323                                e
324                            );
325                        })?;
326                    let partial_sig = PartialSignature::from_byte_array(
327                        &partial_sig
328                            .partial_sig
329                            .as_slice()
330                            .try_into()
331                            .wrap_err("PartialSignature must be 32 bytes")?,
332                    )
333                    .wrap_err(format!("Failed to parse partial signature {sig_count} from {id}"))?;
334
335                    Ok::<_, BridgeError>((partial_sig, *pub_nonce))
336                },
337            ))
338            .await?;
339
340            sig_count += 1;
341
342            tracing::trace!(
343                "Received partial signature {} from verifiers in nonce_distributor",
344                sig_count
345            );
346
347            partial_sig_sender
348                .send((partial_sigs, queue_item))
349                .await
350                .map_err(|_| {
351                    eyre::eyre!(AggregatorError::OutputStreamEndedEarly {
352                        stream_name: "partial_sig_sender".into(),
353                    })
354                })?;
355
356            tracing::trace!(
357                "Sent partial signature {} to signature_aggregator in nonce_distributor",
358                sig_count
359            );
360        }
361
362        if sig_count != needed_nofn_sigs {
363            let err_msg = format!(
364                "Expected {needed_nofn_sigs} partial signatures in nonce_distributor, got {sig_count}",
365            );
366            tracing::error!("{err_msg}");
367            return Err(eyre::eyre!(err_msg).into());
368        }
369        tracing::trace!(
370            tmp_debug = 1,
371            "Sent {sig_count} partial sig bundles to partial_sigs stream"
372        );
373
374        tracing::trace!("Finished tasks in nonce_distributor handle 2");
375        Ok::<(), BridgeError>(())
376    });
377
378    let (result_1, result_2) = tokio::join!(handle_1, handle_2);
379
380    let mut task_errors = Vec::new();
381
382    match result_1 {
383        Ok(inner_result) => {
384            if let Err(e) = inner_result {
385                task_errors.push(format!(
386                    "Task returned error while distributing aggnonces: {e:#?}"
387                ));
388            }
389        }
390        Err(e) => {
391            task_errors.push(format!(
392                "Task panicked while distributing aggnonces: {e:#?}"
393            ));
394        }
395    }
396
397    match result_2 {
398        Ok(inner_result) => {
399            if let Err(e) = inner_result {
400                task_errors.push(format!(
401                    "Task returned error while receiving partial sigs: {e:#?}"
402                ));
403            }
404        }
405        Err(e) => {
406            task_errors.push(format!(
407                "Task panicked while receiving partial sigs: {e:#?}"
408            ));
409        }
410    }
411
412    if !task_errors.is_empty() {
413        return Err(eyre::eyre!(format!(
414            "nonce_distributor failed with errors: {:#?}",
415            task_errors
416        ))
417        .into());
418    }
419
420    tracing::debug!("Finished tasks in nonce_distributor");
421
422    Ok(())
423}
424
425/// Collects partial signatures and corresponding public nonces from given stream and aggregates them.
426/// Each partial signature will also be verified if PARTIAL_SIG_VERIFICATION is set to true.
427async fn signature_aggregator(
428    mut partial_sig_receiver: Receiver<(Vec<(PartialSignature, PublicNonce)>, AggNonceQueueItem)>,
429    verifiers_public_keys: Vec<PublicKey>,
430    final_sig_sender: Sender<FinalSigQueueItem>,
431    needed_nofn_sigs: usize,
432) -> Result<(), BridgeError> {
433    let mut sig_count = 0;
434    while let Some((partial_sigs, queue_item)) = partial_sig_receiver.recv().await {
435        sig_count += 1;
436        tracing::trace!(
437            "Received partial signatures {} in signature_aggregator",
438            sig_count
439        );
440
441        let final_sig = crate::musig2::aggregate_partial_signatures(
442            verifiers_public_keys.clone(),
443            None,
444            queue_item.agg_nonce,
445            &partial_sigs,
446            Message::from_digest(queue_item.sighash.to_byte_array()),
447        )?;
448
449        final_sig_sender
450            .send(FinalSigQueueItem {
451                final_sig: final_sig.serialize().to_vec(),
452            })
453            .await
454            .wrap_err_with(|| {
455                eyre::eyre!(AggregatorError::OutputStreamEndedEarly {
456                    stream_name: "final_sig_sender".into(),
457                })
458            })?;
459        tracing::trace!(
460            "Sent aggregated signature {} to signature_distributor in signature_aggregator",
461            sig_count
462        );
463
464        if sig_count == needed_nofn_sigs {
465            break;
466        }
467    }
468
469    if sig_count != needed_nofn_sigs {
470        let err_msg = format!(
471            "Expected {needed_nofn_sigs} aggregated signatures in signature_aggregator, got {sig_count}",
472        );
473        tracing::error!("{err_msg}");
474        return Err(eyre::eyre!(err_msg).into());
475    }
476
477    tracing::trace!(
478        tmp_debug = 1,
479        "Sent {sig_count} aggregated signatures to final_sig stream"
480    );
481
482    Ok(())
483}
484
485/// Reroutes aggregated signatures to the caller.
486/// Also sends 2 aggregated nonces to the verifiers.
487async fn signature_distributor(
488    mut final_sig_receiver: Receiver<FinalSigQueueItem>,
489    deposit_finalize_sender: Vec<Sender<VerifierDepositFinalizeParams>>,
490    agg_nonce: impl Future<
491        Output = Result<
492            (
493                (AggregatedNonce, Vec<PublicNonce>),
494                (AggregatedNonce, Vec<PublicNonce>),
495            ),
496            Status,
497        >,
498    >,
499    needed_nofn_sigs: usize,
500    verifiers_ids: Vec<VerifierId>,
501) -> Result<(), BridgeError> {
502    use verifier_deposit_finalize_params::Params;
503    let mut sig_count = 0;
504    while let Some(queue_item) = final_sig_receiver.recv().await {
505        sig_count += 1;
506        tracing::trace!("Received signature {} in signature_distributor", sig_count);
507        let final_params = VerifierDepositFinalizeParams {
508            params: Some(Params::SchnorrSig(queue_item.final_sig)),
509        };
510
511        try_join_all_combine_errors(
512            deposit_finalize_sender
513                .iter()
514                .zip(verifiers_ids.iter())
515                .map(|(tx, id)| {
516                    let final_params = final_params.clone();
517                    async move {
518                        tx.send(final_params).await.wrap_err_with(|| {
519                            AggregatorError::OutputStreamEndedEarly {
520                                stream_name: format!("Deposit finalize sender for {id}"),
521                            }
522                        })
523                    }
524                }),
525        )
526        .await
527        .wrap_err(format!(
528            "Failed to send final signature {sig_count} to verifiers"
529        ))?;
530
531        tracing::trace!(
532            "Sent signature {} to verifiers in signature_distributor",
533            sig_count
534        );
535
536        if sig_count == needed_nofn_sigs {
537            break;
538        }
539    }
540
541    if sig_count != needed_nofn_sigs {
542        let err_msg = format!(
543            "Expected {needed_nofn_sigs} signatures in signature_distributor, got {sig_count}",
544        );
545        tracing::error!("{err_msg}");
546        return Err(eyre::eyre!(err_msg).into());
547    }
548
549    tracing::trace!(
550        tmp_debug = 1,
551        "Sent {sig_count} signatures to verifiers in deposit_finalize"
552    );
553
554    let (movetx_agg_nonce, emergency_stop_agg_nonce) = agg_nonce
555        .await
556        .wrap_err("Failed to get aggregated nonce for movetx and emergency stop")?;
557
558    tracing::info!("Got aggregated nonce for movetx and emergency stop in signature distributor");
559
560    // Send the movetx agg nonce to the verifiers.
561    for tx in &deposit_finalize_sender {
562        tx.send(VerifierDepositFinalizeParams {
563            params: Some(Params::MoveTxAggNonce(
564                movetx_agg_nonce.0.serialize().to_vec(),
565            )),
566        })
567        .await
568        .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
569            stream_name: "Deposit finalize sender (for movetx agg nonce)".to_string(),
570        })?;
571    }
572    tracing::info!("Sent movetx aggregated nonce to verifiers in signature distributor");
573
574    // send emergency stop agg nonce to verifiers
575    for tx in &deposit_finalize_sender {
576        tx.send(VerifierDepositFinalizeParams {
577            params: Some(Params::EmergencyStopAggNonce(
578                emergency_stop_agg_nonce.0.serialize().to_vec(),
579            )),
580        })
581        .await
582        .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
583            stream_name: "Deposit finalize sender (for emergency stop agg nonce)".to_string(),
584        })?;
585    }
586    tracing::info!("Sent emergency stop aggregated nonce to verifiers in signature distributor");
587
588    Ok(())
589}
590
591/// Creates a stream of nonces from verifiers.
592/// This will automatically get the first response from the verifiers.
593///
594/// # Returns
595///
596/// - Vec<[`clementine::NonceGenFirstResponse`]>: First response from each verifier
597/// - Vec<BoxStream<Result<[`PublicNonce`], BridgeError>>>: Stream of nonces from each verifier
598async fn create_nonce_streams(
599    verifiers: ParticipatingVerifiers,
600    num_nonces: u32,
601    #[cfg(test)] config: &crate::config::BridgeConfig,
602) -> Result<
603    (
604        Vec<clementine::NonceGenFirstResponse>,
605        Vec<BoxStream<'static, Result<PublicNonce, BridgeError>>>,
606    ),
607    BridgeError,
608> {
609    let mut nonce_streams = timed_try_join_all(
610        NONCE_STREAM_CREATION_TIMEOUT,
611        "Nonce stream creation",
612        Some(verifiers.ids()),
613        verifiers
614            .clients()
615            .into_iter()
616            .enumerate()
617            .map(|(idx, client)| {
618                let mut client = client.clone();
619                #[cfg(test)]
620                let config = config.clone();
621
622                async move {
623                    #[cfg(test)]
624                    config
625                        .test_params
626                        .timeout_params
627                        .hook_timeout_nonce_stream_creation_verifier(idx)
628                        .await;
629                    let response_stream = client
630                        .nonce_gen(tonic::Request::new(clementine::NonceGenRequest {
631                            num_nonces,
632                        }))
633                        .await
634                        .wrap_err_with(|| AggregatorError::RequestFailed {
635                            request_name: format!("Nonce gen stream for verifier {idx}"),
636                        })?;
637
638                    Ok::<_, BridgeError>(response_stream.into_inner())
639                }
640            }),
641    )
642    .await?;
643
644    // Get the first responses from verifiers.
645    let first_responses: Vec<clementine::NonceGenFirstResponse> =
646        try_join_all_combine_errors(nonce_streams.iter_mut().zip(verifiers.ids()).map(
647            |(stream, id)| async move {
648                parser::verifier::parse_nonce_gen_first_response(stream)
649                    .await
650                    .wrap_err_with(|| format!("Failed to get initial response from {id}"))
651            },
652        ))
653        .await
654        .wrap_err("Failed to get nonce gen's initial responses from verifiers")?;
655
656    let transformed_streams = nonce_streams
657        .into_iter()
658        .zip(verifiers.ids())
659        .map(|(stream, id)| {
660            stream
661                .map(move |result| {
662                    Aggregator::extract_pub_nonce(
663                        result
664                            .wrap_err_with(|| AggregatorError::InputStreamEndedEarlyUnknownSize {
665                                stream_name: format!("Nonce gen stream for {id}"),
666                            })?
667                            .response,
668                    )
669                })
670                .boxed()
671        })
672        .collect::<Vec<_>>();
673
674    Ok((first_responses, transformed_streams))
675}
676
677/// Use items collected from the broadcast receiver for an async function call.
678///
679/// Handles the boilerplate of managing a receiver of a broadcast channel.
680/// If receiver is lagged at any time (data is lost) an error is returned.
681async fn collect_and_call<R, T, F, Fut>(
682    rx: &mut tokio::sync::broadcast::Receiver<Vec<T>>,
683    mut f: F,
684) -> Result<R, Status>
685where
686    R: Default,
687    T: Clone,
688    F: FnMut(Vec<T>) -> Fut,
689    Fut: Future<Output = Result<R, Status>>,
690{
691    loop {
692        match rx.recv().await {
693            Ok(params) => {
694                f(params).await?;
695            }
696            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
697                break Err(Status::internal(format!(
698                    "lost {n} items due to lagging receiver"
699                )));
700            }
701            Err(tokio::sync::broadcast::error::RecvError::Closed) => break Ok(R::default()),
702        }
703    }
704}
705
706impl Aggregator {
707    // Extracts pub_nonce from given stream.
708    fn extract_pub_nonce(
709        response: Option<clementine::nonce_gen_response::Response>,
710    ) -> Result<PublicNonce, BridgeError> {
711        match response.ok_or_eyre("NonceGen response is empty")? {
712            clementine::nonce_gen_response::Response::PubNonce(pub_nonce) => {
713                Ok(PublicNonce::from_byte_array(
714                    &pub_nonce
715                        .as_slice()
716                        .try_into()
717                        .wrap_err("PubNonce must be 66 bytes")?,
718                )
719                .wrap_err("Failed to parse pub nonce")?)
720            }
721            _ => Err(eyre::eyre!("Expected PubNonce in response").into()),
722        }
723    }
724
725    /// For a specific deposit, collects needed signatures from all operators into a [`Vec<Vec<Signature>>`].
726    async fn collect_operator_sigs(
727        operator_clients: ParticipatingOperators,
728        config: BridgeConfig,
729        mut deposit_sign_session: DepositSignSession,
730    ) -> Result<Vec<Vec<Signature>>, BridgeError> {
731        deposit_sign_session.nonce_gen_first_responses = Vec::new(); // not needed for operators
732        let mut operator_sigs_streams =
733            // create deposit sign streams with each operator
734            timed_try_join_all(
735                OPERATOR_SIGS_STREAM_CREATION_TIMEOUT,
736                "Operator signature stream creation",
737                Some(operator_clients.ids()),
738                operator_clients.clients().into_iter().enumerate().map(|(idx, mut operator_client)| {
739                let sign_session = deposit_sign_session.clone();
740                #[cfg(test)]
741                let config = config.clone();
742                async move {
743                    #[cfg(test)]
744                    config
745                        .test_params
746                        .timeout_params
747                        .hook_timeout_operator_sig_collection_operator(idx)
748                        .await;
749                    let stream = operator_client
750                        .deposit_sign(tonic::Request::new(sign_session))
751                        .await.wrap_err_with(|| AggregatorError::RequestFailed {
752                            request_name: format!("Deposit sign stream for operator {idx}"),
753                        })?;
754                    Ok::<_, BridgeError>(stream.into_inner())
755                }
756            }))
757                .await?;
758
759        let deposit_data: DepositData = deposit_sign_session
760            .deposit_params
761            .clone()
762            .ok_or_else(|| eyre::eyre!("No deposit params found in deposit sign session"))?
763            .try_into()
764            .wrap_err("Failed to convert deposit params to deposit data")?;
765
766        // calculate number of signatures needed from each operator
767        let needed_sigs = config.get_num_required_operator_sigs(&deposit_data);
768
769        // get signatures from each operator's signature streams
770        let operator_sigs =
771            try_join_all_combine_errors(operator_sigs_streams.iter_mut().enumerate().map(
772                |(idx, stream)| async move {
773                    let mut sigs: Vec<Signature> = Vec::with_capacity(needed_sigs);
774                    while let Some(sig) =
775                        stream
776                            .message()
777                            .await
778                            .wrap_err_with(|| AggregatorError::RequestFailed {
779                                request_name: format!("Deposit sign stream for operator {idx}"),
780                            })?
781                    {
782                        sigs.push(Signature::from_slice(&sig.schnorr_sig).wrap_err_with(|| {
783                            format!("Failed to parse Schnorr signature from operator {idx}")
784                        })?);
785                        if sigs.len() == needed_sigs {
786                            break;
787                        }
788                    }
789                    Ok::<_, BridgeError>(sigs)
790                },
791            ))
792            .await
793            .wrap_err("Failed to get operator signatures from operators")?;
794
795        // check if all signatures are received
796        for (idx, sigs) in operator_sigs.iter().enumerate() {
797            if sigs.len() != needed_sigs {
798                return Err(eyre::eyre!(
799                    "Not all operator sigs received from operator {}.\n Expected: {}, got: {}",
800                    idx,
801                    needed_sigs,
802                    sigs.len()
803                )
804                .into());
805            }
806        }
807        Ok(operator_sigs)
808    }
809
810    async fn create_movetx(
811        &self,
812        partial_sigs: Vec<Vec<u8>>,
813        movetx_agg_and_pub_nonces: (AggregatedNonce, Vec<PublicNonce>),
814        deposit_params: DepositParams,
815    ) -> Result<TxHandler<Signed>, Status> {
816        let mut deposit_data: DepositData = deposit_params.try_into()?;
817        let musig_partial_sigs = parser::verifier::parse_partial_sigs(partial_sigs)?;
818
819        // create move tx and calculate sighash
820        let mut move_txhandler =
821            create_move_to_vault_txhandler(&mut deposit_data, self.config.protocol_paramset())?;
822
823        let sighash = move_txhandler.calculate_script_spend_sighash_indexed(
824            0,
825            0,
826            bitcoin::TapSighashType::Default,
827        )?;
828
829        let musig_sigs_and_nonces = musig_partial_sigs
830            .into_iter()
831            .zip(movetx_agg_and_pub_nonces.1)
832            .collect::<Vec<_>>();
833
834        // aggregate partial signatures
835        let verifiers_public_keys = deposit_data.get_verifiers();
836        let final_sig = crate::musig2::aggregate_partial_signatures(
837            verifiers_public_keys,
838            None,
839            movetx_agg_and_pub_nonces.0,
840            &musig_sigs_and_nonces,
841            Message::from_digest(sighash.to_byte_array()),
842        )?;
843
844        // Put the signature in the tx
845        move_txhandler.set_p2tr_script_spend_witness(&[final_sig.as_ref()], 0, 0)?;
846
847        Ok(move_txhandler.promote()?)
848    }
849
850    async fn verify_and_save_emergency_stop_sigs(
851        &self,
852        emergency_stop_sigs: Vec<Vec<u8>>,
853        emergency_stop_agg_and_pub_nonces: (AggregatedNonce, Vec<PublicNonce>),
854        deposit_params: DepositParams,
855    ) -> Result<(), BridgeError> {
856        let mut deposit_data: DepositData = deposit_params
857            .try_into()
858            .wrap_err("Failed to convert deposit params to deposit data")?;
859        let musig_partial_sigs = parser::verifier::parse_partial_sigs(emergency_stop_sigs)
860            .wrap_err("Failed to parse emergency stop signatures")?;
861
862        // create move tx and calculate sighash
863        let move_txhandler =
864            create_move_to_vault_txhandler(&mut deposit_data, self.config.protocol_paramset())?;
865
866        let mut emergency_stop_txhandler = create_emergency_stop_txhandler(
867            &mut deposit_data,
868            &move_txhandler,
869            self.config.protocol_paramset(),
870        )?;
871
872        let sighash = emergency_stop_txhandler.calculate_script_spend_sighash_indexed(
873            0,
874            0,
875            bitcoin::TapSighashType::SinglePlusAnyoneCanPay,
876        )?;
877
878        let verifiers_public_keys = deposit_data.get_verifiers();
879
880        let musig_sigs_and_nonces = musig_partial_sigs
881            .into_iter()
882            .zip(emergency_stop_agg_and_pub_nonces.1)
883            .collect::<Vec<_>>();
884
885        let final_sig = crate::musig2::aggregate_partial_signatures(
886            verifiers_public_keys,
887            None,
888            emergency_stop_agg_and_pub_nonces.0,
889            &musig_sigs_and_nonces,
890            Message::from_digest(sighash.to_byte_array()),
891        )
892        .wrap_err("Failed to aggregate emergency stop signatures")?;
893
894        let final_sig = bitcoin::taproot::Signature {
895            signature: final_sig,
896            sighash_type: bitcoin::TapSighashType::SinglePlusAnyoneCanPay,
897        };
898
899        // insert the signature into the tx
900        emergency_stop_txhandler.set_p2tr_script_spend_witness(&[final_sig.serialize()], 0, 0)?;
901
902        let emergency_stop_tx = emergency_stop_txhandler.get_cached_tx();
903        let move_to_vault_txid = move_txhandler.get_txid();
904
905        tracing::debug!("Move to vault tx id: {}", move_to_vault_txid.to_string());
906
907        let emergency_stop_pubkey = self
908            .config
909            .emergency_stop_encryption_public_key
910            .ok_or_else(|| eyre::eyre!("Emergency stop encryption public key is not set"))?;
911        let encrypted_emergency_stop_tx = crate::encryption::encrypt_bytes(
912            emergency_stop_pubkey,
913            &bitcoin::consensus::serialize(&emergency_stop_tx),
914        )?;
915
916        self.db
917            .insert_signed_emergency_stop_tx_if_not_exists(
918                None,
919                move_to_vault_txid,
920                &encrypted_emergency_stop_tx,
921            )
922            .await?;
923
924        Ok(())
925    }
926
927    #[cfg(feature = "automation")]
928    pub async fn send_emergency_stop_tx(
929        &self,
930        tx: bitcoin::Transaction,
931    ) -> Result<bitcoin::Transaction, Status> {
932        // Add fee bumper.
933        let mut dbtx = self.db.begin_transaction().await?;
934        self.tx_sender
935            .insert_try_to_send(
936                &mut dbtx,
937                Some(TxMetadata {
938                    deposit_outpoint: None,
939                    operator_xonly_pk: None,
940                    round_idx: None,
941                    kickoff_idx: None,
942                    tx_type: TransactionType::EmergencyStop,
943                }),
944                &tx,
945                FeePayingType::RBF,
946                None,
947                &[],
948                &[],
949                &[],
950                &[],
951            )
952            .await?;
953        dbtx.commit()
954            .await
955            .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;
956
957        Ok(tx)
958    }
959}
960
961#[async_trait]
962impl ClementineAggregator for AggregatorServer {
963    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
964    async fn get_compatibility_params(
965        &self,
966        _request: Request<Empty>,
967    ) -> Result<Response<CompatibilityParamsRpc>, Status> {
968        let params = self.aggregator.get_compatibility_params()?;
969        Ok(Response::new(params.try_into().map_to_status()?))
970    }
971
972    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
973    async fn get_compatibility_data_from_entities(
974        &self,
975        _request: Request<Empty>,
976    ) -> Result<Response<EntitiesCompatibilityData>, Status> {
977        let data = self
978            .aggregator
979            .get_compatibility_data_from_entities()
980            .await?;
981        Ok(Response::new(EntitiesCompatibilityData {
982            entities_compatibility_data: data,
983        }))
984    }
985
986    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
987    async fn vergen(&self, _request: Request<Empty>) -> Result<Response<VergenResponse>, Status> {
988        tracing::info!("Vergen rpc called");
989        Ok(Response::new(get_vergen_response()))
990    }
991    #[tracing::instrument(
992        skip_all,
993        fields(restart_tasks = %request.get_ref().restart_tasks),
994        err(level = tracing::Level::ERROR)
995    )]
996    async fn get_entity_statuses(
997        &self,
998        request: Request<GetEntityStatusesRequest>,
999    ) -> Result<Response<EntityStatuses>, Status> {
1000        tracing::debug!("Get entity statuses rpc called");
1001        let request = request.into_inner();
1002        let restart_tasks = request.restart_tasks;
1003
1004        Ok(Response::new(EntityStatuses {
1005            entity_statuses: self.aggregator.get_entity_statuses(restart_tasks).await?,
1006        }))
1007    }
1008
1009    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
1010    async fn optimistic_payout(
1011        &self,
1012        request: tonic::Request<super::OptimisticWithdrawParams>,
1013    ) -> std::result::Result<tonic::Response<super::RawSignedTx>, tonic::Status> {
1014        tracing::info!("Optimistic payout rpc called");
1015        let opt_withdraw_params = request.into_inner();
1016
1017        let withdraw_params =
1018            opt_withdraw_params
1019                .withdrawal
1020                .clone()
1021                .ok_or(Status::invalid_argument(
1022                    "Withdrawal params not found for optimistic payout",
1023                ))?;
1024        let (deposit_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
1025            parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
1026        tracing::info!("Parsed optimistic payout rpc params, deposit id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}, verification signature: {:?}", deposit_id, input_signature, input_outpoint, output_script_pubkey, output_amount, opt_withdraw_params.verification_signature);
1027
1028        // check compatibility with verifiers only
1029        self.check_compatibility_with_actors(CompatibilityCheckScope::VerifiersOnly)
1030            .await?;
1031
1032        // if the withdrawal utxo is spent, no reason to sign optimistic payout
1033        if self
1034            .rpc
1035            .is_utxo_spent(&input_outpoint)
1036            .await
1037            .map_to_status()?
1038        {
1039            return Err(Status::invalid_argument(format!(
1040                "Withdrawal utxo is already spent: {input_outpoint:?}",
1041            )));
1042        }
1043
1044        // check for some standard script pubkeys
1045        if !(output_script_pubkey.is_p2tr()
1046            || output_script_pubkey.is_p2pkh()
1047            || output_script_pubkey.is_p2sh()
1048            || output_script_pubkey.is_p2wpkh()
1049            || output_script_pubkey.is_p2wsh())
1050        {
1051            return Err(Status::invalid_argument(format!(
1052                "Output script pubkey is not a valid script pubkey: {output_script_pubkey}, must be p2tr, p2pkh, p2sh, p2wpkh, or p2wsh"
1053            )));
1054        }
1055
1056        // get which deposit the withdrawal belongs to
1057        let withdrawal = self
1058            .db
1059            .get_move_to_vault_txid_from_citrea_deposit(None, deposit_id)
1060            .await?;
1061        if let Some(move_txid) = withdrawal {
1062            // check if withdrawal utxo is correct
1063            let withdrawal_utxo = self
1064                .db
1065                .get_withdrawal_utxo_from_citrea_withdrawal(None, deposit_id)
1066                .await?;
1067            if withdrawal_utxo != input_outpoint {
1068                return Err(Status::invalid_argument(format!(
1069                    "Withdrawal utxo is not correct: {withdrawal_utxo:?} != {input_outpoint:?}",
1070                )));
1071            }
1072
1073            // Prepare input and output of the payout transaction.
1074            let withdrawal_prevout = self
1075                .rpc
1076                .get_txout_from_outpoint(&input_outpoint)
1077                .await
1078                .map_to_status()?;
1079
1080            let user_xonly_pk = withdrawal_prevout
1081                .script_pubkey
1082                .try_get_taproot_pk()
1083                .map_err(|_| {
1084                    Status::invalid_argument(format!(
1085                        "Withdrawal prevout script_pubkey is not a Taproot output: {:?}",
1086                        withdrawal_prevout.script_pubkey
1087                    ))
1088                })?;
1089
1090            let withdrawal_utxo = UTXO {
1091                outpoint: input_outpoint,
1092                txout: withdrawal_prevout,
1093            };
1094
1095            let output_txout = TxOut {
1096                value: output_amount,
1097                script_pubkey: output_script_pubkey,
1098            };
1099
1100            let deposit_data = self
1101                .db
1102                .get_deposit_data_with_move_tx(None, move_txid)
1103                .await?;
1104
1105            let mut deposit_data = deposit_data
1106                .ok_or(eyre::eyre!(
1107                    "Deposit data not found for move txid {}",
1108                    move_txid
1109                ))
1110                .map_err(BridgeError::from)?;
1111
1112            let mut opt_payout_txhandler = create_optimistic_payout_txhandler(
1113                &mut deposit_data,
1114                withdrawal_utxo,
1115                output_txout,
1116                input_signature,
1117                self.config.protocol_paramset(),
1118            )?;
1119
1120            let sighash = opt_payout_txhandler
1121                .calculate_pubkey_spend_sighash(0, input_signature.sighash_type)?;
1122
1123            let message = Message::from_digest(sighash.to_byte_array());
1124
1125            SECP.verify_schnorr(&input_signature.signature, &message, &user_xonly_pk)
1126                .map_err(|_| Status::internal("Invalid signature for optimistic payout tx. Ensure the signature uses SinglePlusAnyoneCanPay sighash type."))?;
1127
1128            // get which verifiers participated in the deposit to collect the optimistic payout tx signature
1129            let participating_verifiers = self.get_participating_verifiers(&deposit_data).await?;
1130            let verifiers_ids = participating_verifiers.ids();
1131            let (first_responses, mut nonce_streams) = {
1132                create_nonce_streams(
1133                    participating_verifiers.clone(),
1134                    1,
1135                    #[cfg(test)]
1136                    &self.config,
1137                )
1138                .await?
1139            };
1140            // collect nonces
1141            let pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
1142                .await
1143                .wrap_err("Failed to aggregate nonces for optimistic payout")
1144                .map_to_status()?;
1145            let agg_nonce = aggregate_nonces(pub_nonces.iter().collect::<Vec<_>>().as_slice())?;
1146
1147            let agg_nonce_bytes = agg_nonce.serialize().to_vec();
1148            // send the agg nonce to the verifiers to sign the optimistic payout tx
1149            let opt_payout_sign_futures = participating_verifiers
1150                .clients()
1151                .iter()
1152                .zip(first_responses)
1153                .map(|(client, first_response)| {
1154                    let mut client = client.clone();
1155                    let opt_withdraw_params = opt_withdraw_params.clone();
1156                    {
1157                        let agg_nonce_serialized = agg_nonce_bytes.clone();
1158                        async move {
1159                            let mut request = Request::new(OptimisticPayoutParams {
1160                                opt_withdrawal: Some(opt_withdraw_params),
1161                                agg_nonce: agg_nonce_serialized,
1162                                nonce_gen: Some(first_response),
1163                            });
1164                            request.set_timeout(OPTIMISTIC_PAYOUT_TIMEOUT);
1165                            client.optimistic_payout_sign(request).await
1166                        }
1167                    }
1168                })
1169                .collect::<Vec<_>>();
1170
1171            // get signatures and check for any errors
1172            let opt_payout_resps = join_all(opt_payout_sign_futures).await;
1173            let mut payout_sigs = Vec::new();
1174            let mut errors = Vec::new();
1175            for (resp, verifier_id) in opt_payout_resps
1176                .into_iter()
1177                .zip(participating_verifiers.ids())
1178            {
1179                match resp {
1180                    Ok(res) => {
1181                        payout_sigs.push(res.into_inner());
1182                    }
1183                    Err(e) => {
1184                        errors.push(format!("{verifier_id} optimistic payout sign failed: {e}"));
1185                    }
1186                }
1187            }
1188            if !errors.is_empty() {
1189                return Err(eyre::eyre!("{errors:?}").into_status());
1190            }
1191
1192            // calculate final sig
1193            // txin at index 1 is deposited utxo in movetx
1194            let sighash = opt_payout_txhandler.calculate_script_spend_sighash_indexed(
1195                1,
1196                0,
1197                bitcoin::TapSighashType::Default,
1198            )?;
1199
1200            let musig_partial_sigs = payout_sigs
1201                .into_iter()
1202                .map(|sig| {
1203                    PartialSignature::from_byte_array(
1204                        &sig.partial_sig
1205                            .try_into()
1206                            .map_err(|_| secp256k1::musig::ParseError::MalformedArg)?,
1207                    )
1208                })
1209                .collect::<Result<Vec<_>, _>>()
1210                .map_err(|e| Status::internal(format!("Failed to parse partial sig: {e:?}")))?;
1211
1212            let musig_sigs_and_nonces = musig_partial_sigs
1213                .into_iter()
1214                .zip(pub_nonces)
1215                .collect::<Vec<_>>();
1216
1217            let final_sig = bitcoin::taproot::Signature {
1218                signature: crate::musig2::aggregate_partial_signatures(
1219                    deposit_data.get_verifiers(),
1220                    None,
1221                    agg_nonce,
1222                    &musig_sigs_and_nonces,
1223                    Message::from_digest(sighash.to_byte_array()),
1224                )?,
1225                sighash_type: bitcoin::TapSighashType::Default,
1226            };
1227
1228            // set witness and send tx
1229            opt_payout_txhandler.set_p2tr_script_spend_witness(&[final_sig.serialize()], 1, 0)?;
1230            let opt_payout_txhandler = opt_payout_txhandler.promote()?;
1231            let opt_payout_tx = opt_payout_txhandler.get_cached_tx();
1232            tracing::info!(
1233                "Optimistic payout transaction created successfully for deposit id: {:?}",
1234                deposit_id
1235            );
1236
1237            #[cfg(feature = "automation")]
1238            {
1239                tracing::info!("Sending optimistic payout tx via tx_sender");
1240
1241                let mut dbtx = self.db.begin_transaction().await?;
1242                self.tx_sender
1243                    .add_tx_to_queue(
1244                        &mut dbtx,
1245                        TransactionType::OptimisticPayout,
1246                        opt_payout_tx,
1247                        &[],
1248                        None,
1249                        self.config.protocol_paramset(),
1250                        None,
1251                    )
1252                    .await
1253                    .map_to_status()?;
1254                dbtx.commit().await.map_err(|e| {
1255                    Status::internal(format!(
1256                        "Failed to commit db transaction to send optimistic payout tx: {e}",
1257                    ))
1258                })?;
1259            }
1260
1261            Ok(Response::new(RawSignedTx::from(opt_payout_tx)))
1262        } else {
1263            Err(Status::not_found(format!(
1264                "Withdrawal with index {deposit_id} not found."
1265            )))
1266        }
1267    }
1268
1269    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
1270    async fn internal_send_tx(
1271        &self,
1272        request: Request<clementine::SendTxRequest>,
1273    ) -> Result<Response<Empty>, Status> {
1274        #[cfg(not(feature = "automation"))]
1275        {
1276            Err(Status::unimplemented("Automation is not enabled"))
1277        }
1278        #[cfg(feature = "automation")]
1279        {
1280            let send_tx_req = request.into_inner();
1281            let fee_type = send_tx_req.fee_type();
1282            let signed_tx: bitcoin::Transaction = send_tx_req
1283                .raw_tx
1284                .ok_or(Status::invalid_argument("Missing raw_tx"))?
1285                .try_into()?;
1286            tracing::warn!(
1287                "Internal send tx rpc called with feetype: {:?}, tx hex: {}",
1288                fee_type,
1289                bitcoin::consensus::encode::serialize_hex(&signed_tx)
1290            );
1291
1292            let mut dbtx = self.db.begin_transaction().await?;
1293            self.tx_sender
1294                .insert_try_to_send(
1295                    &mut dbtx,
1296                    None,
1297                    &signed_tx,
1298                    fee_type.try_into()?,
1299                    None,
1300                    &[],
1301                    &[],
1302                    &[],
1303                    &[],
1304                )
1305                .await
1306                .map_to_status()?;
1307            dbtx.commit()
1308                .await
1309                .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;
1310            Ok(Response::new(Empty {}))
1311        }
1312    }
1313
1314    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
1315    async fn setup(
1316        &self,
1317        _request: Request<Empty>,
1318    ) -> Result<Response<VerifierPublicKeys>, Status> {
1319        tracing::info!("Setup rpc called");
1320        self.check_compatibility_with_actors(CompatibilityCheckScope::Both)
1321            .await?;
1322        // Propagate Operators configurations to all verifier clients
1323        const CHANNEL_CAPACITY: usize = 1024 * 16;
1324        let (operator_params_tx, operator_params_rx) =
1325            tokio::sync::broadcast::channel(CHANNEL_CAPACITY);
1326        let operator_params_rx_handles = (0..self.get_verifier_clients().len())
1327            .map(|_| operator_params_rx.resubscribe())
1328            .collect::<Vec<_>>();
1329
1330        let operators = self.get_operator_clients().to_vec();
1331        let operator_pks = self.fetch_operator_keys().await?;
1332        let operator_ids = operator_pks
1333            .iter()
1334            .map(|key| OperatorId(*key))
1335            .collect::<Vec<_>>();
1336        let get_operator_params_chunked_handle = tokio::spawn(async move {
1337            tracing::info!(clients = operators.len(), "Collecting operator details...");
1338            try_join_all_combine_errors(operators.iter().zip(operator_ids.iter()).map(
1339                |(operator, id)| {
1340                    let mut operator = operator.clone();
1341                    let tx = operator_params_tx.clone();
1342                    async move {
1343                        let stream = operator
1344                            .get_params(Request::new(Empty {}))
1345                            .await
1346                            .wrap_err_with(|| AggregatorError::RequestFailed {
1347                                request_name: format!("Operator get params for {id}"),
1348                            })
1349                            .map_err(BridgeError::from)?
1350                            .into_inner();
1351                        tx.send(stream.try_collect::<Vec<_>>().await?)
1352                            .map_err(|e| {
1353                                BridgeError::from(eyre::eyre!(
1354                                    "Failed to read operator params for {id}: {e}"
1355                                ))
1356                            })?;
1357                        Ok::<_, Status>(())
1358                    }
1359                },
1360            ))
1361            .await
1362            .wrap_err("Failed to get operator params from operators")
1363            .map_to_status()?;
1364            Ok::<_, Status>(())
1365        });
1366
1367        let verifiers = self.get_verifier_clients().to_vec();
1368        let verifier_pks = self.fetch_verifier_keys().await?;
1369        let verifier_ids = verifier_pks
1370            .iter()
1371            .map(|key| VerifierId(*key))
1372            .collect::<Vec<_>>();
1373        let set_operator_params_handle = tokio::spawn(async move {
1374            tracing::info!("Informing verifiers of existing operators...");
1375            try_join_all_combine_errors(
1376                verifiers
1377                    .iter()
1378                    .zip(verifier_ids.iter())
1379                    .zip(operator_params_rx_handles)
1380                    .map(|((verifier, id), mut rx)| {
1381                        let verifier = verifier.clone();
1382                        async move {
1383                            collect_and_call(&mut rx, |params| {
1384                                let mut verifier = verifier.clone();
1385                                async move {
1386                                    verifier
1387                                        .set_operator(futures::stream::iter(params))
1388                                        .await
1389                                        .wrap_err_with(|| AggregatorError::RequestFailed {
1390                                            request_name: format!("Verifier set_operator for {id}"),
1391                                        })
1392                                        .map_err(BridgeError::from)?;
1393                                    Ok::<_, Status>(())
1394                                }
1395                            })
1396                            .await?;
1397                            Ok::<_, Status>(())
1398                        }
1399                    }),
1400            )
1401            .await
1402            .wrap_err("Failed to set_operator for all verifiers")
1403            .map_to_status()?;
1404            Ok::<_, Status>(())
1405        });
1406
1407        let task_outputs = timed_request(
1408            SETUP_COMPLETION_TIMEOUT,
1409            "Aggregator setup pipeline",
1410            async move {
1411                Ok::<_, BridgeError>(
1412                    futures::future::join_all([
1413                        get_operator_params_chunked_handle,
1414                        set_operator_params_handle,
1415                    ])
1416                    .await,
1417                )
1418            },
1419        )
1420        .await?;
1421
1422        let task_names = ["Get operator params", "Set operator params"];
1423        debug_assert_eq!(task_names.len(), task_outputs.len());
1424
1425        flatten_join_named_results(task_names.into_iter().zip(task_outputs.into_iter()))?;
1426
1427        Ok(Response::new(VerifierPublicKeys::from(verifier_pks)))
1428    }
1429
1430    /// Handles a new deposit request from a user. This function coordinates the signing process
1431    /// between verifiers to create a valid move transaction. It ensures a covenant using pre-signed NofN transactions.
1432    /// It also collects signatures from operators to ensure that the operators can be slashed if they act maliciously.
1433    ///
1434    /// Overview:
1435    /// 1. Receive and parse deposit parameters from user
1436    /// 2. Signs all NofN transactions with verifiers using MuSig2:
1437    ///    - Creates nonce streams with verifiers (get pub nonces for each transaction)
1438    ///    - Opens deposit signing streams with verifiers (sends aggnonces for each transaction, receives partial sigs)
1439    ///    - Opens deposit finalization streams with verifiers (sends final signatures, receives movetx signatures)
1440    /// 3. Collects signatures from operators
1441    /// 4. Waits for all tasks to complete
1442    /// 5. Returns signed move transaction
1443    ///
1444    /// The following pipelines are used to coordinate the signing process, these move the data between the verifiers and the aggregator:
1445    ///    - Nonce aggregation
1446    ///    - Nonce distribution
1447    ///    - Signature aggregation
1448    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
1449    async fn new_deposit(
1450        &self,
1451        request: Request<Deposit>,
1452    ) -> Result<Response<clementine::RawSignedTx>, Status> {
1453        tracing::info!("New deposit rpc called");
1454        self.check_compatibility_with_actors(CompatibilityCheckScope::Both)
1455            .await?;
1456
1457        timed_request(OVERALL_DEPOSIT_TIMEOUT, "Overall new deposit", async {
1458            let deposit_info: DepositInfo = request.into_inner().try_into()?;
1459            tracing::info!(
1460                "Parsed new deposit rpc params, deposit info: {:?}",
1461                deposit_info
1462            );
1463
1464            let deposit_data = DepositData {
1465                deposit: deposit_info.clone(),
1466                nofn_xonly_pk: None,
1467                actors: Actors {
1468                    verifiers: self.fetch_verifier_keys().await?,
1469                    watchtowers: vec![],
1470                    operators: self.fetch_operator_keys().await?,
1471                },
1472                security_council: self.config.security_council.clone(),
1473            };
1474            tracing::info!(
1475                "Created deposit data in new_deposit for deposit info: {:?}, deposit data: {:?}",
1476                deposit_info,
1477                deposit_data
1478            );
1479
1480            let deposit_params = deposit_data.clone().into();
1481
1482            // Collect and distribute keys needed keys from operators and watchtowers to verifiers
1483            let start = std::time::Instant::now();
1484            timed_request(
1485                KEY_DISTRIBUTION_TIMEOUT,
1486                "Key collection and distribution",
1487                self.collect_and_distribute_keys(&deposit_params),
1488            )
1489            .await?;
1490            tracing::info!("Collected and distributed keys in {:?}", start.elapsed());
1491
1492            let verifiers = self.get_participating_verifiers(&deposit_data).await?;
1493            let verifiers_ids = verifiers.ids();
1494
1495            // Generate nonce streams for all verifiers.
1496            let num_required_sigs = self.config.get_num_required_nofn_sigs(&deposit_data);
1497            let num_required_nonces = num_required_sigs as u32 + 2; // ask for +2 for the final movetx signature + emergency stop signature, but don't send it on deposit_sign stage
1498            let (first_responses, nonce_streams) =
1499                    create_nonce_streams(
1500                        verifiers.clone(),
1501                        num_required_nonces,
1502                        #[cfg(test)]
1503                        &self.config,
1504                    )
1505                    .await?;
1506
1507            // Create initial deposit session and send to verifiers
1508            let deposit_sign_session = DepositSignSession {
1509                deposit_params: Some(deposit_params.clone()),
1510                nonce_gen_first_responses: first_responses,
1511            };
1512
1513            let deposit_sign_param: VerifierDepositSignParams =
1514                    deposit_sign_session.clone().into();
1515
1516        #[allow(clippy::unused_enumerate_index)]
1517            let partial_sig_streams = timed_try_join_all(
1518                PARTIAL_SIG_STREAM_CREATION_TIMEOUT,
1519                "Partial signature stream creation",
1520                Some(verifiers.ids()),
1521                verifiers.clients().into_iter().enumerate().map(|(_idx, verifier_client)| {
1522                    let mut verifier_client = verifier_client.clone();
1523                    #[cfg(test)]
1524                    let config = self.config.clone();
1525
1526                    let deposit_sign_param =
1527                    deposit_sign_param.clone();
1528
1529                    async move {
1530                        #[cfg(test)]
1531                        config
1532                            .test_params
1533                            .timeout_params
1534                            .hook_timeout_partial_sig_stream_creation_verifier(_idx)
1535                            .await;
1536
1537                        let (tx, rx) = tokio::sync::mpsc::channel(num_required_nonces as usize + 1); // initial param + num_required_nonces nonces
1538
1539                        let stream = verifier_client
1540                            .deposit_sign(tokio_stream::wrappers::ReceiverStream::new(rx))
1541                            .await?
1542                            .into_inner();
1543
1544                        tx.send(deposit_sign_param).await.map_err(|e| {
1545                            BridgeError::from(eyre::eyre!("Failed to send deposit sign session: {e:?}"))})?;
1546
1547                        Ok::<_, BridgeError>((stream, tx))
1548                    }
1549                })
1550            )
1551            .await?;
1552
1553            // Set up deposit finalization streams
1554        #[allow(clippy::unused_enumerate_index)]
1555            let deposit_finalize_streams = verifiers.clients().into_iter().enumerate().map(
1556                    |(_idx, mut verifier)| {
1557                        let (tx, rx) = tokio::sync::mpsc::channel(num_required_nonces as usize + 1);
1558                        let receiver_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
1559                        #[cfg(test)]
1560                        let config = self.config.clone();
1561                        // start deposit_finalize with tokio spawn
1562                        let deposit_finalize_future = tokio::spawn(async move {
1563                            #[cfg(test)]
1564                            config
1565                                .test_params
1566                                .timeout_params
1567                                .hook_timeout_deposit_finalize_verifier(_idx)
1568                                .await;
1569
1570                            verifier.deposit_finalize(receiver_stream).await
1571                        });
1572
1573                        Ok::<_, BridgeError>((deposit_finalize_future, tx))
1574                    },
1575                ).collect::<Result<Vec<_>, BridgeError>>()?;
1576
1577            tracing::info!("Sending deposit finalize streams to verifiers for deposit {:?}", deposit_info);
1578
1579            let (deposit_finalize_futures, deposit_finalize_sender): (Vec<_>, Vec<_>) =
1580                deposit_finalize_streams.into_iter().unzip();
1581
1582            // Send initial finalization params
1583            let deposit_finalize_first_param: VerifierDepositFinalizeParams =
1584                deposit_sign_session.clone().into();
1585
1586            timed_try_join_all(
1587                DEPOSIT_FINALIZE_STREAM_CREATION_TIMEOUT,
1588                "Deposit finalization initial param send",
1589                Some(verifiers.ids()),
1590                deposit_finalize_sender.iter().cloned().map(|tx| {
1591                    let param = deposit_finalize_first_param.clone();
1592                    async move {
1593                        tx.send(param).await
1594                        .map_err(|e| {
1595                            BridgeError::from(eyre::eyre!(
1596                                "Failed to send deposit finalize first param: {e:?}"))
1597                        })
1598                    }
1599                })
1600            ).await?;
1601
1602
1603            let deposit_blockhash = self
1604                .rpc
1605                .get_blockhash_of_tx(&deposit_data.get_deposit_outpoint().txid)
1606                .await
1607                .map_to_status()?;
1608
1609            let verifiers_public_keys = deposit_data.get_verifiers();
1610
1611            let needed_nofn_sigs = self.config.get_num_required_nofn_sigs(&deposit_data);
1612
1613            // Create sighash stream for transaction signing
1614            let sighash_stream = Box::pin(create_nofn_sighash_stream(
1615                self.db.clone(),
1616                self.config.clone(),
1617                deposit_data.clone(),
1618                deposit_blockhash,
1619                false,
1620            ));
1621
1622            // Create channels for pipeline communication
1623            let (agg_nonce_sender, agg_nonce_receiver) = channel(num_required_nonces as usize);
1624            let (partial_sig_sender, partial_sig_receiver) = channel(num_required_nonces as usize);
1625            let (final_sig_sender, final_sig_receiver) = channel(num_required_nonces as usize);
1626
1627            // Start the nonce aggregation pipe.
1628            let nonce_agg_handle = tokio::spawn(nonce_aggregator(
1629                nonce_streams,
1630                sighash_stream,
1631                agg_nonce_sender,
1632                needed_nofn_sigs,
1633                verifiers_ids.clone(),
1634            ));
1635
1636            // Start the nonce distribution pipe.
1637            let nonce_dist_handle = tokio::spawn(nonce_distributor(
1638                agg_nonce_receiver,
1639                partial_sig_streams,
1640                partial_sig_sender,
1641                needed_nofn_sigs,
1642                verifiers_ids.clone(),
1643            ));
1644
1645            // Start the signature aggregation pipe.
1646            let sig_agg_handle = tokio::spawn(signature_aggregator(
1647                partial_sig_receiver,
1648                verifiers_public_keys,
1649                final_sig_sender,
1650                needed_nofn_sigs,
1651            ));
1652
1653            tracing::debug!("Getting signatures from operators");
1654            // Get sigs from each operator in background
1655            let operators = self.get_participating_operators(&deposit_data).await?;
1656
1657            let config_clone = self.config.clone();
1658            let operator_sigs_fut = tokio::spawn(async move {
1659                timed_request(
1660                    OPERATOR_SIGS_TIMEOUT,
1661                    "Operator signature collection",
1662                    async {
1663                        Aggregator::collect_operator_sigs(
1664                            operators,
1665                            config_clone,
1666                            deposit_sign_session,
1667                        )
1668                        .await
1669                    },
1670                )
1671                .await
1672            });
1673
1674            // Join the nonce aggregation handle to get the movetx agg nonce.
1675            let nonce_agg_handle = nonce_agg_handle
1676                .map_err(|_| Status::internal("panic when aggregating nonces"))
1677                .map(
1678                    |res| -> Result<((AggregatedNonce, Vec<PublicNonce>), (AggregatedNonce, Vec<PublicNonce>)), Status> {
1679                        res.and_then(|r| r.map_err(Into::into))
1680                    },
1681                )
1682                .shared();
1683
1684            // Start the deposit finalization pipe.
1685            let sig_dist_handle = tokio::spawn(signature_distributor(
1686                final_sig_receiver,
1687                deposit_finalize_sender.clone(),
1688                nonce_agg_handle.clone(),
1689                needed_nofn_sigs,
1690                verifiers_ids.clone(),
1691            ));
1692
1693            // Right now we collect all operator sigs then start to send them, we can do it simultaneously in the future
1694            // Need to change sig verification ordering in deposit_finalize() in verifiers so that we verify
1695            // 1st signature of all operators, then 2nd of all operators etc.
1696            let all_op_sigs = operator_sigs_fut
1697                .await
1698                .map_err(|_| BridgeError::from(eyre::eyre!("panic when collecting operator signatures")))??;
1699
1700            tracing::info!("Got all operator signatures for deposit {:?}", deposit_info);
1701
1702            // Wait for all pipeline tasks to complete
1703            // join_all should be enough here as if one fails other tasks should fail too as they are connected through streams
1704            // one should not hang if any other task fails, the others should finish
1705            // this is needed because try_join_all can potentially not return the error of the first task that failed, just the one it polled first
1706            // that returned an error
1707            let task_outputs =  timed_request(
1708                PIPELINE_COMPLETION_TIMEOUT,
1709                "MuSig2 signing pipeline",
1710                async move {
1711                    Ok::<_, BridgeError>(futures::future::join_all([nonce_dist_handle, sig_agg_handle, sig_dist_handle]).await)
1712                },
1713            )
1714            .await?;
1715
1716            let task_names = ["Nonce distribution", "Signature aggregation", "Signature distribution"];
1717
1718            debug_assert_eq!(task_names.len(), task_outputs.len());
1719
1720            flatten_join_named_results(
1721                task_names.into_iter().zip(task_outputs.into_iter()),
1722            )?;
1723            tracing::info!("All deposit_sign related tasks completed for deposit {:?}, now sending operator signatures to verifiers for verification", deposit_info);
1724
1725            tracing::debug!("Pipeline tasks completed");
1726            let verifiers_ids = verifiers.ids();
1727
1728            // send operators sigs to verifiers after all verifiers have signed
1729            let deposit_finalize_futures = timed_request(
1730                SEND_OPERATOR_SIGS_TIMEOUT,
1731                "Sending operator signatures to verifiers",
1732                async {
1733                    let send_operator_sigs: Vec<_> = deposit_finalize_sender
1734                        .iter()
1735                        .zip(verifiers_ids.iter())
1736                        .zip(deposit_finalize_futures.into_iter())
1737                        .map(|((tx, id), dep_fin_fut)| async {
1738                            for one_op_sigs in all_op_sigs.iter() {
1739                                for sig in one_op_sigs.iter() {
1740                                    let deposit_finalize_param: VerifierDepositFinalizeParams =
1741                                        sig.into();
1742
1743                                    let send = tx.send(deposit_finalize_param).await;
1744                                    match send {
1745                                        Ok(()) => (),
1746                                        Err(e) => {
1747                                            // check exact error by awaiting the future
1748                                            dep_fin_fut.await.wrap_err(format!("{} deposit finalize tokio task on aggregator returned error", id.clone()))?.wrap_err(format!("{} deposit finalize rpc call returned error", id.clone()))?;
1749                                            return Err(BridgeError::from(eyre::eyre!(format!("{} deposit finalize stream sending returned error: {:?}", id.clone(), e))));
1750                                        }
1751                                    }
1752                                }
1753                            }
1754
1755                            Ok::<_, BridgeError>(dep_fin_fut)
1756                        })
1757                        .collect();
1758                    try_join_all_combine_errors(send_operator_sigs).await
1759                },
1760            )
1761            .await.wrap_err("Failed to send operator signatures to verifiers")?;
1762
1763            tracing::info!("All operator signatures sent to verifiers for verification, now waiting to collect movetx and emergency stop tx partial signatures from verifiers for deposit {:?}", deposit_info);
1764
1765            // Collect partial signatures for move transaction
1766            let partial_sigs: Vec<(Vec<u8>, Vec<u8>)> = timed_try_join_all(
1767                    DEPOSIT_FINALIZATION_TIMEOUT,
1768                    "Deposit finalization",
1769                    Some(verifiers.ids()),
1770                    deposit_finalize_futures.into_iter().map(|fut| async move {
1771                        let inner = fut.await
1772                            .map_err(|_| BridgeError::from(eyre::eyre!("panic finishing deposit_finalize")))??
1773                            .into_inner();
1774                        Ok((inner.move_to_vault_partial_sig, inner.emergency_stop_partial_sig))
1775                    }),
1776                )
1777                .await?;
1778
1779
1780            let (move_to_vault_sigs, emergency_stop_sigs): (Vec<Vec<u8>>, Vec<Vec<u8>>) =
1781                partial_sigs.into_iter().unzip();
1782
1783            tracing::info!("Received move tx and emergency stop tx partial signatures for deposit {:?}", deposit_info);
1784
1785            // Create the final move transaction and check the signatures
1786            let (movetx_agg_nonce, emergency_stop_agg_nonce) = nonce_agg_handle.await?;
1787
1788            // Verify emergency stop signatures
1789            self.verify_and_save_emergency_stop_sigs(
1790                emergency_stop_sigs,
1791                emergency_stop_agg_nonce,
1792                deposit_params.clone(),
1793            )
1794            .await?;
1795
1796            let signed_movetx_handler = self
1797                .create_movetx(move_to_vault_sigs, movetx_agg_nonce, deposit_params)
1798                .await?;
1799
1800            let raw_signed_tx = RawSignedTx {
1801                raw_tx: bitcoin::consensus::serialize(&signed_movetx_handler.get_cached_tx()),
1802            };
1803
1804            tracing::info!("Created final move transaction for deposit {:?}", deposit_info);
1805
1806            Ok(Response::new(raw_signed_tx))
1807        })
1808        .await.map_err(Into::into)
1809    }
1810
1811    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
1812    async fn withdraw(
1813        &self,
1814        request: Request<AggregatorWithdrawalInput>,
1815    ) -> Result<Response<AggregatorWithdrawResponse>, Status> {
1816        tracing::warn!("Withdraw rpc called");
1817        let request = request.into_inner();
1818        let (withdraw_params_with_sig, operator_xonly_pks) = (
1819            request.withdrawal.ok_or(Status::invalid_argument(
1820                "withdrawalParamsWithSig is missing",
1821            ))?,
1822            request.operator_xonly_pks,
1823        );
1824        // check compatibility with operators only
1825        self.check_compatibility_with_actors(CompatibilityCheckScope::OperatorsOnly)
1826            .await?;
1827
1828        let withdraw_params = withdraw_params_with_sig
1829            .clone()
1830            .withdrawal
1831            .ok_or(Status::invalid_argument("withdrawalParams is missing"))?;
1832
1833        // convert rpc xonly pks to bitcoin xonly pks
1834        let operator_xonly_pks_from_rpc: Vec<XOnlyPublicKey> = operator_xonly_pks
1835            .into_iter()
1836            .map(|xonly_pk| {
1837                xonly_pk.try_into().map_err(|e| {
1838                    Status::invalid_argument(format!("Failed to convert xonly public key: {e}"))
1839                })
1840            })
1841            .collect::<Result<Vec<_>, Status>>()?;
1842
1843        tracing::info!(
1844            "Parsed withdraw rpc params, withdrawal params: {:?}, operator xonly pks: {:?}",
1845            withdraw_params,
1846            operator_xonly_pks_from_rpc
1847                .iter()
1848                .map(|pk| pk.to_string())
1849                .collect::<Vec<_>>()
1850        );
1851
1852        // parse_withdrawal_sig_params is called to check if the inputs can be parsed correctly
1853        // and check if input sighash type is SinglePlusAnyoneCanPay
1854        let (withdrawal_id, _, _, _, _) =
1855            parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
1856
1857        // check if all given operator xonly pubkeys are a valid operator xonly pubkey, to warn the caller if
1858        // something is wrong with the given operator xonly pubkeys
1859        let current_operator_xonly_pks = self.fetch_operator_keys().await?;
1860        let invalid_operator_xonly_pks = operator_xonly_pks_from_rpc
1861            .iter()
1862            .filter(|xonly_pk| !current_operator_xonly_pks.contains(xonly_pk))
1863            .collect::<Vec<_>>();
1864        if !invalid_operator_xonly_pks.is_empty() {
1865            return Err(Status::invalid_argument(format!(
1866                "Given xonly public key doesn't belong to any current operator: invalid keys: {invalid_operator_xonly_pks:?}, current operators: {current_operator_xonly_pks:?}"
1867            )));
1868        }
1869
1870        let operators = self
1871            .get_operator_clients()
1872            .iter()
1873            .zip(current_operator_xonly_pks.into_iter());
1874        let withdraw_futures = operators
1875            .filter(|(_, xonly_pk)| {
1876                // check if operator_xonly_pks is empty or contains the operator's xonly public key
1877                operator_xonly_pks_from_rpc.is_empty()
1878                    || operator_xonly_pks_from_rpc.contains(xonly_pk)
1879            })
1880            .map(|(operator, operator_xonly_pk)| {
1881                let mut operator = operator.clone();
1882                let params = withdraw_params_with_sig.clone();
1883                let mut request = Request::new(params);
1884                request.set_timeout(WITHDRAWAL_TIMEOUT);
1885                async move { (operator.withdraw(request).await, operator_xonly_pk) }
1886            });
1887
1888        // collect responses from operators and return them as a vector of strings
1889        let responses = futures::future::join_all(withdraw_futures).await;
1890        tracing::info!(
1891            "Withdraw rpc completed successfully for withdrawal id: {}, operator xonly pks: {:?}, responses: {:?}",
1892            withdrawal_id,
1893            operator_xonly_pks_from_rpc
1894                .iter()
1895                .map(|pk| pk.to_string())
1896                .collect::<Vec<_>>(),
1897            responses,
1898        );
1899        Ok(Response::new(AggregatorWithdrawResponse {
1900            withdraw_responses: responses
1901                .into_iter()
1902                .map(|(res, xonly_pk)| match res {
1903                    Ok(withdraw_response) => OperatorWithrawalResponse {
1904                        operator_xonly_pk: Some(xonly_pk.into()),
1905                        response: Some(operator_withrawal_response::Response::RawTx(
1906                            withdraw_response.into_inner(),
1907                        )),
1908                    },
1909                    Err(e) => OperatorWithrawalResponse {
1910                        operator_xonly_pk: Some(xonly_pk.into()),
1911                        response: Some(operator_withrawal_response::Response::Error(e.to_string())),
1912                    },
1913                })
1914                .collect(),
1915        }))
1916    }
1917
1918    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
1919    async fn get_nofn_aggregated_xonly_pk(
1920        &self,
1921        _request: tonic::Request<super::Empty>,
1922    ) -> std::result::Result<tonic::Response<super::NofnResponse>, tonic::Status> {
1923        tracing::info!("Get nofn aggregated xonly pk rpc called");
1924        let verifier_keys = self.fetch_verifier_keys().await?;
1925        let num_verifiers = verifier_keys.len();
1926        let nofn_xonly_pk = bitcoin::XOnlyPublicKey::from_musig2_pks(verifier_keys.clone(), None)
1927            .map_err(|e| {
1928            Status::internal(format!(
1929                "Failed to aggregate verifier public keys, err: {e}, pubkeys: {verifier_keys:?}"
1930            ))
1931        })?;
1932        Ok(Response::new(super::NofnResponse {
1933            nofn_xonly_pk: nofn_xonly_pk.serialize().to_vec(),
1934            num_verifiers: num_verifiers as u32,
1935        }))
1936    }
1937
1938    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
1939    async fn internal_get_emergency_stop_tx(
1940        &self,
1941        request: Request<clementine::GetEmergencyStopTxRequest>,
1942    ) -> Result<Response<clementine::GetEmergencyStopTxResponse>, Status> {
1943        tracing::warn!("Get emergency stop tx rpc called");
1944        let inner_request = request.into_inner();
1945        let txids: Vec<Txid> = inner_request
1946            .txids
1947            .into_iter()
1948            .map(|txid| {
1949                Txid::from_slice(&txid.txid).map_err(|e| {
1950                    tonic::Status::invalid_argument(format!("Failed to parse txid: {e}"))
1951                })
1952            })
1953            .collect::<Result<Vec<_>, _>>()?;
1954        tracing::warn!(
1955            "Parsed get emergency stop tx rpc params, move txids: {:?}",
1956            txids
1957                .iter()
1958                .map(|txid| txid.to_string())
1959                .collect::<Vec<_>>()
1960        );
1961
1962        let emergency_stop_txs = self.db.get_emergency_stop_txs(None, txids).await?;
1963
1964        let (txids, encrypted_emergency_stop_txs): (Vec<Txid>, Vec<Vec<u8>>) =
1965            emergency_stop_txs.into_iter().unzip();
1966
1967        Ok(Response::new(clementine::GetEmergencyStopTxResponse {
1968            txids: txids.into_iter().map(|txid| txid.into()).collect(),
1969            encrypted_emergency_stop_txs,
1970        }))
1971    }
1972
1973    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
1974    async fn send_move_to_vault_tx(
1975        &self,
1976        request: Request<clementine::SendMoveTxRequest>,
1977    ) -> Result<Response<clementine::Txid>, Status> {
1978        tracing::info!("Send move to vault tx rpc called");
1979        #[cfg(not(feature = "automation"))]
1980        {
1981            let _ = request;
1982            return Err(Status::unimplemented(
1983                "Automation is disabled, cannot automatically send move to vault tx.",
1984            ));
1985        }
1986
1987        #[cfg(feature = "automation")]
1988        {
1989            use bitcoin::Amount;
1990            use std::sync::Arc;
1991
1992            use crate::builder::{
1993                address::create_taproot_address,
1994                script::{CheckSig, Multisig, SpendableScript},
1995                transaction::anchor_output,
1996            };
1997
1998            let request = request.into_inner();
1999            let movetx: bitcoin::Transaction = bitcoin::consensus::deserialize(
2000                &request
2001                    .raw_tx
2002                    .ok_or_eyre("raw_tx is required")
2003                    .map_to_status()?
2004                    .raw_tx,
2005            )
2006            .wrap_err("Failed to deserialize movetx")
2007            .map_to_status()?;
2008            let deposit_outpoint: bitcoin::OutPoint = request
2009                .deposit_outpoint
2010                .ok_or(Status::invalid_argument("deposit_outpoint is required"))?
2011                .try_into()?;
2012
2013            tracing::info!(
2014                "Parsed send move to vault tx rpc params, deposit outpoint: {:?}, movetx hex: {}",
2015                deposit_outpoint,
2016                bitcoin::consensus::encode::serialize_hex(&movetx)
2017            );
2018
2019            // check if transaction is a movetx
2020            if movetx.input.len() != 1 || movetx.output.len() != 2 {
2021                return Err(Status::invalid_argument(
2022                    "Transaction is not a movetx, input or output lengths are not correct",
2023                ));
2024            }
2025            // check output values
2026            // movetx always has 0 sat anchor output
2027            if !(movetx.output[0].value == self.config.protocol_paramset().bridge_amount
2028                && movetx.output[1].value == Amount::from_sat(0))
2029            {
2030                return Err(Status::invalid_argument(format!(
2031                    "Transaction is not a movetx, output sat values are not correct, should be ({}, 0), got ({}, {})",
2032                    self.config.protocol_paramset().bridge_amount,
2033                    movetx.output[0].value,
2034                    movetx.output[1].value,
2035                )));
2036            }
2037            // check output scriptpubkeys
2038            let verifier_keys = self.fetch_verifier_keys().await?;
2039            let nofn_xonly_pk =
2040                bitcoin::XOnlyPublicKey::from_musig2_pks(verifier_keys.clone(), None).map_err(
2041                    |e| {
2042                        Status::internal(format!(
2043                            "Failed to aggregate verifier public keys, err: {e}, pubkeys: {verifier_keys:?}"
2044                        ))
2045                    },
2046                )?;
2047            let nofn_script = Arc::new(CheckSig::new(nofn_xonly_pk));
2048            let security_council_script = Arc::new(Multisig::from_security_council(
2049                self.config.security_council.clone(),
2050            ));
2051
2052            let (addr, _) = create_taproot_address(
2053                &[
2054                    nofn_script.to_script_buf(),
2055                    security_council_script.to_script_buf(),
2056                ],
2057                None,
2058                self.config.protocol_paramset().network,
2059            );
2060            let bridge_script_pubkey = addr.script_pubkey();
2061
2062            if !(movetx.output[1].script_pubkey
2063                == anchor_output(self.config.protocol_paramset().anchor_amount()).script_pubkey
2064                && movetx.output[0].script_pubkey == bridge_script_pubkey)
2065            {
2066                return Err(Status::invalid_argument(
2067                    format!("Transaction is not a movetx, output scriptpubkeys are not correct, expected: (vault: {:?}, anchor: {:?}), got: (vault: {:?}, anchor: {:?})",
2068                    bridge_script_pubkey,
2069                    anchor_output(self.config.protocol_paramset().anchor_amount()).script_pubkey,
2070                    movetx.output[0].script_pubkey,
2071                    movetx.output[1].script_pubkey,
2072                )));
2073            }
2074
2075            let mut dbtx = self.db.begin_transaction().await?;
2076            self.tx_sender
2077                .insert_try_to_send(
2078                    &mut dbtx,
2079                    Some(TxMetadata {
2080                        deposit_outpoint: Some(deposit_outpoint),
2081                        operator_xonly_pk: None,
2082                        round_idx: None,
2083                        kickoff_idx: None,
2084                        tx_type: TransactionType::MoveToVault,
2085                    }),
2086                    &movetx,
2087                    FeePayingType::CPFP,
2088                    None,
2089                    &[],
2090                    &[],
2091                    &[],
2092                    &[],
2093                )
2094                .await
2095                .map_to_status()?;
2096            dbtx.commit()
2097                .await
2098                .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;
2099
2100            Ok(Response::new(movetx.compute_txid().into()))
2101        }
2102    }
2103}
2104
2105#[cfg(test)]
2106mod tests {
2107    use crate::actor::Actor;
2108    use crate::builder;
2109    use crate::config::BridgeConfig;
2110    use crate::deposit::{BaseDepositData, DepositInfo, DepositType};
2111    use crate::musig2::AggregateFromPublicKeys;
2112    use crate::rpc::clementine::clementine_aggregator_client::ClementineAggregatorClient;
2113    use crate::rpc::clementine::{self, GetEntityStatusesRequest, SendMoveTxRequest};
2114    use crate::rpc::get_clients;
2115    use crate::servers::create_aggregator_unix_server;
2116    use crate::test::common::citrea::MockCitreaClient;
2117    use crate::test::common::tx_utils::ensure_tx_onchain;
2118    use crate::test::common::*;
2119    use bitcoin::hashes::Hash;
2120    use bitcoincore_rpc::RpcApi;
2121    use clementine_primitives::EVMAddress;
2122    use eyre::Context;
2123    use std::time::Duration;
2124    use tokio::time::sleep;
2125    use tonic::{Request, Status};
2126
2127    #[cfg(feature = "automation")]
2128    async fn perform_deposit(mut config: BridgeConfig) -> Result<(), Status> {
2129        let regtest = create_regtest_rpc(&mut config).await;
2130        let rpc = regtest.rpc();
2131
2132        let actors = create_actors::<MockCitreaClient>(&config).await;
2133        let _unused =
2134            run_single_deposit::<MockCitreaClient>(&mut config, rpc.clone(), None, &actors, None)
2135                .await?;
2136
2137        Ok(())
2138    }
2139
2140    #[tokio::test(flavor = "multi_thread")]
2141    async fn aggregator_double_deposit() {
2142        let mut config = create_test_config_with_thread_name().await;
2143        let regtest = create_regtest_rpc(&mut config).await;
2144        let rpc = regtest.rpc();
2145        let actors = create_actors::<MockCitreaClient>(&config).await;
2146        let mut aggregator = actors.get_aggregator();
2147
2148        let evm_address = EVMAddress([1u8; 20]);
2149        let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2150
2151        let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2152            .setup(tonic::Request::new(clementine::Empty {}))
2153            .await
2154            .unwrap()
2155            .into_inner()
2156            .try_into()
2157            .unwrap();
2158        sleep(Duration::from_secs(3)).await;
2159
2160        let nofn_xonly_pk =
2161            bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2162
2163        let deposit_address = builder::address::generate_deposit_address(
2164            nofn_xonly_pk,
2165            signer.address.as_unchecked(),
2166            evm_address,
2167            config.protocol_paramset().network,
2168            config.protocol_paramset().user_takes_after,
2169        )
2170        .unwrap()
2171        .0;
2172
2173        let deposit_outpoint = rpc
2174            .send_to_address(&deposit_address, config.protocol_paramset().bridge_amount)
2175            .await
2176            .unwrap();
2177        rpc.mine_blocks(18).await.unwrap();
2178
2179        let deposit_info = DepositInfo {
2180            deposit_outpoint,
2181            deposit_type: DepositType::BaseDeposit(BaseDepositData {
2182                evm_address,
2183                recovery_taproot_address: signer.address.as_unchecked().clone(),
2184            }),
2185        };
2186
2187        // Two deposits with the same values.
2188        let movetx_one = aggregator
2189            .new_deposit(clementine::Deposit::from(deposit_info.clone()))
2190            .await
2191            .unwrap()
2192            .into_inner();
2193        let movetx_one_txid: bitcoin::Txid = aggregator
2194            .send_move_to_vault_tx(SendMoveTxRequest {
2195                deposit_outpoint: Some(deposit_outpoint.into()),
2196                raw_tx: Some(movetx_one),
2197            })
2198            .await
2199            .unwrap()
2200            .into_inner()
2201            .try_into()
2202            .unwrap();
2203
2204        let movetx_two = aggregator
2205            .new_deposit(clementine::Deposit::from(deposit_info))
2206            .await
2207            .unwrap()
2208            .into_inner();
2209        let _movetx_two_txid: bitcoin::Txid = aggregator
2210            .send_move_to_vault_tx(SendMoveTxRequest {
2211                deposit_outpoint: Some(deposit_outpoint.into()),
2212                raw_tx: Some(movetx_two),
2213            })
2214            .await
2215            .unwrap()
2216            .into_inner()
2217            .try_into()
2218            .unwrap();
2219        rpc.mine_blocks(1).await.unwrap();
2220        sleep(Duration::from_secs(3)).await;
2221
2222        poll_until_condition(
2223            async || {
2224                rpc.mine_blocks(1).await.unwrap();
2225                Ok(rpc
2226                    .is_tx_on_chain(&movetx_one_txid)
2227                    .await
2228                    .unwrap_or_default())
2229            },
2230            None,
2231            None,
2232        )
2233        .await
2234        .wrap_err_with(|| eyre::eyre!("MoveTx did not land onchain"))
2235        .unwrap();
2236    }
2237
2238    #[tokio::test(flavor = "multi_thread")]
2239    async fn aggregator_deposit_movetx_lands_onchain() {
2240        let mut config = create_test_config_with_thread_name().await;
2241        let regtest = create_regtest_rpc(&mut config).await;
2242        let rpc = regtest.rpc();
2243        let actors = create_actors::<MockCitreaClient>(&config).await;
2244        let mut aggregator = actors.get_aggregator();
2245
2246        let evm_address = EVMAddress([1u8; 20]);
2247        let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2248
2249        let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2250            .setup(tonic::Request::new(clementine::Empty {}))
2251            .await
2252            .unwrap()
2253            .into_inner()
2254            .try_into()
2255            .unwrap();
2256        sleep(Duration::from_secs(3)).await;
2257
2258        let nofn_xonly_pk =
2259            bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2260
2261        let deposit_address = builder::address::generate_deposit_address(
2262            nofn_xonly_pk,
2263            signer.address.as_unchecked(),
2264            evm_address,
2265            config.protocol_paramset().network,
2266            config.protocol_paramset().user_takes_after,
2267        )
2268        .unwrap()
2269        .0;
2270
2271        let deposit_outpoint = rpc
2272            .send_to_address(&deposit_address, config.protocol_paramset().bridge_amount)
2273            .await
2274            .unwrap();
2275        rpc.mine_blocks(18).await.unwrap();
2276
2277        let deposit_info = DepositInfo {
2278            deposit_outpoint,
2279            deposit_type: DepositType::BaseDeposit(BaseDepositData {
2280                evm_address,
2281                recovery_taproot_address: signer.address.as_unchecked().clone(),
2282            }),
2283        };
2284
2285        // Generate and broadcast the move-to-vault transaction
2286        let start_time = std::time::Instant::now();
2287        let raw_move_tx = aggregator
2288            .new_deposit(clementine::Deposit::from(deposit_info))
2289            .await
2290            .unwrap()
2291            .into_inner();
2292        let end_time = std::time::Instant::now();
2293        tracing::info!("New deposit time: {:?}", end_time - start_time);
2294
2295        let movetx_txid = aggregator
2296            .send_move_to_vault_tx(SendMoveTxRequest {
2297                deposit_outpoint: Some(deposit_outpoint.into()),
2298                raw_tx: Some(raw_move_tx),
2299            })
2300            .await
2301            .unwrap()
2302            .into_inner()
2303            .try_into()
2304            .unwrap();
2305
2306        rpc.mine_blocks(1).await.unwrap();
2307        sleep(Duration::from_secs(3)).await;
2308
2309        poll_until_condition(
2310            async || {
2311                rpc.mine_blocks(1).await.unwrap();
2312                Ok(rpc.is_tx_on_chain(&movetx_txid).await.unwrap_or_default())
2313            },
2314            None,
2315            None,
2316        )
2317        .await
2318        .wrap_err_with(|| eyre::eyre!("MoveTx did not land onchain"))
2319        .unwrap();
2320    }
2321
2322    #[tokio::test]
2323    async fn aggregator_two_deposit_movetx_and_emergency_stop() {
2324        let mut config = create_test_config_with_thread_name().await;
2325        let regtest = create_regtest_rpc(&mut config).await;
2326        let rpc = regtest.rpc();
2327        let actors = create_actors::<MockCitreaClient>(&config).await;
2328        let mut aggregator = actors.get_aggregator();
2329
2330        let evm_address = EVMAddress([1u8; 20]);
2331        let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2332
2333        let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2334            .setup(tonic::Request::new(clementine::Empty {}))
2335            .await
2336            .unwrap()
2337            .into_inner()
2338            .try_into()
2339            .unwrap();
2340        sleep(Duration::from_secs(3)).await;
2341
2342        let nofn_xonly_pk =
2343            bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2344
2345        let deposit_address_0 = builder::address::generate_deposit_address(
2346            nofn_xonly_pk,
2347            signer.address.as_unchecked(),
2348            evm_address,
2349            config.protocol_paramset().network,
2350            config.protocol_paramset().user_takes_after,
2351        )
2352        .unwrap()
2353        .0;
2354
2355        let deposit_address_1 = builder::address::generate_deposit_address(
2356            nofn_xonly_pk,
2357            signer.address.as_unchecked(),
2358            evm_address,
2359            config.protocol_paramset().network,
2360            config.protocol_paramset().user_takes_after,
2361        )
2362        .unwrap()
2363        .0;
2364
2365        let deposit_outpoint_0 = rpc
2366            .send_to_address(&deposit_address_0, config.protocol_paramset().bridge_amount)
2367            .await
2368            .unwrap();
2369        rpc.mine_blocks(18).await.unwrap();
2370
2371        let deposit_outpoint_1 = rpc
2372            .send_to_address(&deposit_address_1, config.protocol_paramset().bridge_amount)
2373            .await
2374            .unwrap();
2375        rpc.mine_blocks(18).await.unwrap();
2376
2377        let deposit_info_0 = DepositInfo {
2378            deposit_outpoint: deposit_outpoint_0,
2379            deposit_type: DepositType::BaseDeposit(BaseDepositData {
2380                evm_address,
2381                recovery_taproot_address: signer.address.as_unchecked().clone(),
2382            }),
2383        };
2384
2385        let deposit_info_1 = DepositInfo {
2386            deposit_outpoint: deposit_outpoint_1,
2387            deposit_type: DepositType::BaseDeposit(BaseDepositData {
2388                evm_address,
2389                recovery_taproot_address: signer.address.as_unchecked().clone(),
2390            }),
2391        };
2392
2393        // Generate and broadcast the move-to-vault tx for the first deposit
2394        let raw_move_tx_0 = aggregator
2395            .new_deposit(clementine::Deposit::from(deposit_info_0))
2396            .await
2397            .unwrap()
2398            .into_inner();
2399        let move_txid_0: bitcoin::Txid = aggregator
2400            .send_move_to_vault_tx(SendMoveTxRequest {
2401                deposit_outpoint: Some(deposit_outpoint_0.into()),
2402                raw_tx: Some(raw_move_tx_0),
2403            })
2404            .await
2405            .unwrap()
2406            .into_inner()
2407            .try_into()
2408            .unwrap();
2409
2410        rpc.mine_blocks(1).await.unwrap();
2411        sleep(Duration::from_secs(3)).await;
2412        ensure_tx_onchain(rpc, move_txid_0)
2413            .await
2414            .expect("failed to get movetx_0 on chain");
2415
2416        // Generate and broadcast the move-to-vault tx for the second deposit
2417        let raw_move_tx_1 = aggregator
2418            .new_deposit(clementine::Deposit::from(deposit_info_1))
2419            .await
2420            .unwrap()
2421            .into_inner();
2422        let move_txid_1 = aggregator
2423            .send_move_to_vault_tx(SendMoveTxRequest {
2424                deposit_outpoint: Some(deposit_outpoint_1.into()),
2425                raw_tx: Some(raw_move_tx_1),
2426            })
2427            .await
2428            .unwrap()
2429            .into_inner()
2430            .try_into()
2431            .unwrap();
2432
2433        rpc.mine_blocks(1).await.unwrap();
2434        ensure_tx_onchain(rpc, move_txid_1)
2435            .await
2436            .expect("failed to get movetx_1 on chain");
2437        sleep(Duration::from_secs(3)).await;
2438
2439        let move_txids = vec![move_txid_0, move_txid_1];
2440
2441        tracing::debug!("Move txids: {:?}", move_txids);
2442
2443        let emergency_txid = aggregator
2444            .internal_get_emergency_stop_tx(tonic::Request::new(
2445                clementine::GetEmergencyStopTxRequest {
2446                    txids: move_txids
2447                        .iter()
2448                        .map(|txid| clementine::Txid {
2449                            txid: txid.to_byte_array().to_vec(),
2450                        })
2451                        .collect(),
2452                },
2453            ))
2454            .await
2455            .unwrap()
2456            .into_inner();
2457
2458        let decryption_priv_key =
2459            hex::decode("a80bc8cf095c2b37d4c6233114e0dd91f43d75de5602466232dbfcc1fc66c542")
2460                .expect("Failed to parse emergency stop encryption public key");
2461        let emergency_stop_tx: bitcoin::Transaction = bitcoin::consensus::deserialize(
2462            &crate::encryption::decrypt_bytes(
2463                &decryption_priv_key,
2464                &emergency_txid.encrypted_emergency_stop_txs[0],
2465            )
2466            .expect("Failed to decrypt emergency stop tx"),
2467        )
2468        .expect("Failed to deserialize");
2469
2470        rpc.send_raw_transaction(&emergency_stop_tx)
2471            .await
2472            .expect("Failed to send emergency stop tx");
2473
2474        let emergency_stop_txid = emergency_stop_tx.compute_txid();
2475        rpc.mine_blocks(1).await.unwrap();
2476
2477        poll_until_condition(
2478            async || {
2479                rpc.mine_blocks(1).await.unwrap();
2480                Ok(rpc
2481                    .is_tx_on_chain(&emergency_stop_txid)
2482                    .await
2483                    .unwrap_or_default())
2484            },
2485            None,
2486            None,
2487        )
2488        .await
2489        .wrap_err_with(|| eyre::eyre!("Emergency stop tx did not land onchain"))
2490        .unwrap();
2491    }
2492
2493    #[cfg(feature = "automation")]
2494    #[tokio::test]
2495    #[ignore = "This test does not work"]
2496    async fn aggregator_deposit_finalize_verifier_timeout() {
2497        let mut config = create_test_config_with_thread_name().await;
2498        config
2499            .test_params
2500            .timeout_params
2501            .deposit_finalize_verifier_idx = Some(0);
2502        let res = perform_deposit(config).await;
2503        assert!(res.is_err());
2504        let err_string = res.unwrap_err().to_string();
2505        assert!(
2506            err_string.contains("Deposit finalization from verifiers"),
2507            "Error string was: {err_string}"
2508        );
2509    }
2510
2511    #[cfg(feature = "automation")]
2512    #[tokio::test]
2513    async fn aggregator_deposit_key_distribution_verifier_timeout() {
2514        let mut config = create_test_config_with_thread_name().await;
2515        config
2516            .test_params
2517            .timeout_params
2518            .key_distribution_verifier_idx = Some(0);
2519
2520        let res = perform_deposit(config).await;
2521
2522        assert!(res.is_err());
2523        let err_string = res.unwrap_err().to_string();
2524        assert!(
2525            err_string.contains("Verifier key distribution (id:"),
2526            "Error string was: {err_string}"
2527        );
2528    }
2529
2530    #[cfg(feature = "automation")]
2531    #[tokio::test]
2532    async fn aggregator_deposit_key_distribution_operator_timeout() {
2533        let mut config = create_test_config_with_thread_name().await;
2534        config
2535            .test_params
2536            .timeout_params
2537            .key_collection_operator_idx = Some(0);
2538
2539        let res = perform_deposit(config).await;
2540
2541        assert!(res.is_err());
2542        let err_string = res.unwrap_err().to_string();
2543        assert!(
2544            err_string.contains("Operator key collection (id:"),
2545            "Error string was: {err_string}"
2546        );
2547    }
2548
2549    #[cfg(feature = "automation")]
2550    #[tokio::test]
2551    async fn aggregator_deposit_nonce_stream_creation_verifier_timeout() {
2552        let mut config = create_test_config_with_thread_name().await;
2553        config
2554            .test_params
2555            .timeout_params
2556            .nonce_stream_creation_verifier_idx = Some(0);
2557
2558        let res = perform_deposit(config).await;
2559
2560        assert!(res.is_err());
2561        let err_string = res.unwrap_err().to_string();
2562        assert!(
2563            err_string.contains("Nonce stream creation (id:"),
2564            "Error string was: {err_string}"
2565        );
2566    }
2567
2568    #[cfg(feature = "automation")]
2569    #[tokio::test]
2570    async fn aggregator_deposit_partial_sig_stream_creation_timeout() {
2571        let mut config = create_test_config_with_thread_name().await;
2572        config
2573            .test_params
2574            .timeout_params
2575            .partial_sig_stream_creation_verifier_idx = Some(0);
2576
2577        let res = perform_deposit(config).await;
2578
2579        assert!(res.is_err());
2580        let err_string = res.unwrap_err().to_string();
2581        assert!(
2582            err_string.contains("Partial signature stream creation (id:"),
2583            "Error string was: {err_string}"
2584        );
2585    }
2586
2587    #[cfg(feature = "automation")]
2588    #[tokio::test]
2589    async fn aggregator_deposit_operator_sig_collection_operator_timeout() {
2590        let mut config = create_test_config_with_thread_name().await;
2591        config
2592            .test_params
2593            .timeout_params
2594            .operator_sig_collection_operator_idx = Some(0);
2595
2596        let res = perform_deposit(config).await;
2597
2598        assert!(res.is_err());
2599        let err_string = res.unwrap_err().to_string();
2600        assert!(
2601            err_string.contains("Operator signature stream creation (id:"),
2602            "Error string was: {err_string}"
2603        );
2604    }
2605
2606    #[tokio::test]
2607    async fn aggregator_get_entity_statuses() {
2608        let mut config = create_test_config_with_thread_name().await;
2609        let _regtest = create_regtest_rpc(&mut config).await;
2610
2611        let actors = create_actors::<MockCitreaClient>(&config).await;
2612        let mut aggregator = actors.get_aggregator();
2613        let status = aggregator
2614            .get_entity_statuses(Request::new(GetEntityStatusesRequest {
2615                restart_tasks: false,
2616            }))
2617            .await
2618            .unwrap()
2619            .into_inner();
2620
2621        tracing::info!("Status: {:?}", status);
2622
2623        assert_eq!(
2624            status.entity_statuses.len(),
2625            config.test_params.all_operators_secret_keys.len()
2626                + config.test_params.all_verifiers_secret_keys.len()
2627        );
2628    }
2629
2630    #[tokio::test]
2631    async fn aggregator_start_with_offline_verifier() {
2632        let mut config = create_test_config_with_thread_name().await;
2633        // Create regtest rpc
2634        let _regtest = create_regtest_rpc(&mut config).await;
2635        // random ips
2636        config.verifier_endpoints = Some(vec!["https://142.143.144.145:17001".to_string()]);
2637        config.operator_endpoints = Some(vec!["https://142.143.144.145:17002".to_string()]);
2638        // Create temporary directory for aggregator socket
2639        let socket_dir = tempfile::tempdir().unwrap();
2640        let socket_path = socket_dir.path().join("aggregator.sock");
2641
2642        tracing::info!("Creating unix aggregator server");
2643
2644        let (_, _shutdown_tx) = create_aggregator_unix_server(config.clone(), socket_path.clone())
2645            .await
2646            .unwrap();
2647
2648        tracing::info!("Created unix aggregator server");
2649
2650        let mut aggregator_client = get_clients(
2651            vec![format!("unix://{}", socket_path.display())],
2652            ClementineAggregatorClient::new,
2653            &config,
2654            false,
2655        )
2656        .await
2657        .unwrap()
2658        .pop()
2659        .unwrap();
2660
2661        tracing::info!("Got aggregator client");
2662
2663        // vergen should work
2664        assert!(aggregator_client
2665            .vergen(Request::new(clementine::Empty {}))
2666            .await
2667            .is_ok());
2668
2669        tracing::info!("After vergen");
2670
2671        // setup should give error as it can't connect to the verifier
2672        assert!(aggregator_client
2673            .setup(Request::new(clementine::Empty {}))
2674            .await
2675            .is_err());
2676
2677        tracing::info!("After setup");
2678
2679        // aggregator should still be up even after not connecting to the verifier
2680        // and should be able to get metrics
2681        tracing::info!(
2682            "Entity statuses: {:?}",
2683            aggregator_client
2684                .get_entity_statuses(Request::new(GetEntityStatusesRequest {
2685                    restart_tasks: false,
2686                }))
2687                .await
2688                .unwrap()
2689        );
2690    }
2691}