clementine_core/rpc/
aggregator.rs

1use super::clementine::{
2    clementine_aggregator_server::ClementineAggregator, verifier_deposit_finalize_params,
3    DepositParams, Empty, VerifierDepositFinalizeParams,
4};
5use super::clementine::{
6    AggregatorWithdrawResponse, Deposit, EntityStatuses, GetEntityStatusesRequest,
7    OptimisticPayoutParams, RawSignedTx, VergenResponse, VerifierPublicKeys,
8};
9use crate::aggregator::{
10    AggregatorServer, OperatorId, ParticipatingOperators, ParticipatingVerifiers, VerifierId,
11};
12use crate::bitvm_client::SECP;
13use crate::builder::sighash::SignatureInfo;
14use crate::builder::transaction::{
15    create_emergency_stop_txhandler, create_move_to_vault_txhandler,
16    create_optimistic_payout_txhandler, Signed, TransactionType, TxHandler,
17};
18use crate::compatibility::ActorWithConfig;
19use crate::config::BridgeConfig;
20use crate::constants::{
21    DEPOSIT_FINALIZATION_TIMEOUT, DEPOSIT_FINALIZE_STREAM_CREATION_TIMEOUT,
22    KEY_DISTRIBUTION_TIMEOUT, NONCE_STREAM_CREATION_TIMEOUT, OPERATOR_SIGS_STREAM_CREATION_TIMEOUT,
23    OPERATOR_SIGS_TIMEOUT, OPTIMISTIC_PAYOUT_TIMEOUT, OVERALL_DEPOSIT_TIMEOUT,
24    PARTIAL_SIG_STREAM_CREATION_TIMEOUT, PIPELINE_COMPLETION_TIMEOUT, SEND_OPERATOR_SIGS_TIMEOUT,
25    SETUP_COMPLETION_TIMEOUT, WITHDRAWAL_TIMEOUT,
26};
27use crate::deposit::{Actors, DepositData, DepositInfo};
28use crate::errors::{ErrorExt, ResultExt};
29use crate::musig2::AggregateFromPublicKeys;
30use crate::rpc::clementine::{
31    operator_withrawal_response, AggregatorWithdrawalInput, CompatibilityParamsRpc,
32    EntitiesCompatibilityData, OperatorWithrawalResponse, VerifierDepositSignParams,
33};
34use crate::rpc::parser;
35use crate::utils::{
36    flatten_join_named_results, get_vergen_response, join_all_combine_errors, timed_request,
37    timed_try_join_all, ScriptBufExt,
38};
39use crate::utils::{FeePayingType, TxMetadata};
40use crate::UTXO;
41use crate::{
42    aggregator::Aggregator,
43    builder::sighash::create_nofn_sighash_stream,
44    errors::BridgeError,
45    musig2::aggregate_nonces,
46    rpc::clementine::{self, DepositSignSession},
47};
48use bitcoin::hashes::Hash;
49use bitcoin::secp256k1::schnorr::Signature;
50use bitcoin::secp256k1::{Message, PublicKey};
51use bitcoin::{TapSighash, TxOut, Txid, XOnlyPublicKey};
52use eyre::{Context, OptionExt};
53use futures::future::join_all;
54use futures::{
55    stream::{BoxStream, TryStreamExt},
56    FutureExt, Stream, StreamExt, TryFutureExt,
57};
58use secp256k1::musig::{AggregatedNonce, PartialSignature, PublicNonce};
59use std::future::Future;
60use tokio::sync::mpsc::{channel, Receiver, Sender};
61use tonic::{async_trait, Request, Response, Status, Streaming};
62
63struct AggNonceQueueItem {
64    agg_nonce: AggregatedNonce,
65    sighash: TapSighash,
66}
67
68#[derive(Debug, Clone)]
69struct FinalSigQueueItem {
70    final_sig: Vec<u8>,
71}
72
73#[derive(Debug, thiserror::Error)]
74pub enum AggregatorError {
75    #[error("Failed to receive from {stream_name} stream.")]
76    InputStreamEndedEarlyUnknownSize { stream_name: String },
77    #[error("Failed to send to {stream_name} stream.")]
78    OutputStreamEndedEarly { stream_name: String },
79    #[error("Failed to send request to {request_name} stream.")]
80    RequestFailed { request_name: String },
81}
82
83async fn get_next_pub_nonces(
84    nonce_streams: &mut [impl Stream<Item = Result<PublicNonce, BridgeError>>
85              + Unpin
86              + Send
87              + 'static],
88    verifiers_ids: &[VerifierId],
89) -> Result<Vec<PublicNonce>, BridgeError> {
90    join_all_combine_errors(
91        nonce_streams
92            .iter_mut()
93            .zip(verifiers_ids)
94            .map(|(s, id)| async move {
95                s.next()
96                    .await
97                    .transpose()
98                    .wrap_err(format!("Failed to get nonce from {id}"))? // Return the inner error if it exists
99                    .ok_or_else(|| -> eyre::Report {
100                        AggregatorError::InputStreamEndedEarlyUnknownSize {
101                            // Return an early end error if the stream is empty
102                            stream_name: format!("Nonce stream {id}"),
103                        }
104                        .into()
105                    })
106            }),
107    )
108    .await
109}
110
111/// For each expected sighash, we collect a batch of public nonces from all verifiers. We aggregate and send aggregated nonce and all public nonces (needed for partial signature verification) to the agg_nonce_sender. Then repeat for the next sighash.
112async fn nonce_aggregator(
113    mut nonce_streams: Vec<
114        impl Stream<Item = Result<PublicNonce, BridgeError>> + Unpin + Send + 'static,
115    >,
116    mut sighash_stream: impl Stream<Item = Result<(TapSighash, SignatureInfo), BridgeError>>
117        + Unpin
118        + Send
119        + 'static,
120    agg_nonce_sender: Sender<(AggNonceQueueItem, Vec<PublicNonce>)>,
121    needed_nofn_sigs: usize,
122    verifiers_ids: Vec<VerifierId>,
123) -> Result<
124    (
125        (AggregatedNonce, Vec<PublicNonce>),
126        (AggregatedNonce, Vec<PublicNonce>),
127    ),
128    BridgeError,
129> {
130    let mut total_sigs = 0;
131
132    tracing::info!("Starting nonce aggregation (expecting {needed_nofn_sigs} nonces)");
133
134    // sanity check
135    if verifiers_ids.len() != nonce_streams.len() {
136        return Err(
137            eyre::eyre!("Number of verifiers ids and nonce streams must be the same").into(),
138        );
139    }
140
141    while let Some(msg) = sighash_stream.next().await {
142        let (sighash, siginfo) = msg.wrap_err("Sighash stream failed")?;
143
144        total_sigs += 1;
145
146        let pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
147            .await
148            .wrap_err_with(|| {
149                format!("Failed to get nonces from verifiers for sighash #{total_sigs} with siginfo: {siginfo:?}")
150            })?;
151
152        tracing::trace!(
153            "Received nonces for signature id {:?} in nonce_aggregator",
154            siginfo.signature_id
155        );
156
157        let agg_nonce = aggregate_nonces(pub_nonces.iter().collect::<Vec<_>>().as_slice())?;
158
159        agg_nonce_sender
160            .send((AggNonceQueueItem { agg_nonce, sighash }, pub_nonces))
161            .await
162            .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
163                stream_name: "nonce_aggregator".to_string(),
164            })?;
165
166        tracing::trace!(
167            "Sent nonces for signature id {:?} in nonce_aggregator",
168            siginfo.signature_id
169        );
170    }
171    tracing::trace!(tmp_debug = 1, "Sent {total_sigs} to agg_nonce stream");
172
173    // Sanity check, should never happen
174    if total_sigs != needed_nofn_sigs {
175        let err_msg = format!(
176            "Expected {needed_nofn_sigs} nofn signatures, got {total_sigs} from sighash stream",
177        );
178        tracing::error!("{err_msg}");
179        return Err(eyre::eyre!(err_msg).into());
180    }
181    // aggregate nonces for the movetx signature
182    let movetx_pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
183        .await
184        .wrap_err("Failed to get movetx public nonces from verifiers")?;
185
186    tracing::trace!("Received nonces for movetx in nonce_aggregator");
187
188    let move_tx_agg_nonce =
189        aggregate_nonces(movetx_pub_nonces.iter().collect::<Vec<_>>().as_slice())
190            .wrap_err("Failed to aggregate movetx nonces")?;
191
192    let emergency_stop_pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
193        .await
194        .wrap_err("Failed to get emergency stop tx public nonces from verifiers")?;
195
196    let emergency_stop_agg_nonce = aggregate_nonces(
197        emergency_stop_pub_nonces
198            .iter()
199            .collect::<Vec<_>>()
200            .as_slice(),
201    )
202    .wrap_err("Failed to aggregate emergency stop tx nonces")?;
203
204    Ok((
205        (move_tx_agg_nonce, movetx_pub_nonces),
206        (emergency_stop_agg_nonce, emergency_stop_pub_nonces),
207    ))
208}
209
210/// Reroutes aggregated nonces and public nonces for each aggregated nonce to the signature aggregator.
211async fn nonce_distributor(
212    mut agg_nonce_receiver: Receiver<(AggNonceQueueItem, Vec<PublicNonce>)>,
213    partial_sig_streams: Vec<(
214        Streaming<clementine::PartialSig>,
215        Sender<clementine::VerifierDepositSignParams>,
216    )>,
217    partial_sig_sender: Sender<(Vec<(PartialSignature, PublicNonce)>, AggNonceQueueItem)>,
218    needed_nofn_sigs: usize,
219    verifiers_ids: Vec<VerifierId>,
220) -> Result<(), BridgeError> {
221    let mut nonce_count = 0;
222    let mut sig_count = 0;
223    let (mut partial_sig_rx, mut partial_sig_tx): (Vec<_>, Vec<_>) =
224        partial_sig_streams.into_iter().unzip();
225
226    let (queue_tx, mut queue_rx) = channel(crate::constants::DEFAULT_CHANNEL_SIZE);
227
228    // sanity check
229    if verifiers_ids.len() != partial_sig_rx.len() {
230        return Err(eyre::eyre!(
231            "Number of verifiers ids and partial sig streams must be the same"
232        )
233        .into());
234    }
235    let verifiers_ids_clone = verifiers_ids.clone();
236    let handle_1 = tokio::spawn(async move {
237        while let Some((queue_item, pub_nonces)) = agg_nonce_receiver.recv().await {
238            nonce_count += 1;
239
240            tracing::trace!(
241                "Received aggregated nonce {} in nonce_distributor",
242                nonce_count
243            );
244
245            let agg_nonce_wrapped = clementine::VerifierDepositSignParams {
246                params: Some(clementine::verifier_deposit_sign_params::Params::AggNonce(
247                    queue_item.agg_nonce.serialize().to_vec(),
248                )),
249            };
250
251            // Broadcast aggregated nonce to all streams
252            join_all_combine_errors(
253                partial_sig_tx
254                    .iter_mut()
255                    .zip(verifiers_ids_clone.iter())
256                    .map(|(tx, id)| {
257                        let agg_nonce_wrapped = agg_nonce_wrapped.clone();
258                        async move {
259                            tx.send(agg_nonce_wrapped)
260                                .await
261                                .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
262                                    stream_name: format!("Partial sig {id}"),
263                                })
264                                .inspect_err(|e| {
265                                    tracing::error!(
266                                        "Failed to send aggregated nonce to {id}: {:?}",
267                                        e
268                                    );
269                                })
270                        }
271                    }),
272            )
273            .await
274            .wrap_err("Failed to send aggregated nonces to verifiers")?;
275
276            queue_tx
277                .send((queue_item, pub_nonces))
278                .await
279                .wrap_err("Other end of channel closed")?;
280
281            tracing::trace!(
282                "Sent aggregated nonce {} to verifiers in nonce_distributor",
283                nonce_count
284            );
285            if nonce_count == needed_nofn_sigs {
286                break;
287            }
288        }
289        if nonce_count != needed_nofn_sigs {
290            let err_msg = format!("Expected {needed_nofn_sigs} aggregated nonces in nonce_distributor, got {nonce_count}",);
291            tracing::error!("{err_msg}");
292            return Err(eyre::eyre!(err_msg).into());
293        }
294
295        tracing::trace!(
296            tmp_debug = 1,
297            "Broadcasted {nonce_count} agg_nonces to verifiers and to the queue"
298        );
299        Ok::<(), BridgeError>(())
300    });
301
302    let handle_2 = tokio::spawn(async move {
303        while let Some((queue_item, pub_nonces)) = queue_rx.recv().await {
304            let pub_nonces_ref = pub_nonces.as_slice();
305            if pub_nonces_ref.len() != partial_sig_rx.len() {
306                return Err(eyre::eyre!(
307                    "Number of public nonces {} and partial sig streams {} must be the same",
308                    pub_nonces_ref.len(),
309                    partial_sig_rx.len()
310                )
311                .into());
312            }
313            let partial_sigs = join_all_combine_errors(partial_sig_rx.iter_mut().zip(pub_nonces_ref.iter()).zip(verifiers_ids.iter()).map(
314                |((stream, pub_nonce), id)| async move {
315                    let partial_sig = stream
316                        .message()
317                        .await
318                        .wrap_err_with(|| AggregatorError::RequestFailed {
319                            request_name: format!("Partial sig {sig_count} from {id}"),
320                        })
321                        .inspect_err(|e| {
322                            tracing::error!(
323                                "Failed to receive partial signature {sig_count} from {id}, an error was sent: {:?}",
324                                e
325                            );
326                        })?
327                        .ok_or_eyre(AggregatorError::InputStreamEndedEarlyUnknownSize {
328                            stream_name: format!("Partial sig {sig_count} from {id} closed"),
329                        }).inspect_err(|e| {
330                            tracing::error!(
331                                "Failed to receive partial signature {sig_count} from {id}, the stream was closed: {:?}",
332                                e
333                            );
334                        })?;
335                    let partial_sig = PartialSignature::from_byte_array(
336                        &partial_sig
337                            .partial_sig
338                            .as_slice()
339                            .try_into()
340                            .wrap_err("PartialSignature must be 32 bytes")?,
341                    )
342                    .wrap_err(format!("Failed to parse partial signature {sig_count} from {id}"))?;
343
344                    Ok::<_, BridgeError>((partial_sig, *pub_nonce))
345                },
346            ))
347            .await?;
348
349            sig_count += 1;
350
351            tracing::trace!(
352                "Received partial signature {} from verifiers in nonce_distributor",
353                sig_count
354            );
355
356            partial_sig_sender
357                .send((partial_sigs, queue_item))
358                .await
359                .map_err(|_| {
360                    eyre::eyre!(AggregatorError::OutputStreamEndedEarly {
361                        stream_name: "partial_sig_sender".into(),
362                    })
363                })?;
364
365            tracing::trace!(
366                "Sent partial signature {} to signature_aggregator in nonce_distributor",
367                sig_count
368            );
369        }
370
371        if sig_count != needed_nofn_sigs {
372            let err_msg = format!(
373                "Expected {needed_nofn_sigs} partial signatures in nonce_distributor, got {sig_count}",
374            );
375            tracing::error!("{err_msg}");
376            return Err(eyre::eyre!(err_msg).into());
377        }
378        tracing::trace!(
379            tmp_debug = 1,
380            "Sent {sig_count} partial sig bundles to partial_sigs stream"
381        );
382
383        tracing::trace!("Finished tasks in nonce_distributor handle 2");
384        Ok::<(), BridgeError>(())
385    });
386
387    let (result_1, result_2) = tokio::join!(handle_1, handle_2);
388
389    let mut task_errors = Vec::new();
390
391    match result_1 {
392        Ok(inner_result) => {
393            if let Err(e) = inner_result {
394                task_errors.push(format!(
395                    "Task returned error while distributing aggnonces: {e:#?}"
396                ));
397            }
398        }
399        Err(e) => {
400            task_errors.push(format!(
401                "Task panicked while distributing aggnonces: {e:#?}"
402            ));
403        }
404    }
405
406    match result_2 {
407        Ok(inner_result) => {
408            if let Err(e) = inner_result {
409                task_errors.push(format!(
410                    "Task returned error while receiving partial sigs: {e:#?}"
411                ));
412            }
413        }
414        Err(e) => {
415            task_errors.push(format!(
416                "Task panicked while receiving partial sigs: {e:#?}"
417            ));
418        }
419    }
420
421    if !task_errors.is_empty() {
422        return Err(eyre::eyre!(format!(
423            "nonce_distributor failed with errors: {:#?}",
424            task_errors
425        ))
426        .into());
427    }
428
429    tracing::debug!("Finished tasks in nonce_distributor");
430
431    Ok(())
432}
433
434/// Collects partial signatures and corresponding public nonces from given stream and aggregates them.
435/// Each partial signature will also be verified if PARTIAL_SIG_VERIFICATION is set to true.
436async fn signature_aggregator(
437    mut partial_sig_receiver: Receiver<(Vec<(PartialSignature, PublicNonce)>, AggNonceQueueItem)>,
438    verifiers_public_keys: Vec<PublicKey>,
439    final_sig_sender: Sender<FinalSigQueueItem>,
440    needed_nofn_sigs: usize,
441) -> Result<(), BridgeError> {
442    let mut sig_count = 0;
443    while let Some((partial_sigs, queue_item)) = partial_sig_receiver.recv().await {
444        sig_count += 1;
445        tracing::trace!(
446            "Received partial signatures {} in signature_aggregator",
447            sig_count
448        );
449
450        let final_sig = crate::musig2::aggregate_partial_signatures(
451            verifiers_public_keys.clone(),
452            None,
453            queue_item.agg_nonce,
454            &partial_sigs,
455            Message::from_digest(queue_item.sighash.to_byte_array()),
456        )?;
457
458        final_sig_sender
459            .send(FinalSigQueueItem {
460                final_sig: final_sig.serialize().to_vec(),
461            })
462            .await
463            .wrap_err_with(|| {
464                eyre::eyre!(AggregatorError::OutputStreamEndedEarly {
465                    stream_name: "final_sig_sender".into(),
466                })
467            })?;
468        tracing::trace!(
469            "Sent aggregated signature {} to signature_distributor in signature_aggregator",
470            sig_count
471        );
472
473        if sig_count == needed_nofn_sigs {
474            break;
475        }
476    }
477
478    if sig_count != needed_nofn_sigs {
479        let err_msg = format!(
480            "Expected {needed_nofn_sigs} aggregated signatures in signature_aggregator, got {sig_count}",
481        );
482        tracing::error!("{err_msg}");
483        return Err(eyre::eyre!(err_msg).into());
484    }
485
486    tracing::trace!(
487        tmp_debug = 1,
488        "Sent {sig_count} aggregated signatures to final_sig stream"
489    );
490
491    Ok(())
492}
493
494/// Reroutes aggregated signatures to the caller.
495/// Also sends 2 aggregated nonces to the verifiers.
496async fn signature_distributor(
497    mut final_sig_receiver: Receiver<FinalSigQueueItem>,
498    deposit_finalize_sender: Vec<Sender<VerifierDepositFinalizeParams>>,
499    agg_nonce: impl Future<
500        Output = Result<
501            (
502                (AggregatedNonce, Vec<PublicNonce>),
503                (AggregatedNonce, Vec<PublicNonce>),
504            ),
505            Status,
506        >,
507    >,
508    needed_nofn_sigs: usize,
509    verifiers_ids: Vec<VerifierId>,
510) -> Result<(), BridgeError> {
511    use verifier_deposit_finalize_params::Params;
512    let mut sig_count = 0;
513    while let Some(queue_item) = final_sig_receiver.recv().await {
514        sig_count += 1;
515        tracing::trace!("Received signature {} in signature_distributor", sig_count);
516        let final_params = VerifierDepositFinalizeParams {
517            params: Some(Params::SchnorrSig(queue_item.final_sig)),
518        };
519
520        join_all_combine_errors(
521            deposit_finalize_sender
522                .iter()
523                .zip(verifiers_ids.iter())
524                .map(|(tx, id)| {
525                    let final_params = final_params.clone();
526                    async move {
527                        tx.send(final_params).await.wrap_err_with(|| {
528                            AggregatorError::OutputStreamEndedEarly {
529                                stream_name: format!("Deposit finalize sender for {id}"),
530                            }
531                        })
532                    }
533                }),
534        )
535        .await
536        .wrap_err(format!(
537            "Failed to send final signature {sig_count} to verifiers"
538        ))?;
539
540        tracing::trace!(
541            "Sent signature {} to verifiers in signature_distributor",
542            sig_count
543        );
544
545        if sig_count == needed_nofn_sigs {
546            break;
547        }
548    }
549
550    if sig_count != needed_nofn_sigs {
551        let err_msg = format!(
552            "Expected {needed_nofn_sigs} signatures in signature_distributor, got {sig_count}",
553        );
554        tracing::error!("{err_msg}");
555        return Err(eyre::eyre!(err_msg).into());
556    }
557
558    tracing::trace!(
559        tmp_debug = 1,
560        "Sent {sig_count} signatures to verifiers in deposit_finalize"
561    );
562
563    let (movetx_agg_nonce, emergency_stop_agg_nonce) = agg_nonce
564        .await
565        .wrap_err("Failed to get aggregated nonce for movetx and emergency stop")?;
566
567    tracing::info!("Got aggregated nonce for movetx and emergency stop in signature distributor");
568
569    // Send the movetx agg nonce to the verifiers.
570    for tx in &deposit_finalize_sender {
571        tx.send(VerifierDepositFinalizeParams {
572            params: Some(Params::MoveTxAggNonce(
573                movetx_agg_nonce.0.serialize().to_vec(),
574            )),
575        })
576        .await
577        .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
578            stream_name: "Deposit finalize sender (for movetx agg nonce)".to_string(),
579        })?;
580    }
581    tracing::info!("Sent movetx aggregated nonce to verifiers in signature distributor");
582
583    // send emergency stop agg nonce to verifiers
584    for tx in &deposit_finalize_sender {
585        tx.send(VerifierDepositFinalizeParams {
586            params: Some(Params::EmergencyStopAggNonce(
587                emergency_stop_agg_nonce.0.serialize().to_vec(),
588            )),
589        })
590        .await
591        .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
592            stream_name: "Deposit finalize sender (for emergency stop agg nonce)".to_string(),
593        })?;
594    }
595    tracing::info!("Sent emergency stop aggregated nonce to verifiers in signature distributor");
596
597    Ok(())
598}
599
600/// Creates a stream of nonces from verifiers.
601/// This will automatically get the first response from the verifiers.
602///
603/// # Returns
604///
605/// - Vec<[`clementine::NonceGenFirstResponse`]>: First response from each verifier
606/// - Vec<BoxStream<Result<[`PublicNonce`], BridgeError>>>: Stream of nonces from each verifier
607async fn create_nonce_streams(
608    verifiers: ParticipatingVerifiers,
609    num_nonces: u32,
610    #[cfg(test)] config: &crate::config::BridgeConfig,
611) -> Result<
612    (
613        Vec<clementine::NonceGenFirstResponse>,
614        Vec<BoxStream<'static, Result<PublicNonce, BridgeError>>>,
615    ),
616    BridgeError,
617> {
618    let mut nonce_streams = timed_try_join_all(
619        NONCE_STREAM_CREATION_TIMEOUT,
620        "Nonce stream creation",
621        Some(verifiers.ids()),
622        verifiers
623            .clients()
624            .into_iter()
625            .enumerate()
626            .map(|(idx, client)| {
627                let mut client = client.clone();
628                #[cfg(test)]
629                let config = config.clone();
630
631                async move {
632                    #[cfg(test)]
633                    config
634                        .test_params
635                        .timeout_params
636                        .hook_timeout_nonce_stream_creation_verifier(idx)
637                        .await;
638                    let response_stream = client
639                        .nonce_gen(tonic::Request::new(clementine::NonceGenRequest {
640                            num_nonces,
641                        }))
642                        .await
643                        .wrap_err_with(|| AggregatorError::RequestFailed {
644                            request_name: format!("Nonce gen stream for verifier {idx}"),
645                        })?;
646
647                    Ok::<_, BridgeError>(response_stream.into_inner())
648                }
649            }),
650    )
651    .await?;
652
653    // Get the first responses from verifiers.
654    let first_responses: Vec<clementine::NonceGenFirstResponse> =
655        join_all_combine_errors(nonce_streams.iter_mut().zip(verifiers.ids()).map(
656            |(stream, id)| async move {
657                parser::verifier::parse_nonce_gen_first_response(stream)
658                    .await
659                    .wrap_err_with(|| format!("Failed to get initial response from {id}"))
660            },
661        ))
662        .await
663        .wrap_err("Failed to get nonce gen's initial responses from verifiers")?;
664
665    let transformed_streams = nonce_streams
666        .into_iter()
667        .zip(verifiers.ids())
668        .map(|(stream, id)| {
669            stream
670                .map(move |result| {
671                    Aggregator::extract_pub_nonce(
672                        result
673                            .wrap_err_with(|| AggregatorError::InputStreamEndedEarlyUnknownSize {
674                                stream_name: format!("Nonce gen stream for {id}"),
675                            })?
676                            .response,
677                    )
678                })
679                .boxed()
680        })
681        .collect::<Vec<_>>();
682
683    Ok((first_responses, transformed_streams))
684}
685
686/// Use items collected from the broadcast receiver for an async function call.
687///
688/// Handles the boilerplate of managing a receiver of a broadcast channel.
689/// If receiver is lagged at any time (data is lost) an error is returned.
690async fn collect_and_call<R, T, F, Fut>(
691    rx: &mut tokio::sync::broadcast::Receiver<Vec<T>>,
692    mut f: F,
693) -> Result<R, Status>
694where
695    R: Default,
696    T: Clone,
697    F: FnMut(Vec<T>) -> Fut,
698    Fut: Future<Output = Result<R, Status>>,
699{
700    loop {
701        match rx.recv().await {
702            Ok(params) => {
703                f(params).await?;
704            }
705            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
706                break Err(Status::internal(format!(
707                    "lost {n} items due to lagging receiver"
708                )));
709            }
710            Err(tokio::sync::broadcast::error::RecvError::Closed) => break Ok(R::default()),
711        }
712    }
713}
714
715impl Aggregator {
716    // Extracts pub_nonce from given stream.
717    fn extract_pub_nonce(
718        response: Option<clementine::nonce_gen_response::Response>,
719    ) -> Result<PublicNonce, BridgeError> {
720        match response.ok_or_eyre("NonceGen response is empty")? {
721            clementine::nonce_gen_response::Response::PubNonce(pub_nonce) => {
722                Ok(PublicNonce::from_byte_array(
723                    &pub_nonce
724                        .as_slice()
725                        .try_into()
726                        .wrap_err("PubNonce must be 66 bytes")?,
727                )
728                .wrap_err("Failed to parse pub nonce")?)
729            }
730            _ => Err(eyre::eyre!("Expected PubNonce in response").into()),
731        }
732    }
733
734    /// For a specific deposit, collects needed signatures from all operators into a [`Vec<Vec<Signature>>`].
735    async fn collect_operator_sigs(
736        operator_clients: ParticipatingOperators,
737        config: BridgeConfig,
738        mut deposit_sign_session: DepositSignSession,
739    ) -> Result<Vec<Vec<Signature>>, BridgeError> {
740        deposit_sign_session.nonce_gen_first_responses = Vec::new(); // not needed for operators
741        let mut operator_sigs_streams =
742            // create deposit sign streams with each operator
743            timed_try_join_all(
744                OPERATOR_SIGS_STREAM_CREATION_TIMEOUT,
745                "Operator signature stream creation",
746                Some(operator_clients.ids()),
747                operator_clients.clients().into_iter().enumerate().map(|(idx, mut operator_client)| {
748                let sign_session = deposit_sign_session.clone();
749                #[cfg(test)]
750                let config = config.clone();
751                async move {
752                    #[cfg(test)]
753                    config
754                        .test_params
755                        .timeout_params
756                        .hook_timeout_operator_sig_collection_operator(idx)
757                        .await;
758                    let stream = operator_client
759                        .deposit_sign(tonic::Request::new(sign_session))
760                        .await.wrap_err_with(|| AggregatorError::RequestFailed {
761                            request_name: format!("Deposit sign stream for operator {idx}"),
762                        })?;
763                    Ok::<_, BridgeError>(stream.into_inner())
764                }
765            }))
766                .await?;
767
768        let deposit_data: DepositData = deposit_sign_session
769            .deposit_params
770            .clone()
771            .ok_or_else(|| eyre::eyre!("No deposit params found in deposit sign session"))?
772            .try_into()
773            .wrap_err("Failed to convert deposit params to deposit data")?;
774
775        // calculate number of signatures needed from each operator
776        let needed_sigs = config.get_num_required_operator_sigs(&deposit_data);
777
778        // get signatures from each operator's signature streams
779        let operator_sigs =
780            join_all_combine_errors(operator_sigs_streams.iter_mut().enumerate().map(
781                |(idx, stream)| async move {
782                    let mut sigs: Vec<Signature> = Vec::with_capacity(needed_sigs);
783                    while let Some(sig) =
784                        stream
785                            .message()
786                            .await
787                            .wrap_err_with(|| AggregatorError::RequestFailed {
788                                request_name: format!("Deposit sign stream for operator {idx}"),
789                            })?
790                    {
791                        sigs.push(Signature::from_slice(&sig.schnorr_sig).wrap_err_with(|| {
792                            format!("Failed to parse Schnorr signature from operator {idx}")
793                        })?);
794                        if sigs.len() == needed_sigs {
795                            break;
796                        }
797                    }
798                    Ok::<_, BridgeError>(sigs)
799                },
800            ))
801            .await
802            .wrap_err("Failed to get operator signatures from operators")?;
803
804        // check if all signatures are received
805        for (idx, sigs) in operator_sigs.iter().enumerate() {
806            if sigs.len() != needed_sigs {
807                return Err(eyre::eyre!(
808                    "Not all operator sigs received from operator {}.\n Expected: {}, got: {}",
809                    idx,
810                    needed_sigs,
811                    sigs.len()
812                )
813                .into());
814            }
815        }
816        Ok(operator_sigs)
817    }
818
819    async fn create_movetx(
820        &self,
821        partial_sigs: Vec<Vec<u8>>,
822        movetx_agg_and_pub_nonces: (AggregatedNonce, Vec<PublicNonce>),
823        deposit_params: DepositParams,
824    ) -> Result<TxHandler<Signed>, Status> {
825        let mut deposit_data: DepositData = deposit_params.try_into()?;
826        let musig_partial_sigs = parser::verifier::parse_partial_sigs(partial_sigs)?;
827
828        // create move tx and calculate sighash
829        let mut move_txhandler =
830            create_move_to_vault_txhandler(&mut deposit_data, self.config.protocol_paramset())?;
831
832        let sighash = move_txhandler.calculate_script_spend_sighash_indexed(
833            0,
834            0,
835            bitcoin::TapSighashType::Default,
836        )?;
837
838        let musig_sigs_and_nonces = musig_partial_sigs
839            .into_iter()
840            .zip(movetx_agg_and_pub_nonces.1)
841            .collect::<Vec<_>>();
842
843        // aggregate partial signatures
844        let verifiers_public_keys = deposit_data.get_verifiers();
845        let final_sig = crate::musig2::aggregate_partial_signatures(
846            verifiers_public_keys,
847            None,
848            movetx_agg_and_pub_nonces.0,
849            &musig_sigs_and_nonces,
850            Message::from_digest(sighash.to_byte_array()),
851        )?;
852
853        // Put the signature in the tx
854        move_txhandler.set_p2tr_script_spend_witness(&[final_sig.as_ref()], 0, 0)?;
855
856        Ok(move_txhandler.promote()?)
857    }
858
859    async fn verify_and_save_emergency_stop_sigs(
860        &self,
861        emergency_stop_sigs: Vec<Vec<u8>>,
862        emergency_stop_agg_and_pub_nonces: (AggregatedNonce, Vec<PublicNonce>),
863        deposit_params: DepositParams,
864    ) -> Result<(), BridgeError> {
865        let mut deposit_data: DepositData = deposit_params
866            .try_into()
867            .wrap_err("Failed to convert deposit params to deposit data")?;
868        let musig_partial_sigs = parser::verifier::parse_partial_sigs(emergency_stop_sigs)
869            .wrap_err("Failed to parse emergency stop signatures")?;
870
871        // create move tx and calculate sighash
872        let move_txhandler =
873            create_move_to_vault_txhandler(&mut deposit_data, self.config.protocol_paramset())?;
874
875        let mut emergency_stop_txhandler = create_emergency_stop_txhandler(
876            &mut deposit_data,
877            &move_txhandler,
878            self.config.protocol_paramset(),
879        )?;
880
881        let sighash = emergency_stop_txhandler.calculate_script_spend_sighash_indexed(
882            0,
883            0,
884            bitcoin::TapSighashType::SinglePlusAnyoneCanPay,
885        )?;
886
887        let verifiers_public_keys = deposit_data.get_verifiers();
888
889        let musig_sigs_and_nonces = musig_partial_sigs
890            .into_iter()
891            .zip(emergency_stop_agg_and_pub_nonces.1)
892            .collect::<Vec<_>>();
893
894        let final_sig = crate::musig2::aggregate_partial_signatures(
895            verifiers_public_keys,
896            None,
897            emergency_stop_agg_and_pub_nonces.0,
898            &musig_sigs_and_nonces,
899            Message::from_digest(sighash.to_byte_array()),
900        )
901        .wrap_err("Failed to aggregate emergency stop signatures")?;
902
903        let final_sig = bitcoin::taproot::Signature {
904            signature: final_sig,
905            sighash_type: bitcoin::TapSighashType::SinglePlusAnyoneCanPay,
906        };
907
908        // insert the signature into the tx
909        emergency_stop_txhandler.set_p2tr_script_spend_witness(&[final_sig.serialize()], 0, 0)?;
910
911        let emergency_stop_tx = emergency_stop_txhandler.get_cached_tx();
912        let move_to_vault_txid = move_txhandler.get_txid();
913
914        tracing::debug!("Move to vault tx id: {}", move_to_vault_txid.to_string());
915
916        let emergency_stop_pubkey = self
917            .config
918            .emergency_stop_encryption_public_key
919            .ok_or_else(|| eyre::eyre!("Emergency stop encryption public key is not set"))?;
920        let encrypted_emergency_stop_tx = crate::encryption::encrypt_bytes(
921            emergency_stop_pubkey,
922            &bitcoin::consensus::serialize(&emergency_stop_tx),
923        )?;
924
925        self.db
926            .insert_signed_emergency_stop_tx_if_not_exists(
927                None,
928                move_to_vault_txid,
929                &encrypted_emergency_stop_tx,
930            )
931            .await?;
932
933        Ok(())
934    }
935
936    #[cfg(feature = "automation")]
937    pub async fn send_emergency_stop_tx(
938        &self,
939        tx: bitcoin::Transaction,
940    ) -> Result<bitcoin::Transaction, Status> {
941        // Add fee bumper.
942        let mut dbtx = self.db.begin_transaction().await?;
943        self.tx_sender
944            .insert_try_to_send(
945                &mut dbtx,
946                Some(TxMetadata {
947                    deposit_outpoint: None,
948                    operator_xonly_pk: None,
949                    round_idx: None,
950                    kickoff_idx: None,
951                    tx_type: TransactionType::EmergencyStop,
952                }),
953                &tx,
954                FeePayingType::RBF,
955                None,
956                &[],
957                &[],
958                &[],
959                &[],
960            )
961            .await
962            .map_err(BridgeError::from)?;
963        dbtx.commit()
964            .await
965            .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;
966
967        Ok(tx)
968    }
969}
970
971#[async_trait]
972impl ClementineAggregator for AggregatorServer {
973    async fn get_compatibility_params(
974        &self,
975        _request: Request<Empty>,
976    ) -> Result<Response<CompatibilityParamsRpc>, Status> {
977        let params = self.aggregator.get_compatibility_params()?;
978        Ok(Response::new(params.try_into().map_to_status()?))
979    }
980
981    async fn get_compatibility_data_from_entities(
982        &self,
983        _request: Request<Empty>,
984    ) -> Result<Response<EntitiesCompatibilityData>, Status> {
985        let data = self
986            .aggregator
987            .get_compatibility_data_from_entities()
988            .await?;
989        Ok(Response::new(EntitiesCompatibilityData {
990            entities_compatibility_data: data,
991        }))
992    }
993
994    async fn vergen(&self, _request: Request<Empty>) -> Result<Response<VergenResponse>, Status> {
995        tracing::info!("Vergen rpc called");
996        Ok(Response::new(get_vergen_response()))
997    }
998
999    async fn get_entity_statuses(
1000        &self,
1001        request: Request<GetEntityStatusesRequest>,
1002    ) -> Result<Response<EntityStatuses>, Status> {
1003        tracing::info!("Get entity statuses rpc called");
1004        let request = request.into_inner();
1005        let restart_tasks = request.restart_tasks;
1006
1007        Ok(Response::new(EntityStatuses {
1008            entity_statuses: self.aggregator.get_entity_statuses(restart_tasks).await?,
1009        }))
1010    }
1011
1012    async fn optimistic_payout(
1013        &self,
1014        request: tonic::Request<super::OptimisticWithdrawParams>,
1015    ) -> std::result::Result<tonic::Response<super::RawSignedTx>, tonic::Status> {
1016        tracing::info!("Optimistic payout rpc called");
1017        let opt_withdraw_params = request.into_inner();
1018
1019        let withdraw_params =
1020            opt_withdraw_params
1021                .withdrawal
1022                .clone()
1023                .ok_or(Status::invalid_argument(
1024                    "Withdrawal params not found for optimistic payout",
1025                ))?;
1026        let (deposit_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
1027            parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
1028        tracing::info!("Parsed optimistic payout rpc params, deposit id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}, verification signature: {:?}", deposit_id, input_signature, input_outpoint, output_script_pubkey, output_amount, opt_withdraw_params.verification_signature);
1029
1030        // check compatibility with verifiers only
1031        self.check_compatibility_with_actors(true, false).await?;
1032
1033        // if the withdrawal utxo is spent, no reason to sign optimistic payout
1034        if self
1035            .rpc
1036            .is_utxo_spent(&input_outpoint)
1037            .await
1038            .map_to_status()?
1039        {
1040            return Err(Status::invalid_argument(format!(
1041                "Withdrawal utxo is already spent: {input_outpoint:?}",
1042            )));
1043        }
1044
1045        // check for some standard script pubkeys
1046        if !(output_script_pubkey.is_p2tr()
1047            || output_script_pubkey.is_p2pkh()
1048            || output_script_pubkey.is_p2sh()
1049            || output_script_pubkey.is_p2wpkh()
1050            || output_script_pubkey.is_p2wsh())
1051        {
1052            return Err(Status::invalid_argument(format!(
1053                "Output script pubkey is not a valid script pubkey: {output_script_pubkey}, must be p2tr, p2pkh, p2sh, p2wpkh, or p2wsh"
1054            )));
1055        }
1056
1057        // get which deposit the withdrawal belongs to
1058        let withdrawal = self
1059            .db
1060            .get_move_to_vault_txid_from_citrea_deposit(None, deposit_id)
1061            .await?;
1062        if let Some(move_txid) = withdrawal {
1063            // check if withdrawal utxo is correct
1064            let withdrawal_utxo = self
1065                .db
1066                .get_withdrawal_utxo_from_citrea_withdrawal(None, deposit_id)
1067                .await?;
1068            if withdrawal_utxo != input_outpoint {
1069                return Err(Status::invalid_argument(format!(
1070                    "Withdrawal utxo is not correct: {withdrawal_utxo:?} != {input_outpoint:?}",
1071                )));
1072            }
1073
1074            // Prepare input and output of the payout transaction.
1075            let withdrawal_prevout = self
1076                .rpc
1077                .get_txout_from_outpoint(&input_outpoint)
1078                .await
1079                .map_to_status()?;
1080
1081            let user_xonly_pk = withdrawal_prevout
1082                .script_pubkey
1083                .try_get_taproot_pk()
1084                .map_err(|_| {
1085                    Status::invalid_argument(format!(
1086                        "Withdrawal prevout script_pubkey is not a Taproot output: {:?}",
1087                        withdrawal_prevout.script_pubkey
1088                    ))
1089                })?;
1090
1091            let withdrawal_utxo = UTXO {
1092                outpoint: input_outpoint,
1093                txout: withdrawal_prevout,
1094            };
1095
1096            let output_txout = TxOut {
1097                value: output_amount,
1098                script_pubkey: output_script_pubkey,
1099            };
1100
1101            let deposit_data = self
1102                .db
1103                .get_deposit_data_with_move_tx(None, move_txid)
1104                .await?;
1105
1106            let mut deposit_data = deposit_data
1107                .ok_or(eyre::eyre!(
1108                    "Deposit data not found for move txid {}",
1109                    move_txid
1110                ))
1111                .map_err(BridgeError::from)?;
1112
1113            let mut opt_payout_txhandler = create_optimistic_payout_txhandler(
1114                &mut deposit_data,
1115                withdrawal_utxo,
1116                output_txout,
1117                input_signature,
1118                self.config.protocol_paramset(),
1119            )?;
1120
1121            let sighash = opt_payout_txhandler
1122                .calculate_pubkey_spend_sighash(0, input_signature.sighash_type)?;
1123
1124            let message = Message::from_digest(sighash.to_byte_array());
1125
1126            SECP.verify_schnorr(&input_signature.signature, &message, &user_xonly_pk)
1127                .map_err(|_| Status::internal("Invalid signature for optimistic payout tx. Ensure the signature uses SinglePlusAnyoneCanPay sighash type."))?;
1128
1129            // get which verifiers participated in the deposit to collect the optimistic payout tx signature
1130            let participating_verifiers = self.get_participating_verifiers(&deposit_data).await?;
1131            let verifiers_ids = participating_verifiers.ids();
1132            let (first_responses, mut nonce_streams) = {
1133                create_nonce_streams(
1134                    participating_verifiers.clone(),
1135                    1,
1136                    #[cfg(test)]
1137                    &self.config,
1138                )
1139                .await?
1140            };
1141            // collect nonces
1142            let pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
1143                .await
1144                .wrap_err("Failed to aggregate nonces for optimistic payout")
1145                .map_to_status()?;
1146            let agg_nonce = aggregate_nonces(pub_nonces.iter().collect::<Vec<_>>().as_slice())?;
1147
1148            let agg_nonce_bytes = agg_nonce.serialize().to_vec();
1149            // send the agg nonce to the verifiers to sign the optimistic payout tx
1150            let opt_payout_sign_futures = participating_verifiers
1151                .clients()
1152                .iter()
1153                .zip(first_responses)
1154                .map(|(client, first_response)| {
1155                    let mut client = client.clone();
1156                    let opt_withdraw_params = opt_withdraw_params.clone();
1157                    {
1158                        let agg_nonce_serialized = agg_nonce_bytes.clone();
1159                        async move {
1160                            let mut request = Request::new(OptimisticPayoutParams {
1161                                opt_withdrawal: Some(opt_withdraw_params),
1162                                agg_nonce: agg_nonce_serialized,
1163                                nonce_gen: Some(first_response),
1164                            });
1165                            request.set_timeout(OPTIMISTIC_PAYOUT_TIMEOUT);
1166                            client.optimistic_payout_sign(request).await
1167                        }
1168                    }
1169                })
1170                .collect::<Vec<_>>();
1171
1172            // get signatures and check for any errors
1173            let opt_payout_resps = join_all(opt_payout_sign_futures).await;
1174            let mut payout_sigs = Vec::new();
1175            let mut errors = Vec::new();
1176            for (resp, verifier_id) in opt_payout_resps
1177                .into_iter()
1178                .zip(participating_verifiers.ids())
1179            {
1180                match resp {
1181                    Ok(res) => {
1182                        payout_sigs.push(res.into_inner());
1183                    }
1184                    Err(e) => {
1185                        errors.push(format!("{verifier_id} optimistic payout sign failed: {e}"));
1186                    }
1187                }
1188            }
1189            if !errors.is_empty() {
1190                return Err(eyre::eyre!("{errors:?}").into_status());
1191            }
1192
1193            // calculate final sig
1194            // txin at index 1 is deposited utxo in movetx
1195            let sighash = opt_payout_txhandler.calculate_script_spend_sighash_indexed(
1196                1,
1197                0,
1198                bitcoin::TapSighashType::Default,
1199            )?;
1200
1201            let musig_partial_sigs = payout_sigs
1202                .into_iter()
1203                .map(|sig| {
1204                    PartialSignature::from_byte_array(
1205                        &sig.partial_sig
1206                            .try_into()
1207                            .map_err(|_| secp256k1::musig::ParseError::MalformedArg)?,
1208                    )
1209                })
1210                .collect::<Result<Vec<_>, _>>()
1211                .map_err(|e| Status::internal(format!("Failed to parse partial sig: {e:?}")))?;
1212
1213            let musig_sigs_and_nonces = musig_partial_sigs
1214                .into_iter()
1215                .zip(pub_nonces)
1216                .collect::<Vec<_>>();
1217
1218            let final_sig = bitcoin::taproot::Signature {
1219                signature: crate::musig2::aggregate_partial_signatures(
1220                    deposit_data.get_verifiers(),
1221                    None,
1222                    agg_nonce,
1223                    &musig_sigs_and_nonces,
1224                    Message::from_digest(sighash.to_byte_array()),
1225                )?,
1226                sighash_type: bitcoin::TapSighashType::Default,
1227            };
1228
1229            // set witness and send tx
1230            opt_payout_txhandler.set_p2tr_script_spend_witness(&[final_sig.serialize()], 1, 0)?;
1231            let opt_payout_txhandler = opt_payout_txhandler.promote()?;
1232            let opt_payout_tx = opt_payout_txhandler.get_cached_tx();
1233            tracing::info!(
1234                "Optimistic payout transaction created successfully for deposit id: {:?}",
1235                deposit_id
1236            );
1237
1238            #[cfg(feature = "automation")]
1239            {
1240                tracing::info!("Sending optimistic payout tx via tx_sender");
1241
1242                let mut dbtx = self.db.begin_transaction().await?;
1243                self.tx_sender
1244                    .add_tx_to_queue(
1245                        &mut dbtx,
1246                        TransactionType::OptimisticPayout,
1247                        opt_payout_tx,
1248                        &[],
1249                        None,
1250                        &self.config,
1251                        None,
1252                    )
1253                    .await
1254                    .map_to_status()?;
1255                dbtx.commit().await.map_err(|e| {
1256                    Status::internal(format!(
1257                        "Failed to commit db transaction to send optimistic payout tx: {e}",
1258                    ))
1259                })?;
1260            }
1261
1262            Ok(Response::new(RawSignedTx::from(opt_payout_tx)))
1263        } else {
1264            Err(Status::not_found(format!(
1265                "Withdrawal with index {deposit_id} not found."
1266            )))
1267        }
1268    }
1269
1270    async fn internal_send_tx(
1271        &self,
1272        request: Request<clementine::SendTxRequest>,
1273    ) -> Result<Response<Empty>, Status> {
1274        #[cfg(not(feature = "automation"))]
1275        {
1276            Err(Status::unimplemented("Automation is not enabled"))
1277        }
1278        #[cfg(feature = "automation")]
1279        {
1280            let send_tx_req = request.into_inner();
1281            let fee_type = send_tx_req.fee_type();
1282            let signed_tx: bitcoin::Transaction = send_tx_req
1283                .raw_tx
1284                .ok_or(Status::invalid_argument("Missing raw_tx"))?
1285                .try_into()?;
1286            tracing::warn!(
1287                "Internal send tx rpc called with feetype: {:?}, tx hex: {}",
1288                fee_type,
1289                bitcoin::consensus::encode::serialize_hex(&signed_tx)
1290            );
1291
1292            let mut dbtx = self.db.begin_transaction().await?;
1293            self.tx_sender
1294                .insert_try_to_send(
1295                    &mut dbtx,
1296                    None,
1297                    &signed_tx,
1298                    fee_type.try_into()?,
1299                    None,
1300                    &[],
1301                    &[],
1302                    &[],
1303                    &[],
1304                )
1305                .await
1306                .map_to_status()?;
1307            dbtx.commit()
1308                .await
1309                .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;
1310            Ok(Response::new(Empty {}))
1311        }
1312    }
1313
1314    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
1315    async fn setup(
1316        &self,
1317        _request: Request<Empty>,
1318    ) -> Result<Response<VerifierPublicKeys>, Status> {
1319        tracing::info!("Setup rpc called");
1320        self.check_compatibility_with_actors(true, true).await?;
1321        // Propagate Operators configurations to all verifier clients
1322        const CHANNEL_CAPACITY: usize = 1024 * 16;
1323        let (operator_params_tx, operator_params_rx) =
1324            tokio::sync::broadcast::channel(CHANNEL_CAPACITY);
1325        let operator_params_rx_handles = (0..self.get_verifier_clients().len())
1326            .map(|_| operator_params_rx.resubscribe())
1327            .collect::<Vec<_>>();
1328
1329        let operators = self.get_operator_clients().to_vec();
1330        let operator_pks = self.fetch_operator_keys().await?;
1331        let operator_ids = operator_pks
1332            .iter()
1333            .map(|key| OperatorId(*key))
1334            .collect::<Vec<_>>();
1335        let get_operator_params_chunked_handle = tokio::spawn(async move {
1336            tracing::info!(clients = operators.len(), "Collecting operator details...");
1337            join_all_combine_errors(operators.iter().zip(operator_ids.iter()).map(
1338                |(operator, id)| {
1339                    let mut operator = operator.clone();
1340                    let tx = operator_params_tx.clone();
1341                    async move {
1342                        let stream = operator
1343                            .get_params(Request::new(Empty {}))
1344                            .await
1345                            .wrap_err_with(|| AggregatorError::RequestFailed {
1346                                request_name: format!("Operator get params for {id}"),
1347                            })
1348                            .map_err(BridgeError::from)?
1349                            .into_inner();
1350                        tx.send(stream.try_collect::<Vec<_>>().await?)
1351                            .map_err(|e| {
1352                                BridgeError::from(eyre::eyre!(
1353                                    "Failed to read operator params for {id}: {e}"
1354                                ))
1355                            })?;
1356                        Ok::<_, Status>(())
1357                    }
1358                },
1359            ))
1360            .await
1361            .wrap_err("Failed to get operator params from operators")
1362            .map_to_status()?;
1363            Ok::<_, Status>(())
1364        });
1365
1366        let verifiers = self.get_verifier_clients().to_vec();
1367        let verifier_pks = self.fetch_verifier_keys().await?;
1368        let verifier_ids = verifier_pks
1369            .iter()
1370            .map(|key| VerifierId(*key))
1371            .collect::<Vec<_>>();
1372        let set_operator_params_handle = tokio::spawn(async move {
1373            tracing::info!("Informing verifiers of existing operators...");
1374            join_all_combine_errors(
1375                verifiers
1376                    .iter()
1377                    .zip(verifier_ids.iter())
1378                    .zip(operator_params_rx_handles)
1379                    .map(|((verifier, id), mut rx)| {
1380                        let verifier = verifier.clone();
1381                        async move {
1382                            collect_and_call(&mut rx, |params| {
1383                                let mut verifier = verifier.clone();
1384                                async move {
1385                                    verifier
1386                                        .set_operator(futures::stream::iter(params))
1387                                        .await
1388                                        .wrap_err_with(|| AggregatorError::RequestFailed {
1389                                            request_name: format!("Verifier set_operator for {id}"),
1390                                        })
1391                                        .map_err(BridgeError::from)?;
1392                                    Ok::<_, Status>(())
1393                                }
1394                            })
1395                            .await?;
1396                            Ok::<_, Status>(())
1397                        }
1398                    }),
1399            )
1400            .await
1401            .wrap_err("Failed to set_operator for all verifiers")
1402            .map_to_status()?;
1403            Ok::<_, Status>(())
1404        });
1405
1406        let task_outputs = timed_request(
1407            SETUP_COMPLETION_TIMEOUT,
1408            "Aggregator setup pipeline",
1409            async move {
1410                Ok::<_, BridgeError>(
1411                    futures::future::join_all([
1412                        get_operator_params_chunked_handle,
1413                        set_operator_params_handle,
1414                    ])
1415                    .await,
1416                )
1417            },
1418        )
1419        .await?;
1420
1421        let task_names = ["Get operator params", "Set operator params"];
1422        debug_assert_eq!(task_names.len(), task_outputs.len());
1423
1424        flatten_join_named_results(task_names.into_iter().zip(task_outputs.into_iter()))?;
1425
1426        Ok(Response::new(VerifierPublicKeys::from(verifier_pks)))
1427    }
1428
1429    /// Handles a new deposit request from a user. This function coordinates the signing process
1430    /// between verifiers to create a valid move transaction. It ensures a covenant using pre-signed NofN transactions.
1431    /// It also collects signatures from operators to ensure that the operators can be slashed if they act maliciously.
1432    ///
1433    /// Overview:
1434    /// 1. Receive and parse deposit parameters from user
1435    /// 2. Signs all NofN transactions with verifiers using MuSig2:
1436    ///    - Creates nonce streams with verifiers (get pub nonces for each transaction)
1437    ///    - Opens deposit signing streams with verifiers (sends aggnonces for each transaction, receives partial sigs)
1438    ///    - Opens deposit finalization streams with verifiers (sends final signatures, receives movetx signatures)
1439    /// 3. Collects signatures from operators
1440    /// 4. Waits for all tasks to complete
1441    /// 5. Returns signed move transaction
1442    ///
1443    /// The following pipelines are used to coordinate the signing process, these move the data between the verifiers and the aggregator:
1444    ///    - Nonce aggregation
1445    ///    - Nonce distribution
1446    ///    - Signature aggregation
1447    ///    - Signature distribution
1448    // #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
1449    async fn new_deposit(
1450        &self,
1451        request: Request<Deposit>,
1452    ) -> Result<Response<clementine::RawSignedTx>, Status> {
1453        tracing::info!("New deposit rpc called");
1454        self.check_compatibility_with_actors(true, true).await?;
1455
1456        timed_request(OVERALL_DEPOSIT_TIMEOUT, "Overall new deposit", async {
1457            let deposit_info: DepositInfo = request.into_inner().try_into()?;
1458            tracing::info!(
1459                "Parsed new deposit rpc params, deposit info: {:?}",
1460                deposit_info
1461            );
1462
1463            let deposit_data = DepositData {
1464                deposit: deposit_info.clone(),
1465                nofn_xonly_pk: None,
1466                actors: Actors {
1467                    verifiers: self.fetch_verifier_keys().await?,
1468                    watchtowers: vec![],
1469                    operators: self.fetch_operator_keys().await?,
1470                },
1471                security_council: self.config.security_council.clone(),
1472            };
1473            tracing::info!(
1474                "Created deposit data in new_deposit for deposit info: {:?}, deposit data: {:?}",
1475                deposit_info,
1476                deposit_data
1477            );
1478
1479            let deposit_params = deposit_data.clone().into();
1480
1481            // Collect and distribute keys needed keys from operators and watchtowers to verifiers
1482            let start = std::time::Instant::now();
1483            timed_request(
1484                KEY_DISTRIBUTION_TIMEOUT,
1485                "Key collection and distribution",
1486                self.collect_and_distribute_keys(&deposit_params),
1487            )
1488            .await?;
1489            tracing::info!("Collected and distributed keys in {:?}", start.elapsed());
1490
1491            let verifiers = self.get_participating_verifiers(&deposit_data).await?;
1492            let verifiers_ids = verifiers.ids();
1493
1494            // Generate nonce streams for all verifiers.
1495            let num_required_sigs = self.config.get_num_required_nofn_sigs(&deposit_data);
1496            let num_required_nonces = num_required_sigs as u32 + 2; // ask for +2 for the final movetx signature + emergency stop signature, but don't send it on deposit_sign stage
1497            let (first_responses, nonce_streams) =
1498                    create_nonce_streams(
1499                        verifiers.clone(),
1500                        num_required_nonces,
1501                        #[cfg(test)]
1502                        &self.config,
1503                    )
1504                    .await?;
1505
1506            // Create initial deposit session and send to verifiers
1507            let deposit_sign_session = DepositSignSession {
1508                deposit_params: Some(deposit_params.clone()),
1509                nonce_gen_first_responses: first_responses,
1510            };
1511
1512            let deposit_sign_param: VerifierDepositSignParams =
1513                    deposit_sign_session.clone().into();
1514
1515        #[allow(clippy::unused_enumerate_index)]
1516            let partial_sig_streams = timed_try_join_all(
1517                PARTIAL_SIG_STREAM_CREATION_TIMEOUT,
1518                "Partial signature stream creation",
1519                Some(verifiers.ids()),
1520                verifiers.clients().into_iter().enumerate().map(|(_idx, verifier_client)| {
1521                    let mut verifier_client = verifier_client.clone();
1522                    #[cfg(test)]
1523                    let config = self.config.clone();
1524
1525                    let deposit_sign_param =
1526                    deposit_sign_param.clone();
1527
1528                    async move {
1529                        #[cfg(test)]
1530                        config
1531                            .test_params
1532                            .timeout_params
1533                            .hook_timeout_partial_sig_stream_creation_verifier(_idx)
1534                            .await;
1535
1536                        let (tx, rx) = tokio::sync::mpsc::channel(num_required_nonces as usize + 1); // initial param + num_required_nonces nonces
1537
1538                        let stream = verifier_client
1539                            .deposit_sign(tokio_stream::wrappers::ReceiverStream::new(rx))
1540                            .await?
1541                            .into_inner();
1542
1543                        tx.send(deposit_sign_param).await.map_err(|e| {
1544                            BridgeError::from(eyre::eyre!("Failed to send deposit sign session: {e:?}"))})?;
1545
1546                        Ok::<_, BridgeError>((stream, tx))
1547                    }
1548                })
1549            )
1550            .await?;
1551
1552            // Set up deposit finalization streams
1553        #[allow(clippy::unused_enumerate_index)]
1554            let deposit_finalize_streams = verifiers.clients().into_iter().enumerate().map(
1555                    |(_idx, mut verifier)| {
1556                        let (tx, rx) = tokio::sync::mpsc::channel(num_required_nonces as usize + 1);
1557                        let receiver_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
1558                        #[cfg(test)]
1559                        let config = self.config.clone();
1560                        // start deposit_finalize with tokio spawn
1561                        let deposit_finalize_future = tokio::spawn(async move {
1562                            #[cfg(test)]
1563                            config
1564                                .test_params
1565                                .timeout_params
1566                                .hook_timeout_deposit_finalize_verifier(_idx)
1567                                .await;
1568
1569                            verifier.deposit_finalize(receiver_stream).await
1570                        });
1571
1572                        Ok::<_, BridgeError>((deposit_finalize_future, tx))
1573                    },
1574                ).collect::<Result<Vec<_>, BridgeError>>()?;
1575
1576            tracing::info!("Sending deposit finalize streams to verifiers for deposit {:?}", deposit_info);
1577
1578            let (deposit_finalize_futures, deposit_finalize_sender): (Vec<_>, Vec<_>) =
1579                deposit_finalize_streams.into_iter().unzip();
1580
1581            // Send initial finalization params
1582            let deposit_finalize_first_param: VerifierDepositFinalizeParams =
1583                deposit_sign_session.clone().into();
1584
1585            timed_try_join_all(
1586                DEPOSIT_FINALIZE_STREAM_CREATION_TIMEOUT,
1587                "Deposit finalization initial param send",
1588                Some(verifiers.ids()),
1589                deposit_finalize_sender.iter().cloned().map(|tx| {
1590                    let param = deposit_finalize_first_param.clone();
1591                    async move {
1592                        tx.send(param).await
1593                        .map_err(|e| {
1594                            BridgeError::from(eyre::eyre!(
1595                                "Failed to send deposit finalize first param: {e:?}"))
1596                        })
1597                    }
1598                })
1599            ).await?;
1600
1601
1602            let deposit_blockhash = self
1603                .rpc
1604                .get_blockhash_of_tx(&deposit_data.get_deposit_outpoint().txid)
1605                .await
1606                .map_to_status()?;
1607
1608            let verifiers_public_keys = deposit_data.get_verifiers();
1609
1610            let needed_nofn_sigs = self.config.get_num_required_nofn_sigs(&deposit_data);
1611
1612            // Create sighash stream for transaction signing
1613            let sighash_stream = Box::pin(create_nofn_sighash_stream(
1614                self.db.clone(),
1615                self.config.clone(),
1616                deposit_data.clone(),
1617                deposit_blockhash,
1618                false,
1619            ));
1620
1621            // Create channels for pipeline communication
1622            let (agg_nonce_sender, agg_nonce_receiver) = channel(num_required_nonces as usize);
1623            let (partial_sig_sender, partial_sig_receiver) = channel(num_required_nonces as usize);
1624            let (final_sig_sender, final_sig_receiver) = channel(num_required_nonces as usize);
1625
1626            // Start the nonce aggregation pipe.
1627            let nonce_agg_handle = tokio::spawn(nonce_aggregator(
1628                nonce_streams,
1629                sighash_stream,
1630                agg_nonce_sender,
1631                needed_nofn_sigs,
1632                verifiers_ids.clone(),
1633            ));
1634
1635            // Start the nonce distribution pipe.
1636            let nonce_dist_handle = tokio::spawn(nonce_distributor(
1637                agg_nonce_receiver,
1638                partial_sig_streams,
1639                partial_sig_sender,
1640                needed_nofn_sigs,
1641                verifiers_ids.clone(),
1642            ));
1643
1644            // Start the signature aggregation pipe.
1645            let sig_agg_handle = tokio::spawn(signature_aggregator(
1646                partial_sig_receiver,
1647                verifiers_public_keys,
1648                final_sig_sender,
1649                needed_nofn_sigs,
1650            ));
1651
1652            tracing::debug!("Getting signatures from operators");
1653            // Get sigs from each operator in background
1654            let operators = self.get_participating_operators(&deposit_data).await?;
1655
1656            let config_clone = self.config.clone();
1657            let operator_sigs_fut = tokio::spawn(async move {
1658                timed_request(
1659                    OPERATOR_SIGS_TIMEOUT,
1660                    "Operator signature collection",
1661                    async {
1662                        Aggregator::collect_operator_sigs(
1663                            operators,
1664                            config_clone,
1665                            deposit_sign_session,
1666                        )
1667                        .await
1668                    },
1669                )
1670                .await
1671            });
1672
1673            // Join the nonce aggregation handle to get the movetx agg nonce.
1674            let nonce_agg_handle = nonce_agg_handle
1675                .map_err(|_| Status::internal("panic when aggregating nonces"))
1676                .map(
1677                    |res| -> Result<((AggregatedNonce, Vec<PublicNonce>), (AggregatedNonce, Vec<PublicNonce>)), Status> {
1678                        res.and_then(|r| r.map_err(Into::into))
1679                    },
1680                )
1681                .shared();
1682
1683            // Start the deposit finalization pipe.
1684            let sig_dist_handle = tokio::spawn(signature_distributor(
1685                final_sig_receiver,
1686                deposit_finalize_sender.clone(),
1687                nonce_agg_handle.clone(),
1688                needed_nofn_sigs,
1689                verifiers_ids.clone(),
1690            ));
1691
1692            // Right now we collect all operator sigs then start to send them, we can do it simultaneously in the future
1693            // Need to change sig verification ordering in deposit_finalize() in verifiers so that we verify
1694            // 1st signature of all operators, then 2nd of all operators etc.
1695            let all_op_sigs = operator_sigs_fut
1696                .await
1697                .map_err(|_| BridgeError::from(eyre::eyre!("panic when collecting operator signatures")))??;
1698
1699            tracing::info!("Got all operator signatures for deposit {:?}", deposit_info);
1700
1701            // Wait for all pipeline tasks to complete
1702            // join_all should be enough here as if one fails other tasks should fail too as they are connected through streams
1703            // one should not hang if any other task fails, the others should finish
1704            // this is needed because try_join_all can potentially not return the error of the first task that failed, just the one it polled first
1705            // that returned an error
1706            let task_outputs =  timed_request(
1707                PIPELINE_COMPLETION_TIMEOUT,
1708                "MuSig2 signing pipeline",
1709                async move {
1710                    Ok::<_, BridgeError>(futures::future::join_all([nonce_dist_handle, sig_agg_handle, sig_dist_handle]).await)
1711                },
1712            )
1713            .await?;
1714
1715            let task_names = ["Nonce distribution", "Signature aggregation", "Signature distribution"];
1716
1717            debug_assert_eq!(task_names.len(), task_outputs.len());
1718
1719            flatten_join_named_results(
1720                task_names.into_iter().zip(task_outputs.into_iter()),
1721            )?;
1722            tracing::info!("All deposit_sign related tasks completed for deposit {:?}, now sending operator signatures to verifiers for verification", deposit_info);
1723
1724            tracing::debug!("Pipeline tasks completed");
1725            let verifiers_ids = verifiers.ids();
1726
1727            // send operators sigs to verifiers after all verifiers have signed
1728            let deposit_finalize_futures = timed_request(
1729                SEND_OPERATOR_SIGS_TIMEOUT,
1730                "Sending operator signatures to verifiers",
1731                async {
1732                    let send_operator_sigs: Vec<_> = deposit_finalize_sender
1733                        .iter()
1734                        .zip(verifiers_ids.iter())
1735                        .zip(deposit_finalize_futures.into_iter())
1736                        .map(|((tx, id), dep_fin_fut)| async {
1737                            for one_op_sigs in all_op_sigs.iter() {
1738                                for sig in one_op_sigs.iter() {
1739                                    let deposit_finalize_param: VerifierDepositFinalizeParams =
1740                                        sig.into();
1741
1742                                    let send = tx.send(deposit_finalize_param).await;
1743                                    match send {
1744                                        Ok(()) => (),
1745                                        Err(e) => {
1746                                            // check exact error by awaiting the future
1747                                            dep_fin_fut.await.wrap_err(format!("{} deposit finalize tokio task on aggregator returned error", id.clone()))?.wrap_err(format!("{} deposit finalize rpc call returned error", id.clone()))?;
1748                                            return Err(BridgeError::from(eyre::eyre!(format!("{} deposit finalize stream sending returned error: {:?}", id.clone(), e))));
1749                                        }
1750                                    }
1751                                }
1752                            }
1753
1754                            Ok::<_, BridgeError>(dep_fin_fut)
1755                        })
1756                        .collect();
1757                    join_all_combine_errors(send_operator_sigs).await
1758                },
1759            )
1760            .await.wrap_err("Failed to send operator signatures to verifiers")?;
1761
1762            tracing::info!("All operator signatures sent to verifiers for verification, now waiting to collect movetx and emergency stop tx partial signatures from verifiers for deposit {:?}", deposit_info);
1763
1764            // Collect partial signatures for move transaction
1765            let partial_sigs: Vec<(Vec<u8>, Vec<u8>)> = timed_try_join_all(
1766                    DEPOSIT_FINALIZATION_TIMEOUT,
1767                    "Deposit finalization",
1768                    Some(verifiers.ids()),
1769                    deposit_finalize_futures.into_iter().map(|fut| async move {
1770                        let inner = fut.await
1771                            .map_err(|_| BridgeError::from(eyre::eyre!("panic finishing deposit_finalize")))??
1772                            .into_inner();
1773                        Ok((inner.move_to_vault_partial_sig, inner.emergency_stop_partial_sig))
1774                    }),
1775                )
1776                .await?;
1777
1778
1779            let (move_to_vault_sigs, emergency_stop_sigs): (Vec<Vec<u8>>, Vec<Vec<u8>>) =
1780                partial_sigs.into_iter().unzip();
1781
1782            tracing::info!("Received move tx and emergency stop tx partial signatures for deposit {:?}", deposit_info);
1783
1784            // Create the final move transaction and check the signatures
1785            let (movetx_agg_nonce, emergency_stop_agg_nonce) = nonce_agg_handle.await?;
1786
1787            // Verify emergency stop signatures
1788            self.verify_and_save_emergency_stop_sigs(
1789                emergency_stop_sigs,
1790                emergency_stop_agg_nonce,
1791                deposit_params.clone(),
1792            )
1793            .await?;
1794
1795            let signed_movetx_handler = self
1796                .create_movetx(move_to_vault_sigs, movetx_agg_nonce, deposit_params)
1797                .await?;
1798
1799            let raw_signed_tx = RawSignedTx {
1800                raw_tx: bitcoin::consensus::serialize(&signed_movetx_handler.get_cached_tx()),
1801            };
1802
1803            tracing::info!("Created final move transaction for deposit {:?}", deposit_info);
1804
1805            Ok(Response::new(raw_signed_tx))
1806        })
1807        .await.map_err(Into::into)
1808    }
1809
1810    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
1811    async fn withdraw(
1812        &self,
1813        request: Request<AggregatorWithdrawalInput>,
1814    ) -> Result<Response<AggregatorWithdrawResponse>, Status> {
1815        tracing::warn!("Withdraw rpc called");
1816        let request = request.into_inner();
1817        let (withdraw_params_with_sig, operator_xonly_pks) = (
1818            request.withdrawal.ok_or(Status::invalid_argument(
1819                "withdrawalParamsWithSig is missing",
1820            ))?,
1821            request.operator_xonly_pks,
1822        );
1823        // check compatibility with operators only
1824        self.check_compatibility_with_actors(false, true).await?;
1825
1826        let withdraw_params = withdraw_params_with_sig
1827            .clone()
1828            .withdrawal
1829            .ok_or(Status::invalid_argument("withdrawalParams is missing"))?;
1830
1831        // convert rpc xonly pks to bitcoin xonly pks
1832        let operator_xonly_pks_from_rpc: Vec<XOnlyPublicKey> = operator_xonly_pks
1833            .into_iter()
1834            .map(|xonly_pk| {
1835                xonly_pk.try_into().map_err(|e| {
1836                    Status::invalid_argument(format!("Failed to convert xonly public key: {e}"))
1837                })
1838            })
1839            .collect::<Result<Vec<_>, Status>>()?;
1840
1841        tracing::warn!(
1842            "Parsed withdraw rpc params, withdrawal params: {:?}, operator xonly pks: {:?}",
1843            withdraw_params,
1844            operator_xonly_pks_from_rpc
1845                .iter()
1846                .map(|pk| pk.to_string())
1847                .collect::<Vec<_>>()
1848        );
1849
1850        // parse_withdrawal_sig_params is called to check if the inputs can be parsed correctly
1851        // and check if input sighash type is SinglePlusAnyoneCanPay
1852        let (withdrawal_id, _, _, _, _) =
1853            parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
1854
1855        // check if all given operator xonly pubkeys are a valid operator xonly pubkey, to warn the caller if
1856        // something is wrong with the given operator xonly pubkeys
1857        let current_operator_xonly_pks = self.fetch_operator_keys().await?;
1858        let invalid_operator_xonly_pks = operator_xonly_pks_from_rpc
1859            .iter()
1860            .filter(|xonly_pk| !current_operator_xonly_pks.contains(xonly_pk))
1861            .collect::<Vec<_>>();
1862        if !invalid_operator_xonly_pks.is_empty() {
1863            return Err(Status::invalid_argument(format!(
1864                "Given xonly public key doesn't belong to any current operator: invalid keys: {invalid_operator_xonly_pks:?}, current operators: {current_operator_xonly_pks:?}"
1865            )));
1866        }
1867
1868        let operators = self
1869            .get_operator_clients()
1870            .iter()
1871            .zip(current_operator_xonly_pks.into_iter());
1872        let withdraw_futures = operators
1873            .filter(|(_, xonly_pk)| {
1874                // check if operator_xonly_pks is empty or contains the operator's xonly public key
1875                operator_xonly_pks_from_rpc.is_empty()
1876                    || operator_xonly_pks_from_rpc.contains(xonly_pk)
1877            })
1878            .map(|(operator, operator_xonly_pk)| {
1879                let mut operator = operator.clone();
1880                let params = withdraw_params_with_sig.clone();
1881                let mut request = Request::new(params);
1882                request.set_timeout(WITHDRAWAL_TIMEOUT);
1883                async move { (operator.withdraw(request).await, operator_xonly_pk) }
1884            });
1885
1886        // collect responses from operators and return them as a vector of strings
1887        let responses = futures::future::join_all(withdraw_futures).await;
1888        tracing::warn!(
1889            "Withdraw rpc completed successfully for withdrawal id: {}, operator xonly pks: {:?}, responses: {:?}",
1890            withdrawal_id,
1891            operator_xonly_pks_from_rpc
1892                .iter()
1893                .map(|pk| pk.to_string())
1894                .collect::<Vec<_>>(),
1895            responses,
1896        );
1897        Ok(Response::new(AggregatorWithdrawResponse {
1898            withdraw_responses: responses
1899                .into_iter()
1900                .map(|(res, xonly_pk)| match res {
1901                    Ok(withdraw_response) => OperatorWithrawalResponse {
1902                        operator_xonly_pk: Some(xonly_pk.into()),
1903                        response: Some(operator_withrawal_response::Response::RawTx(
1904                            withdraw_response.into_inner(),
1905                        )),
1906                    },
1907                    Err(e) => OperatorWithrawalResponse {
1908                        operator_xonly_pk: Some(xonly_pk.into()),
1909                        response: Some(operator_withrawal_response::Response::Error(e.to_string())),
1910                    },
1911                })
1912                .collect(),
1913        }))
1914    }
1915
1916    async fn get_nofn_aggregated_xonly_pk(
1917        &self,
1918        _: tonic::Request<super::Empty>,
1919    ) -> std::result::Result<tonic::Response<super::NofnResponse>, tonic::Status> {
1920        tracing::info!("Get nofn aggregated xonly pk rpc called");
1921        let verifier_keys = self.fetch_verifier_keys().await?;
1922        let num_verifiers = verifier_keys.len();
1923        let nofn_xonly_pk = bitcoin::XOnlyPublicKey::from_musig2_pks(verifier_keys.clone(), None)
1924            .map_err(|e| {
1925            Status::internal(format!(
1926                "Failed to aggregate verifier public keys, err: {e}, pubkeys: {verifier_keys:?}"
1927            ))
1928        })?;
1929        Ok(Response::new(super::NofnResponse {
1930            nofn_xonly_pk: nofn_xonly_pk.serialize().to_vec(),
1931            num_verifiers: num_verifiers as u32,
1932        }))
1933    }
1934
1935    async fn internal_get_emergency_stop_tx(
1936        &self,
1937        request: Request<clementine::GetEmergencyStopTxRequest>,
1938    ) -> Result<Response<clementine::GetEmergencyStopTxResponse>, Status> {
1939        tracing::warn!("Get emergency stop tx rpc called");
1940        let inner_request = request.into_inner();
1941        let txids: Vec<Txid> = inner_request
1942            .txids
1943            .into_iter()
1944            .map(|txid| {
1945                Txid::from_slice(&txid.txid).map_err(|e| {
1946                    tonic::Status::invalid_argument(format!("Failed to parse txid: {e}"))
1947                })
1948            })
1949            .collect::<Result<Vec<_>, _>>()?;
1950        tracing::warn!(
1951            "Parsed get emergency stop tx rpc params, move txids: {:?}",
1952            txids
1953                .iter()
1954                .map(|txid| txid.to_string())
1955                .collect::<Vec<_>>()
1956        );
1957
1958        let emergency_stop_txs = self.db.get_emergency_stop_txs(None, txids).await?;
1959
1960        let (txids, encrypted_emergency_stop_txs): (Vec<Txid>, Vec<Vec<u8>>) =
1961            emergency_stop_txs.into_iter().unzip();
1962
1963        Ok(Response::new(clementine::GetEmergencyStopTxResponse {
1964            txids: txids.into_iter().map(|txid| txid.into()).collect(),
1965            encrypted_emergency_stop_txs,
1966        }))
1967    }
1968
1969    async fn send_move_to_vault_tx(
1970        &self,
1971        request: Request<clementine::SendMoveTxRequest>,
1972    ) -> Result<Response<clementine::Txid>, Status> {
1973        tracing::info!("Send move to vault tx rpc called");
1974        #[cfg(not(feature = "automation"))]
1975        {
1976            let _ = request;
1977            return Err(Status::unimplemented(
1978                "Automation is disabled, cannot automatically send move to vault tx.",
1979            ));
1980        }
1981
1982        #[cfg(feature = "automation")]
1983        {
1984            use bitcoin::Amount;
1985            use std::sync::Arc;
1986
1987            use crate::builder::{
1988                address::create_taproot_address,
1989                script::{CheckSig, Multisig, SpendableScript},
1990                transaction::anchor_output,
1991            };
1992
1993            let request = request.into_inner();
1994            let movetx: bitcoin::Transaction = bitcoin::consensus::deserialize(
1995                &request
1996                    .raw_tx
1997                    .ok_or_eyre("raw_tx is required")
1998                    .map_to_status()?
1999                    .raw_tx,
2000            )
2001            .wrap_err("Failed to deserialize movetx")
2002            .map_to_status()?;
2003            let deposit_outpoint: bitcoin::OutPoint = request
2004                .deposit_outpoint
2005                .ok_or(Status::invalid_argument("deposit_outpoint is required"))?
2006                .try_into()?;
2007
2008            tracing::info!(
2009                "Parsed send move to vault tx rpc params, deposit outpoint: {:?}, movetx hex: {}",
2010                deposit_outpoint,
2011                bitcoin::consensus::encode::serialize_hex(&movetx)
2012            );
2013
2014            // check if transaction is a movetx
2015            if movetx.input.len() != 1 || movetx.output.len() != 2 {
2016                return Err(Status::invalid_argument(
2017                    "Transaction is not a movetx, input or output lengths are not correct",
2018                ));
2019            }
2020            // check output values
2021            // movetx always has 0 sat anchor output
2022            if !(movetx.output[0].value == self.config.protocol_paramset().bridge_amount
2023                && movetx.output[1].value == Amount::from_sat(0))
2024            {
2025                return Err(Status::invalid_argument(format!(
2026                    "Transaction is not a movetx, output sat values are not correct, should be ({}, 0), got ({}, {})",
2027                    self.config.protocol_paramset().bridge_amount,
2028                    movetx.output[0].value,
2029                    movetx.output[1].value,
2030                )));
2031            }
2032            // check output scriptpubkeys
2033            let verifier_keys = self.fetch_verifier_keys().await?;
2034            let nofn_xonly_pk =
2035                bitcoin::XOnlyPublicKey::from_musig2_pks(verifier_keys.clone(), None).map_err(
2036                    |e| {
2037                        Status::internal(format!(
2038                            "Failed to aggregate verifier public keys, err: {e}, pubkeys: {verifier_keys:?}"
2039                        ))
2040                    },
2041                )?;
2042            let nofn_script = Arc::new(CheckSig::new(nofn_xonly_pk));
2043            let security_council_script = Arc::new(Multisig::from_security_council(
2044                self.config.security_council.clone(),
2045            ));
2046
2047            let (addr, _) = create_taproot_address(
2048                &[
2049                    nofn_script.to_script_buf(),
2050                    security_council_script.to_script_buf(),
2051                ],
2052                None,
2053                self.config.protocol_paramset().network,
2054            );
2055            let bridge_script_pubkey = addr.script_pubkey();
2056
2057            if !(movetx.output[1].script_pubkey
2058                == anchor_output(self.config.protocol_paramset().anchor_amount()).script_pubkey
2059                && movetx.output[0].script_pubkey == bridge_script_pubkey)
2060            {
2061                return Err(Status::invalid_argument(
2062                    format!("Transaction is not a movetx, output scriptpubkeys are not correct, expected: (vault: {:?}, anchor: {:?}), got: (vault: {:?}, anchor: {:?})",
2063                    bridge_script_pubkey,
2064                    anchor_output(self.config.protocol_paramset().anchor_amount()).script_pubkey,
2065                    movetx.output[0].script_pubkey,
2066                    movetx.output[1].script_pubkey,
2067                )));
2068            }
2069
2070            let mut dbtx = self.db.begin_transaction().await?;
2071            self.tx_sender
2072                .insert_try_to_send(
2073                    &mut dbtx,
2074                    Some(TxMetadata {
2075                        deposit_outpoint: Some(deposit_outpoint),
2076                        operator_xonly_pk: None,
2077                        round_idx: None,
2078                        kickoff_idx: None,
2079                        tx_type: TransactionType::MoveToVault,
2080                    }),
2081                    &movetx,
2082                    FeePayingType::CPFP,
2083                    None,
2084                    &[],
2085                    &[],
2086                    &[],
2087                    &[],
2088                )
2089                .await
2090                .map_to_status()?;
2091            dbtx.commit()
2092                .await
2093                .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;
2094
2095            Ok(Response::new(movetx.compute_txid().into()))
2096        }
2097    }
2098}
2099
2100#[cfg(test)]
2101mod tests {
2102    use crate::actor::Actor;
2103    use crate::config::BridgeConfig;
2104    use crate::deposit::{BaseDepositData, DepositInfo, DepositType};
2105    use crate::musig2::AggregateFromPublicKeys;
2106    use crate::rpc::clementine::clementine_aggregator_client::ClementineAggregatorClient;
2107    use crate::rpc::clementine::{self, GetEntityStatusesRequest, SendMoveTxRequest};
2108    use crate::rpc::get_clients;
2109    use crate::servers::create_aggregator_unix_server;
2110    use crate::test::common::citrea::MockCitreaClient;
2111    use crate::test::common::tx_utils::ensure_tx_onchain;
2112    use crate::test::common::*;
2113    use crate::{builder, EVMAddress};
2114    use bitcoin::hashes::Hash;
2115    use bitcoincore_rpc::RpcApi;
2116    use eyre::Context;
2117    use std::time::Duration;
2118    use tokio::time::sleep;
2119    use tonic::{Request, Status};
2120
2121    #[cfg(feature = "automation")]
2122    async fn perform_deposit(mut config: BridgeConfig) -> Result<(), Status> {
2123        let regtest = create_regtest_rpc(&mut config).await;
2124        let rpc = regtest.rpc();
2125
2126        let actors = create_actors::<MockCitreaClient>(&config).await;
2127        let _unused =
2128            run_single_deposit::<MockCitreaClient>(&mut config, rpc.clone(), None, &actors, None)
2129                .await?;
2130
2131        Ok(())
2132    }
2133
2134    #[tokio::test(flavor = "multi_thread")]
2135    async fn aggregator_double_deposit() {
2136        let mut config = create_test_config_with_thread_name().await;
2137        let regtest = create_regtest_rpc(&mut config).await;
2138        let rpc = regtest.rpc();
2139        let actors = create_actors::<MockCitreaClient>(&config).await;
2140        let mut aggregator = actors.get_aggregator();
2141
2142        let evm_address = EVMAddress([1u8; 20]);
2143        let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2144
2145        let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2146            .setup(tonic::Request::new(clementine::Empty {}))
2147            .await
2148            .unwrap()
2149            .into_inner()
2150            .try_into()
2151            .unwrap();
2152        sleep(Duration::from_secs(3)).await;
2153
2154        let nofn_xonly_pk =
2155            bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2156
2157        let deposit_address = builder::address::generate_deposit_address(
2158            nofn_xonly_pk,
2159            signer.address.as_unchecked(),
2160            evm_address,
2161            config.protocol_paramset().network,
2162            config.protocol_paramset().user_takes_after,
2163        )
2164        .unwrap()
2165        .0;
2166
2167        let deposit_outpoint = rpc
2168            .send_to_address(&deposit_address, config.protocol_paramset().bridge_amount)
2169            .await
2170            .unwrap();
2171        rpc.mine_blocks(18).await.unwrap();
2172
2173        let deposit_info = DepositInfo {
2174            deposit_outpoint,
2175            deposit_type: DepositType::BaseDeposit(BaseDepositData {
2176                evm_address,
2177                recovery_taproot_address: signer.address.as_unchecked().clone(),
2178            }),
2179        };
2180
2181        // Two deposits with the same values.
2182        let movetx_one = aggregator
2183            .new_deposit(clementine::Deposit::from(deposit_info.clone()))
2184            .await
2185            .unwrap()
2186            .into_inner();
2187        let movetx_one_txid: bitcoin::Txid = aggregator
2188            .send_move_to_vault_tx(SendMoveTxRequest {
2189                deposit_outpoint: Some(deposit_outpoint.into()),
2190                raw_tx: Some(movetx_one),
2191            })
2192            .await
2193            .unwrap()
2194            .into_inner()
2195            .try_into()
2196            .unwrap();
2197
2198        let movetx_two = aggregator
2199            .new_deposit(clementine::Deposit::from(deposit_info))
2200            .await
2201            .unwrap()
2202            .into_inner();
2203        let _movetx_two_txid: bitcoin::Txid = aggregator
2204            .send_move_to_vault_tx(SendMoveTxRequest {
2205                deposit_outpoint: Some(deposit_outpoint.into()),
2206                raw_tx: Some(movetx_two),
2207            })
2208            .await
2209            .unwrap()
2210            .into_inner()
2211            .try_into()
2212            .unwrap();
2213        rpc.mine_blocks(1).await.unwrap();
2214        sleep(Duration::from_secs(3)).await;
2215
2216        poll_until_condition(
2217            async || {
2218                rpc.mine_blocks(1).await.unwrap();
2219                Ok(rpc
2220                    .is_tx_on_chain(&movetx_one_txid)
2221                    .await
2222                    .unwrap_or_default())
2223            },
2224            None,
2225            None,
2226        )
2227        .await
2228        .wrap_err_with(|| eyre::eyre!("MoveTx did not land onchain"))
2229        .unwrap();
2230    }
2231
2232    #[tokio::test(flavor = "multi_thread")]
2233    async fn aggregator_deposit_movetx_lands_onchain() {
2234        let mut config = create_test_config_with_thread_name().await;
2235        let regtest = create_regtest_rpc(&mut config).await;
2236        let rpc = regtest.rpc();
2237        let actors = create_actors::<MockCitreaClient>(&config).await;
2238        let mut aggregator = actors.get_aggregator();
2239
2240        let evm_address = EVMAddress([1u8; 20]);
2241        let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2242
2243        let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2244            .setup(tonic::Request::new(clementine::Empty {}))
2245            .await
2246            .unwrap()
2247            .into_inner()
2248            .try_into()
2249            .unwrap();
2250        sleep(Duration::from_secs(3)).await;
2251
2252        let nofn_xonly_pk =
2253            bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2254
2255        let deposit_address = builder::address::generate_deposit_address(
2256            nofn_xonly_pk,
2257            signer.address.as_unchecked(),
2258            evm_address,
2259            config.protocol_paramset().network,
2260            config.protocol_paramset().user_takes_after,
2261        )
2262        .unwrap()
2263        .0;
2264
2265        let deposit_outpoint = rpc
2266            .send_to_address(&deposit_address, config.protocol_paramset().bridge_amount)
2267            .await
2268            .unwrap();
2269        rpc.mine_blocks(18).await.unwrap();
2270
2271        let deposit_info = DepositInfo {
2272            deposit_outpoint,
2273            deposit_type: DepositType::BaseDeposit(BaseDepositData {
2274                evm_address,
2275                recovery_taproot_address: signer.address.as_unchecked().clone(),
2276            }),
2277        };
2278
2279        // Generate and broadcast the move-to-vault transaction
2280        let start_time = std::time::Instant::now();
2281        let raw_move_tx = aggregator
2282            .new_deposit(clementine::Deposit::from(deposit_info))
2283            .await
2284            .unwrap()
2285            .into_inner();
2286        let end_time = std::time::Instant::now();
2287        tracing::info!("New deposit time: {:?}", end_time - start_time);
2288
2289        let movetx_txid = aggregator
2290            .send_move_to_vault_tx(SendMoveTxRequest {
2291                deposit_outpoint: Some(deposit_outpoint.into()),
2292                raw_tx: Some(raw_move_tx),
2293            })
2294            .await
2295            .unwrap()
2296            .into_inner()
2297            .try_into()
2298            .unwrap();
2299
2300        rpc.mine_blocks(1).await.unwrap();
2301        sleep(Duration::from_secs(3)).await;
2302
2303        poll_until_condition(
2304            async || {
2305                rpc.mine_blocks(1).await.unwrap();
2306                Ok(rpc.is_tx_on_chain(&movetx_txid).await.unwrap_or_default())
2307            },
2308            None,
2309            None,
2310        )
2311        .await
2312        .wrap_err_with(|| eyre::eyre!("MoveTx did not land onchain"))
2313        .unwrap();
2314    }
2315
2316    #[tokio::test]
2317    async fn aggregator_two_deposit_movetx_and_emergency_stop() {
2318        let mut config = create_test_config_with_thread_name().await;
2319        let regtest = create_regtest_rpc(&mut config).await;
2320        let rpc = regtest.rpc();
2321        let actors = create_actors::<MockCitreaClient>(&config).await;
2322        let mut aggregator = actors.get_aggregator();
2323
2324        let evm_address = EVMAddress([1u8; 20]);
2325        let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2326
2327        let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2328            .setup(tonic::Request::new(clementine::Empty {}))
2329            .await
2330            .unwrap()
2331            .into_inner()
2332            .try_into()
2333            .unwrap();
2334        sleep(Duration::from_secs(3)).await;
2335
2336        let nofn_xonly_pk =
2337            bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2338
2339        let deposit_address_0 = builder::address::generate_deposit_address(
2340            nofn_xonly_pk,
2341            signer.address.as_unchecked(),
2342            evm_address,
2343            config.protocol_paramset().network,
2344            config.protocol_paramset().user_takes_after,
2345        )
2346        .unwrap()
2347        .0;
2348
2349        let deposit_address_1 = builder::address::generate_deposit_address(
2350            nofn_xonly_pk,
2351            signer.address.as_unchecked(),
2352            evm_address,
2353            config.protocol_paramset().network,
2354            config.protocol_paramset().user_takes_after,
2355        )
2356        .unwrap()
2357        .0;
2358
2359        let deposit_outpoint_0 = rpc
2360            .send_to_address(&deposit_address_0, config.protocol_paramset().bridge_amount)
2361            .await
2362            .unwrap();
2363        rpc.mine_blocks(18).await.unwrap();
2364
2365        let deposit_outpoint_1 = rpc
2366            .send_to_address(&deposit_address_1, config.protocol_paramset().bridge_amount)
2367            .await
2368            .unwrap();
2369        rpc.mine_blocks(18).await.unwrap();
2370
2371        let deposit_info_0 = DepositInfo {
2372            deposit_outpoint: deposit_outpoint_0,
2373            deposit_type: DepositType::BaseDeposit(BaseDepositData {
2374                evm_address,
2375                recovery_taproot_address: signer.address.as_unchecked().clone(),
2376            }),
2377        };
2378
2379        let deposit_info_1 = DepositInfo {
2380            deposit_outpoint: deposit_outpoint_1,
2381            deposit_type: DepositType::BaseDeposit(BaseDepositData {
2382                evm_address,
2383                recovery_taproot_address: signer.address.as_unchecked().clone(),
2384            }),
2385        };
2386
2387        // Generate and broadcast the move-to-vault tx for the first deposit
2388        let raw_move_tx_0 = aggregator
2389            .new_deposit(clementine::Deposit::from(deposit_info_0))
2390            .await
2391            .unwrap()
2392            .into_inner();
2393        let move_txid_0: bitcoin::Txid = aggregator
2394            .send_move_to_vault_tx(SendMoveTxRequest {
2395                deposit_outpoint: Some(deposit_outpoint_0.into()),
2396                raw_tx: Some(raw_move_tx_0),
2397            })
2398            .await
2399            .unwrap()
2400            .into_inner()
2401            .try_into()
2402            .unwrap();
2403
2404        rpc.mine_blocks(1).await.unwrap();
2405        sleep(Duration::from_secs(3)).await;
2406        ensure_tx_onchain(rpc, move_txid_0)
2407            .await
2408            .expect("failed to get movetx_0 on chain");
2409
2410        // Generate and broadcast the move-to-vault tx for the second deposit
2411        let raw_move_tx_1 = aggregator
2412            .new_deposit(clementine::Deposit::from(deposit_info_1))
2413            .await
2414            .unwrap()
2415            .into_inner();
2416        let move_txid_1 = aggregator
2417            .send_move_to_vault_tx(SendMoveTxRequest {
2418                deposit_outpoint: Some(deposit_outpoint_1.into()),
2419                raw_tx: Some(raw_move_tx_1),
2420            })
2421            .await
2422            .unwrap()
2423            .into_inner()
2424            .try_into()
2425            .unwrap();
2426
2427        rpc.mine_blocks(1).await.unwrap();
2428        ensure_tx_onchain(rpc, move_txid_1)
2429            .await
2430            .expect("failed to get movetx_1 on chain");
2431        sleep(Duration::from_secs(3)).await;
2432
2433        let move_txids = vec![move_txid_0, move_txid_1];
2434
2435        tracing::debug!("Move txids: {:?}", move_txids);
2436
2437        let emergency_txid = aggregator
2438            .internal_get_emergency_stop_tx(tonic::Request::new(
2439                clementine::GetEmergencyStopTxRequest {
2440                    txids: move_txids
2441                        .iter()
2442                        .map(|txid| clementine::Txid {
2443                            txid: txid.to_byte_array().to_vec(),
2444                        })
2445                        .collect(),
2446                },
2447            ))
2448            .await
2449            .unwrap()
2450            .into_inner();
2451
2452        let decryption_priv_key =
2453            hex::decode("a80bc8cf095c2b37d4c6233114e0dd91f43d75de5602466232dbfcc1fc66c542")
2454                .expect("Failed to parse emergency stop encryption public key");
2455        let emergency_stop_tx: bitcoin::Transaction = bitcoin::consensus::deserialize(
2456            &crate::encryption::decrypt_bytes(
2457                &decryption_priv_key,
2458                &emergency_txid.encrypted_emergency_stop_txs[0],
2459            )
2460            .expect("Failed to decrypt emergency stop tx"),
2461        )
2462        .expect("Failed to deserialize");
2463
2464        rpc.send_raw_transaction(&emergency_stop_tx)
2465            .await
2466            .expect("Failed to send emergency stop tx");
2467
2468        let emergency_stop_txid = emergency_stop_tx.compute_txid();
2469        rpc.mine_blocks(1).await.unwrap();
2470
2471        poll_until_condition(
2472            async || {
2473                rpc.mine_blocks(1).await.unwrap();
2474                Ok(rpc
2475                    .is_tx_on_chain(&emergency_stop_txid)
2476                    .await
2477                    .unwrap_or_default())
2478            },
2479            None,
2480            None,
2481        )
2482        .await
2483        .wrap_err_with(|| eyre::eyre!("Emergency stop tx did not land onchain"))
2484        .unwrap();
2485    }
2486
2487    #[cfg(feature = "automation")]
2488    #[tokio::test]
2489    #[ignore = "This test does not work"]
2490    async fn aggregator_deposit_finalize_verifier_timeout() {
2491        let mut config = create_test_config_with_thread_name().await;
2492        config
2493            .test_params
2494            .timeout_params
2495            .deposit_finalize_verifier_idx = Some(0);
2496        let res = perform_deposit(config).await;
2497        assert!(res.is_err());
2498        let err_string = res.unwrap_err().to_string();
2499        assert!(
2500            err_string.contains("Deposit finalization from verifiers"),
2501            "Error string was: {err_string}"
2502        );
2503    }
2504
2505    #[cfg(feature = "automation")]
2506    #[tokio::test]
2507    async fn aggregator_deposit_key_distribution_verifier_timeout() {
2508        let mut config = create_test_config_with_thread_name().await;
2509        config
2510            .test_params
2511            .timeout_params
2512            .key_distribution_verifier_idx = Some(0);
2513
2514        let res = perform_deposit(config).await;
2515
2516        assert!(res.is_err());
2517        let err_string = res.unwrap_err().to_string();
2518        assert!(
2519            err_string.contains("Verifier key distribution (id:"),
2520            "Error string was: {err_string}"
2521        );
2522    }
2523
2524    #[cfg(feature = "automation")]
2525    #[tokio::test]
2526    async fn aggregator_deposit_key_distribution_operator_timeout() {
2527        let mut config = create_test_config_with_thread_name().await;
2528        config
2529            .test_params
2530            .timeout_params
2531            .key_collection_operator_idx = Some(0);
2532
2533        let res = perform_deposit(config).await;
2534
2535        assert!(res.is_err());
2536        let err_string = res.unwrap_err().to_string();
2537        assert!(
2538            err_string.contains("Operator key collection (id:"),
2539            "Error string was: {err_string}"
2540        );
2541    }
2542
2543    #[cfg(feature = "automation")]
2544    #[tokio::test]
2545    async fn aggregator_deposit_nonce_stream_creation_verifier_timeout() {
2546        let mut config = create_test_config_with_thread_name().await;
2547        config
2548            .test_params
2549            .timeout_params
2550            .nonce_stream_creation_verifier_idx = Some(0);
2551
2552        let res = perform_deposit(config).await;
2553
2554        assert!(res.is_err());
2555        let err_string = res.unwrap_err().to_string();
2556        assert!(
2557            err_string.contains("Nonce stream creation (id:"),
2558            "Error string was: {err_string}"
2559        );
2560    }
2561
2562    #[cfg(feature = "automation")]
2563    #[tokio::test]
2564    async fn aggregator_deposit_partial_sig_stream_creation_timeout() {
2565        let mut config = create_test_config_with_thread_name().await;
2566        config
2567            .test_params
2568            .timeout_params
2569            .partial_sig_stream_creation_verifier_idx = Some(0);
2570
2571        let res = perform_deposit(config).await;
2572
2573        assert!(res.is_err());
2574        let err_string = res.unwrap_err().to_string();
2575        assert!(
2576            err_string.contains("Partial signature stream creation (id:"),
2577            "Error string was: {err_string}"
2578        );
2579    }
2580
2581    #[cfg(feature = "automation")]
2582    #[tokio::test]
2583    async fn aggregator_deposit_operator_sig_collection_operator_timeout() {
2584        let mut config = create_test_config_with_thread_name().await;
2585        config
2586            .test_params
2587            .timeout_params
2588            .operator_sig_collection_operator_idx = Some(0);
2589
2590        let res = perform_deposit(config).await;
2591
2592        assert!(res.is_err());
2593        let err_string = res.unwrap_err().to_string();
2594        assert!(
2595            err_string.contains("Operator signature stream creation (id:"),
2596            "Error string was: {err_string}"
2597        );
2598    }
2599
2600    #[tokio::test]
2601    async fn aggregator_get_entity_statuses() {
2602        let mut config = create_test_config_with_thread_name().await;
2603        let _regtest = create_regtest_rpc(&mut config).await;
2604
2605        let actors = create_actors::<MockCitreaClient>(&config).await;
2606        let mut aggregator = actors.get_aggregator();
2607        let status = aggregator
2608            .get_entity_statuses(Request::new(GetEntityStatusesRequest {
2609                restart_tasks: false,
2610            }))
2611            .await
2612            .unwrap()
2613            .into_inner();
2614
2615        tracing::info!("Status: {:?}", status);
2616
2617        assert_eq!(
2618            status.entity_statuses.len(),
2619            config.test_params.all_operators_secret_keys.len()
2620                + config.test_params.all_verifiers_secret_keys.len()
2621        );
2622    }
2623
2624    #[tokio::test]
2625    async fn aggregator_start_with_offline_verifier() {
2626        let mut config = create_test_config_with_thread_name().await;
2627        // Create regtest rpc
2628        let _regtest = create_regtest_rpc(&mut config).await;
2629        // random ips
2630        config.verifier_endpoints = Some(vec!["https://142.143.144.145:17001".to_string()]);
2631        config.operator_endpoints = Some(vec!["https://142.143.144.145:17002".to_string()]);
2632        // Create temporary directory for aggregator socket
2633        let socket_dir = tempfile::tempdir().unwrap();
2634        let socket_path = socket_dir.path().join("aggregator.sock");
2635
2636        tracing::info!("Creating unix aggregator server");
2637
2638        let (_, _shutdown_tx) = create_aggregator_unix_server(config.clone(), socket_path.clone())
2639            .await
2640            .unwrap();
2641
2642        tracing::info!("Created unix aggregator server");
2643
2644        let mut aggregator_client = get_clients(
2645            vec![format!("unix://{}", socket_path.display())],
2646            ClementineAggregatorClient::new,
2647            &config,
2648            false,
2649        )
2650        .await
2651        .unwrap()
2652        .pop()
2653        .unwrap();
2654
2655        tracing::info!("Got aggregator client");
2656
2657        // vergen should work
2658        assert!(aggregator_client
2659            .vergen(Request::new(clementine::Empty {}))
2660            .await
2661            .is_ok());
2662
2663        tracing::info!("After vergen");
2664
2665        // setup should give error as it can't connect to the verifier
2666        assert!(aggregator_client
2667            .setup(Request::new(clementine::Empty {}))
2668            .await
2669            .is_err());
2670
2671        tracing::info!("After setup");
2672
2673        // aggregator should still be up even after not connecting to the verifier
2674        // and should be able to get metrics
2675        tracing::info!(
2676            "Entity statuses: {:?}",
2677            aggregator_client
2678                .get_entity_statuses(Request::new(GetEntityStatusesRequest {
2679                    restart_tasks: false,
2680                }))
2681                .await
2682                .unwrap()
2683        );
2684    }
2685}