clementine_core/rpc/
aggregator.rs

1use super::clementine::{
2    clementine_aggregator_server::ClementineAggregator, verifier_deposit_finalize_params,
3    DepositParams, Empty, VerifierDepositFinalizeParams,
4};
5use super::clementine::{
6    AggregatorWithdrawResponse, Deposit, EntityStatuses, GetEntityStatusesRequest,
7    OptimisticPayoutParams, RawSignedTx, VergenResponse, VerifierPublicKeys,
8};
9use crate::aggregator::{
10    AggregatorServer, CompatibilityCheckScope, OperatorId, ParticipatingOperators,
11    ParticipatingVerifiers, VerifierId,
12};
13use crate::bitvm_client::SECP;
14use crate::builder::sighash::SignatureInfo;
15use crate::builder::transaction::{
16    create_emergency_stop_txhandler, create_move_to_vault_txhandler,
17    create_optimistic_payout_txhandler, Signed, TxHandler,
18};
19use crate::compatibility::ActorWithConfig;
20use crate::config::BridgeConfig;
21use crate::constants::{
22    DEPOSIT_FINALIZATION_TIMEOUT, DEPOSIT_FINALIZE_STREAM_CREATION_TIMEOUT,
23    KEY_DISTRIBUTION_TIMEOUT, NONCE_STREAM_CREATION_TIMEOUT, OPERATOR_SIGS_STREAM_CREATION_TIMEOUT,
24    OPERATOR_SIGS_TIMEOUT, OPTIMISTIC_PAYOUT_TIMEOUT, OVERALL_DEPOSIT_TIMEOUT,
25    PARTIAL_SIG_STREAM_CREATION_TIMEOUT, PIPELINE_COMPLETION_TIMEOUT, SEND_OPERATOR_SIGS_TIMEOUT,
26    SETUP_COMPLETION_TIMEOUT, WITHDRAWAL_TIMEOUT,
27};
28use crate::deposit::{Actors, DepositData, DepositInfo};
29use crate::musig2::AggregateFromPublicKeys;
30use crate::rpc::clementine::{
31    operator_withrawal_response, AggregatorWithdrawalInput, CompatibilityParamsRpc,
32    EntitiesCompatibilityData, OperatorWithrawalResponse, VerifierDepositSignParams,
33};
34use crate::rpc::parser;
35use crate::utils::{
36    flatten_join_named_results, get_vergen_response, timed_request, timed_try_join_all,
37    try_join_all_combine_errors, ScriptBufExt,
38};
39use crate::utils::{FeePayingType, TxMetadata};
40use crate::{
41    aggregator::Aggregator,
42    builder::sighash::create_nofn_sighash_stream,
43    musig2::aggregate_nonces,
44    rpc::clementine::{self, DepositSignSession},
45};
46use bitcoin::hashes::Hash;
47use bitcoin::secp256k1::schnorr::Signature;
48use bitcoin::secp256k1::{Message, PublicKey};
49use bitcoin::{TapSighash, TxOut, Txid, XOnlyPublicKey};
50use clementine_errors::BridgeError;
51use clementine_errors::TransactionType;
52use clementine_errors::{ErrorExt, ResultExt};
53use clementine_primitives::UTXO;
54use eyre::{Context, OptionExt};
55use futures::future::join_all;
56use futures::{
57    stream::{BoxStream, TryStreamExt},
58    FutureExt, Stream, StreamExt, TryFutureExt,
59};
60use secp256k1::musig::{AggregatedNonce, PartialSignature, PublicNonce};
61use std::future::Future;
62use tokio::sync::mpsc::{channel, Receiver, Sender};
63use tonic::{async_trait, Request, Response, Status, Streaming};
/// An aggregated nonce together with the sighash it is going to sign.
///
/// Built by `nonce_aggregator` for every sighash and carried through the
/// channel pipeline until `signature_aggregator` consumes both fields.
struct AggNonceQueueItem {
    /// Nonce aggregated from the public nonces collected for this sighash.
    agg_nonce: AggregatedNonce,
    /// Sighash taken from the sighash stream for this signature.
    sighash: TapSighash,
}
68
/// A final aggregated signature in serialized form.
///
/// Created by `signature_aggregator` and forwarded to the verifiers by
/// `signature_distributor`.
struct FinalSigQueueItem {
    /// Serialized bytes of the aggregated signature.
    final_sig: Vec<u8>,
}
72
73use clementine_errors::AggregatorError;
74
75async fn get_next_pub_nonces(
76    nonce_streams: &mut [impl Stream<Item = Result<PublicNonce, BridgeError>>
77              + Unpin
78              + Send
79              + 'static],
80    verifiers_ids: &[VerifierId],
81) -> Result<Vec<PublicNonce>, BridgeError> {
82    try_join_all_combine_errors(nonce_streams.iter_mut().zip(verifiers_ids).map(
83        |(s, id)| async move {
84            s.next()
85                .await
86                .transpose()
87                .wrap_err(format!("Failed to get nonce from {id}"))? // Return the inner error if it exists
88                .ok_or_else(|| -> eyre::Report {
89                    AggregatorError::InputStreamEndedEarlyUnknownSize {
90                        // Return an early end error if the stream is empty
91                        stream_name: format!("Nonce stream {id}"),
92                    }
93                    .into()
94                })
95        },
96    ))
97    .await
98}
99
100/// For each expected sighash, we collect a batch of public nonces from all verifiers. We aggregate and send aggregated nonce and all public nonces (needed for partial signature verification) to the agg_nonce_sender. Then repeat for the next sighash.
101async fn nonce_aggregator(
102    mut nonce_streams: Vec<
103        impl Stream<Item = Result<PublicNonce, BridgeError>> + Unpin + Send + 'static,
104    >,
105    mut sighash_stream: impl Stream<Item = Result<(TapSighash, SignatureInfo), BridgeError>>
106        + Unpin
107        + Send
108        + 'static,
109    agg_nonce_sender: Sender<(AggNonceQueueItem, Vec<PublicNonce>)>,
110    needed_nofn_sigs: usize,
111    verifiers_ids: Vec<VerifierId>,
112) -> Result<
113    (
114        (AggregatedNonce, Vec<PublicNonce>),
115        (AggregatedNonce, Vec<PublicNonce>),
116    ),
117    BridgeError,
118> {
119    let mut total_sigs = 0;
120
121    tracing::info!("Starting nonce aggregation (expecting {needed_nofn_sigs} nonces)");
122
123    // sanity check
124    if verifiers_ids.len() != nonce_streams.len() {
125        return Err(
126            eyre::eyre!("Number of verifiers ids and nonce streams must be the same").into(),
127        );
128    }
129
130    while let Some(msg) = sighash_stream.next().await {
131        let (sighash, siginfo) = msg.wrap_err("Sighash stream failed")?;
132
133        total_sigs += 1;
134
135        let pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
136            .await
137            .wrap_err_with(|| {
138                format!("Failed to get nonces from verifiers for sighash #{total_sigs} with siginfo: {siginfo:?}")
139            })?;
140
141        tracing::trace!(
142            "Received nonces for signature id {:?} in nonce_aggregator",
143            siginfo.signature_id
144        );
145
146        let agg_nonce = aggregate_nonces(pub_nonces.iter().collect::<Vec<_>>().as_slice())?;
147
148        agg_nonce_sender
149            .send((AggNonceQueueItem { agg_nonce, sighash }, pub_nonces))
150            .await
151            .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
152                stream_name: "nonce_aggregator".to_string(),
153            })?;
154
155        tracing::trace!(
156            "Sent nonces for signature id {:?} in nonce_aggregator",
157            siginfo.signature_id
158        );
159    }
160    tracing::trace!(tmp_debug = 1, "Sent {total_sigs} to agg_nonce stream");
161
162    // Sanity check, should never happen
163    if total_sigs != needed_nofn_sigs {
164        let err_msg = format!(
165            "Expected {needed_nofn_sigs} nofn signatures, got {total_sigs} from sighash stream",
166        );
167        tracing::error!("{err_msg}");
168        return Err(eyre::eyre!(err_msg).into());
169    }
170    // aggregate nonces for the movetx signature
171    let movetx_pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
172        .await
173        .wrap_err("Failed to get movetx public nonces from verifiers")?;
174
175    tracing::trace!("Received nonces for movetx in nonce_aggregator");
176
177    let move_tx_agg_nonce =
178        aggregate_nonces(movetx_pub_nonces.iter().collect::<Vec<_>>().as_slice())
179            .wrap_err("Failed to aggregate movetx nonces")?;
180
181    let emergency_stop_pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
182        .await
183        .wrap_err("Failed to get emergency stop tx public nonces from verifiers")?;
184
185    let emergency_stop_agg_nonce = aggregate_nonces(
186        emergency_stop_pub_nonces
187            .iter()
188            .collect::<Vec<_>>()
189            .as_slice(),
190    )
191    .wrap_err("Failed to aggregate emergency stop tx nonces")?;
192
193    Ok((
194        (move_tx_agg_nonce, movetx_pub_nonces),
195        (emergency_stop_agg_nonce, emergency_stop_pub_nonces),
196    ))
197}
198
199/// Reroutes aggregated nonces and public nonces for each aggregated nonce to the signature aggregator.
200async fn nonce_distributor(
201    mut agg_nonce_receiver: Receiver<(AggNonceQueueItem, Vec<PublicNonce>)>,
202    partial_sig_streams: Vec<(
203        Streaming<clementine::PartialSig>,
204        Sender<clementine::VerifierDepositSignParams>,
205    )>,
206    partial_sig_sender: Sender<(Vec<(PartialSignature, PublicNonce)>, AggNonceQueueItem)>,
207    needed_nofn_sigs: usize,
208    verifiers_ids: Vec<VerifierId>,
209) -> Result<(), BridgeError> {
210    let mut nonce_count = 0;
211    let mut sig_count = 0;
212    let (mut partial_sig_rx, mut partial_sig_tx): (Vec<_>, Vec<_>) =
213        partial_sig_streams.into_iter().unzip();
214
215    let (queue_tx, mut queue_rx) = channel(crate::constants::DEFAULT_CHANNEL_SIZE);
216
217    // sanity check
218    if verifiers_ids.len() != partial_sig_rx.len() {
219        return Err(eyre::eyre!(
220            "Number of verifiers ids and partial sig streams must be the same"
221        )
222        .into());
223    }
224    let verifiers_ids_clone = verifiers_ids.clone();
225    let handle_1 = tokio::spawn(async move {
226        while let Some((queue_item, pub_nonces)) = agg_nonce_receiver.recv().await {
227            nonce_count += 1;
228
229            tracing::trace!(
230                "Received aggregated nonce {} in nonce_distributor",
231                nonce_count
232            );
233
234            let agg_nonce_wrapped = clementine::VerifierDepositSignParams {
235                params: Some(clementine::verifier_deposit_sign_params::Params::AggNonce(
236                    queue_item.agg_nonce.serialize().to_vec(),
237                )),
238            };
239
240            // Broadcast aggregated nonce to all streams
241            try_join_all_combine_errors(
242                partial_sig_tx
243                    .iter_mut()
244                    .zip(verifiers_ids_clone.iter())
245                    .map(|(tx, id)| {
246                        let agg_nonce_wrapped = agg_nonce_wrapped.clone();
247                        async move {
248                            tx.send(agg_nonce_wrapped)
249                                .await
250                                .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
251                                    stream_name: format!("Partial sig {id}"),
252                                })
253                                .inspect_err(|e| {
254                                    tracing::error!(
255                                        "Failed to send aggregated nonce to {id}: {:?}",
256                                        e
257                                    );
258                                })
259                        }
260                    }),
261            )
262            .await
263            .wrap_err("Failed to send aggregated nonces to verifiers")?;
264
265            queue_tx
266                .send((queue_item, pub_nonces))
267                .await
268                .wrap_err("Other end of channel closed")?;
269
270            tracing::trace!(
271                "Sent aggregated nonce {} to verifiers in nonce_distributor",
272                nonce_count
273            );
274            if nonce_count == needed_nofn_sigs {
275                break;
276            }
277        }
278        if nonce_count != needed_nofn_sigs {
279            let err_msg = format!("Expected {needed_nofn_sigs} aggregated nonces in nonce_distributor, got {nonce_count}",);
280            tracing::error!("{err_msg}");
281            return Err(eyre::eyre!(err_msg).into());
282        }
283
284        tracing::trace!(
285            tmp_debug = 1,
286            "Broadcasted {nonce_count} agg_nonces to verifiers and to the queue"
287        );
288        Ok::<(), BridgeError>(())
289    });
290
291    let handle_2 = tokio::spawn(async move {
292        while let Some((queue_item, pub_nonces)) = queue_rx.recv().await {
293            let pub_nonces_ref = pub_nonces.as_slice();
294            if pub_nonces_ref.len() != partial_sig_rx.len() {
295                return Err(eyre::eyre!(
296                    "Number of public nonces {} and partial sig streams {} must be the same",
297                    pub_nonces_ref.len(),
298                    partial_sig_rx.len()
299                )
300                .into());
301            }
302            let partial_sigs = try_join_all_combine_errors(partial_sig_rx.iter_mut().zip(pub_nonces_ref.iter()).zip(verifiers_ids.iter()).map(
303                |((stream, pub_nonce), id)| async move {
304                    let partial_sig = stream
305                        .message()
306                        .await
307                        .wrap_err_with(|| AggregatorError::RequestFailed {
308                            request_name: format!("Partial sig {sig_count} from {id}"),
309                        })
310                        .inspect_err(|e| {
311                            tracing::error!(
312                                "Failed to receive partial signature {sig_count} from {id}, an error was sent: {:?}",
313                                e
314                            );
315                        })?
316                        .ok_or_eyre(AggregatorError::InputStreamEndedEarlyUnknownSize {
317                            stream_name: format!("Partial sig {sig_count} from {id} closed"),
318                        }).inspect_err(|e| {
319                            tracing::error!(
320                                "Failed to receive partial signature {sig_count} from {id}, the stream was closed: {:?}",
321                                e
322                            );
323                        })?;
324                    let partial_sig = PartialSignature::from_byte_array(
325                        &partial_sig
326                            .partial_sig
327                            .as_slice()
328                            .try_into()
329                            .wrap_err("PartialSignature must be 32 bytes")?,
330                    )
331                    .wrap_err(format!("Failed to parse partial signature {sig_count} from {id}"))?;
332
333                    Ok::<_, BridgeError>((partial_sig, *pub_nonce))
334                },
335            ))
336            .await?;
337
338            sig_count += 1;
339
340            tracing::trace!(
341                "Received partial signature {} from verifiers in nonce_distributor",
342                sig_count
343            );
344
345            partial_sig_sender
346                .send((partial_sigs, queue_item))
347                .await
348                .map_err(|_| {
349                    eyre::eyre!(AggregatorError::OutputStreamEndedEarly {
350                        stream_name: "partial_sig_sender".into(),
351                    })
352                })?;
353
354            tracing::trace!(
355                "Sent partial signature {} to signature_aggregator in nonce_distributor",
356                sig_count
357            );
358        }
359
360        if sig_count != needed_nofn_sigs {
361            let err_msg = format!(
362                "Expected {needed_nofn_sigs} partial signatures in nonce_distributor, got {sig_count}",
363            );
364            tracing::error!("{err_msg}");
365            return Err(eyre::eyre!(err_msg).into());
366        }
367        tracing::trace!(
368            tmp_debug = 1,
369            "Sent {sig_count} partial sig bundles to partial_sigs stream"
370        );
371
372        tracing::trace!("Finished tasks in nonce_distributor handle 2");
373        Ok::<(), BridgeError>(())
374    });
375
376    let (result_1, result_2) = tokio::join!(handle_1, handle_2);
377
378    let mut task_errors = Vec::new();
379
380    match result_1 {
381        Ok(inner_result) => {
382            if let Err(e) = inner_result {
383                task_errors.push(format!(
384                    "Task returned error while distributing aggnonces: {e:#?}"
385                ));
386            }
387        }
388        Err(e) => {
389            task_errors.push(format!(
390                "Task panicked while distributing aggnonces: {e:#?}"
391            ));
392        }
393    }
394
395    match result_2 {
396        Ok(inner_result) => {
397            if let Err(e) = inner_result {
398                task_errors.push(format!(
399                    "Task returned error while receiving partial sigs: {e:#?}"
400                ));
401            }
402        }
403        Err(e) => {
404            task_errors.push(format!(
405                "Task panicked while receiving partial sigs: {e:#?}"
406            ));
407        }
408    }
409
410    if !task_errors.is_empty() {
411        return Err(eyre::eyre!(format!(
412            "nonce_distributor failed with errors: {:#?}",
413            task_errors
414        ))
415        .into());
416    }
417
418    tracing::debug!("Finished tasks in nonce_distributor");
419
420    Ok(())
421}
422
423/// Collects partial signatures and corresponding public nonces from given stream and aggregates them.
424/// Each partial signature will also be verified if PARTIAL_SIG_VERIFICATION is set to true.
425async fn signature_aggregator(
426    mut partial_sig_receiver: Receiver<(Vec<(PartialSignature, PublicNonce)>, AggNonceQueueItem)>,
427    verifiers_public_keys: Vec<PublicKey>,
428    final_sig_sender: Sender<FinalSigQueueItem>,
429    needed_nofn_sigs: usize,
430) -> Result<(), BridgeError> {
431    let mut sig_count = 0;
432    while let Some((partial_sigs, queue_item)) = partial_sig_receiver.recv().await {
433        sig_count += 1;
434        tracing::trace!(
435            "Received partial signatures {} in signature_aggregator",
436            sig_count
437        );
438
439        let final_sig = crate::musig2::aggregate_partial_signatures(
440            verifiers_public_keys.clone(),
441            None,
442            queue_item.agg_nonce,
443            &partial_sigs,
444            Message::from_digest(queue_item.sighash.to_byte_array()),
445        )?;
446
447        final_sig_sender
448            .send(FinalSigQueueItem {
449                final_sig: final_sig.serialize().to_vec(),
450            })
451            .await
452            .wrap_err_with(|| {
453                eyre::eyre!(AggregatorError::OutputStreamEndedEarly {
454                    stream_name: "final_sig_sender".into(),
455                })
456            })?;
457        tracing::trace!(
458            "Sent aggregated signature {} to signature_distributor in signature_aggregator",
459            sig_count
460        );
461
462        if sig_count == needed_nofn_sigs {
463            break;
464        }
465    }
466
467    if sig_count != needed_nofn_sigs {
468        let err_msg = format!(
469            "Expected {needed_nofn_sigs} aggregated signatures in signature_aggregator, got {sig_count}",
470        );
471        tracing::error!("{err_msg}");
472        return Err(eyre::eyre!(err_msg).into());
473    }
474
475    tracing::trace!(
476        tmp_debug = 1,
477        "Sent {sig_count} aggregated signatures to final_sig stream"
478    );
479
480    Ok(())
481}
482
483/// Reroutes aggregated signatures to the caller.
484/// Also sends 2 aggregated nonces to the verifiers.
485async fn signature_distributor(
486    mut final_sig_receiver: Receiver<FinalSigQueueItem>,
487    deposit_finalize_sender: Vec<Sender<VerifierDepositFinalizeParams>>,
488    agg_nonce: impl Future<
489        Output = Result<
490            (
491                (AggregatedNonce, Vec<PublicNonce>),
492                (AggregatedNonce, Vec<PublicNonce>),
493            ),
494            Status,
495        >,
496    >,
497    needed_nofn_sigs: usize,
498    verifiers_ids: Vec<VerifierId>,
499) -> Result<(), BridgeError> {
500    use verifier_deposit_finalize_params::Params;
501    let mut sig_count = 0;
502    while let Some(queue_item) = final_sig_receiver.recv().await {
503        sig_count += 1;
504        tracing::trace!("Received signature {} in signature_distributor", sig_count);
505        let final_params = VerifierDepositFinalizeParams {
506            params: Some(Params::SchnorrSig(queue_item.final_sig)),
507        };
508
509        try_join_all_combine_errors(
510            deposit_finalize_sender
511                .iter()
512                .zip(verifiers_ids.iter())
513                .map(|(tx, id)| {
514                    let final_params = final_params.clone();
515                    async move {
516                        tx.send(final_params).await.wrap_err_with(|| {
517                            AggregatorError::OutputStreamEndedEarly {
518                                stream_name: format!("Deposit finalize sender for {id}"),
519                            }
520                        })
521                    }
522                }),
523        )
524        .await
525        .wrap_err(format!(
526            "Failed to send final signature {sig_count} to verifiers"
527        ))?;
528
529        tracing::trace!(
530            "Sent signature {} to verifiers in signature_distributor",
531            sig_count
532        );
533
534        if sig_count == needed_nofn_sigs {
535            break;
536        }
537    }
538
539    if sig_count != needed_nofn_sigs {
540        let err_msg = format!(
541            "Expected {needed_nofn_sigs} signatures in signature_distributor, got {sig_count}",
542        );
543        tracing::error!("{err_msg}");
544        return Err(eyre::eyre!(err_msg).into());
545    }
546
547    tracing::trace!(
548        tmp_debug = 1,
549        "Sent {sig_count} signatures to verifiers in deposit_finalize"
550    );
551
552    let (movetx_agg_nonce, emergency_stop_agg_nonce) = agg_nonce
553        .await
554        .wrap_err("Failed to get aggregated nonce for movetx and emergency stop")?;
555
556    tracing::info!("Got aggregated nonce for movetx and emergency stop in signature distributor");
557
558    // Send the movetx agg nonce to the verifiers.
559    for tx in &deposit_finalize_sender {
560        tx.send(VerifierDepositFinalizeParams {
561            params: Some(Params::MoveTxAggNonce(
562                movetx_agg_nonce.0.serialize().to_vec(),
563            )),
564        })
565        .await
566        .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
567            stream_name: "Deposit finalize sender (for movetx agg nonce)".to_string(),
568        })?;
569    }
570    tracing::info!("Sent movetx aggregated nonce to verifiers in signature distributor");
571
572    // send emergency stop agg nonce to verifiers
573    for tx in &deposit_finalize_sender {
574        tx.send(VerifierDepositFinalizeParams {
575            params: Some(Params::EmergencyStopAggNonce(
576                emergency_stop_agg_nonce.0.serialize().to_vec(),
577            )),
578        })
579        .await
580        .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
581            stream_name: "Deposit finalize sender (for emergency stop agg nonce)".to_string(),
582        })?;
583    }
584    tracing::info!("Sent emergency stop aggregated nonce to verifiers in signature distributor");
585
586    Ok(())
587}
588
589/// Creates a stream of nonces from verifiers.
590/// This will automatically get the first response from the verifiers.
591///
592/// # Returns
593///
594/// - Vec<[`clementine::NonceGenFirstResponse`]>: First response from each verifier
595/// - Vec<BoxStream<Result<[`PublicNonce`], BridgeError>>>: Stream of nonces from each verifier
596async fn create_nonce_streams(
597    verifiers: ParticipatingVerifiers,
598    num_nonces: u32,
599    #[cfg(test)] config: &crate::config::BridgeConfig,
600) -> Result<
601    (
602        Vec<clementine::NonceGenFirstResponse>,
603        Vec<BoxStream<'static, Result<PublicNonce, BridgeError>>>,
604    ),
605    BridgeError,
606> {
607    let mut nonce_streams = timed_try_join_all(
608        NONCE_STREAM_CREATION_TIMEOUT,
609        "Nonce stream creation",
610        Some(verifiers.ids()),
611        verifiers
612            .clients()
613            .into_iter()
614            .enumerate()
615            .map(|(idx, client)| {
616                let mut client = client.clone();
617                #[cfg(test)]
618                let config = config.clone();
619
620                async move {
621                    #[cfg(test)]
622                    config
623                        .test_params
624                        .timeout_params
625                        .hook_timeout_nonce_stream_creation_verifier(idx)
626                        .await;
627                    let response_stream = client
628                        .nonce_gen(tonic::Request::new(clementine::NonceGenRequest {
629                            num_nonces,
630                        }))
631                        .await
632                        .wrap_err_with(|| AggregatorError::RequestFailed {
633                            request_name: format!("Nonce gen stream for verifier {idx}"),
634                        })?;
635
636                    Ok::<_, BridgeError>(response_stream.into_inner())
637                }
638            }),
639    )
640    .await?;
641
642    // Get the first responses from verifiers.
643    let first_responses: Vec<clementine::NonceGenFirstResponse> =
644        try_join_all_combine_errors(nonce_streams.iter_mut().zip(verifiers.ids()).map(
645            |(stream, id)| async move {
646                parser::verifier::parse_nonce_gen_first_response(stream)
647                    .await
648                    .wrap_err_with(|| format!("Failed to get initial response from {id}"))
649            },
650        ))
651        .await
652        .wrap_err("Failed to get nonce gen's initial responses from verifiers")?;
653
654    let transformed_streams = nonce_streams
655        .into_iter()
656        .zip(verifiers.ids())
657        .map(|(stream, id)| {
658            stream
659                .map(move |result| {
660                    Aggregator::extract_pub_nonce(
661                        result
662                            .wrap_err_with(|| AggregatorError::InputStreamEndedEarlyUnknownSize {
663                                stream_name: format!("Nonce gen stream for {id}"),
664                            })?
665                            .response,
666                    )
667                })
668                .boxed()
669        })
670        .collect::<Vec<_>>();
671
672    Ok((first_responses, transformed_streams))
673}
674
675/// Use items collected from the broadcast receiver for an async function call.
676///
677/// Handles the boilerplate of managing a receiver of a broadcast channel.
678/// If receiver is lagged at any time (data is lost) an error is returned.
679async fn collect_and_call<R, T, F, Fut>(
680    rx: &mut tokio::sync::broadcast::Receiver<Vec<T>>,
681    mut f: F,
682) -> Result<R, Status>
683where
684    R: Default,
685    T: Clone,
686    F: FnMut(Vec<T>) -> Fut,
687    Fut: Future<Output = Result<R, Status>>,
688{
689    loop {
690        match rx.recv().await {
691            Ok(params) => {
692                f(params).await?;
693            }
694            Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
695                break Err(Status::internal(format!(
696                    "lost {n} items due to lagging receiver"
697                )));
698            }
699            Err(tokio::sync::broadcast::error::RecvError::Closed) => break Ok(R::default()),
700        }
701    }
702}
703
704impl Aggregator {
705    // Extracts pub_nonce from given stream.
706    fn extract_pub_nonce(
707        response: Option<clementine::nonce_gen_response::Response>,
708    ) -> Result<PublicNonce, BridgeError> {
709        match response.ok_or_eyre("NonceGen response is empty")? {
710            clementine::nonce_gen_response::Response::PubNonce(pub_nonce) => {
711                Ok(PublicNonce::from_byte_array(
712                    &pub_nonce
713                        .as_slice()
714                        .try_into()
715                        .wrap_err("PubNonce must be 66 bytes")?,
716                )
717                .wrap_err("Failed to parse pub nonce")?)
718            }
719            _ => Err(eyre::eyre!("Expected PubNonce in response").into()),
720        }
721    }
722
723    /// For a specific deposit, collects needed signatures from all operators into a [`Vec<Vec<Signature>>`].
724    async fn collect_operator_sigs(
725        operator_clients: ParticipatingOperators,
726        config: BridgeConfig,
727        mut deposit_sign_session: DepositSignSession,
728    ) -> Result<Vec<Vec<Signature>>, BridgeError> {
729        deposit_sign_session.nonce_gen_first_responses = Vec::new(); // not needed for operators
730        let mut operator_sigs_streams =
731            // create deposit sign streams with each operator
732            timed_try_join_all(
733                OPERATOR_SIGS_STREAM_CREATION_TIMEOUT,
734                "Operator signature stream creation",
735                Some(operator_clients.ids()),
736                operator_clients.clients().into_iter().enumerate().map(|(idx, mut operator_client)| {
737                let sign_session = deposit_sign_session.clone();
738                #[cfg(test)]
739                let config = config.clone();
740                async move {
741                    #[cfg(test)]
742                    config
743                        .test_params
744                        .timeout_params
745                        .hook_timeout_operator_sig_collection_operator(idx)
746                        .await;
747                    let stream = operator_client
748                        .deposit_sign(tonic::Request::new(sign_session))
749                        .await.wrap_err_with(|| AggregatorError::RequestFailed {
750                            request_name: format!("Deposit sign stream for operator {idx}"),
751                        })?;
752                    Ok::<_, BridgeError>(stream.into_inner())
753                }
754            }))
755                .await?;
756
757        let deposit_data: DepositData = deposit_sign_session
758            .deposit_params
759            .clone()
760            .ok_or_else(|| eyre::eyre!("No deposit params found in deposit sign session"))?
761            .try_into()
762            .wrap_err("Failed to convert deposit params to deposit data")?;
763
764        // calculate number of signatures needed from each operator
765        let needed_sigs = config.get_num_required_operator_sigs(&deposit_data);
766
767        // get signatures from each operator's signature streams
768        let operator_sigs =
769            try_join_all_combine_errors(operator_sigs_streams.iter_mut().enumerate().map(
770                |(idx, stream)| async move {
771                    let mut sigs: Vec<Signature> = Vec::with_capacity(needed_sigs);
772                    while let Some(sig) =
773                        stream
774                            .message()
775                            .await
776                            .wrap_err_with(|| AggregatorError::RequestFailed {
777                                request_name: format!("Deposit sign stream for operator {idx}"),
778                            })?
779                    {
780                        sigs.push(Signature::from_slice(&sig.schnorr_sig).wrap_err_with(|| {
781                            format!("Failed to parse Schnorr signature from operator {idx}")
782                        })?);
783                        if sigs.len() == needed_sigs {
784                            break;
785                        }
786                    }
787                    Ok::<_, BridgeError>(sigs)
788                },
789            ))
790            .await
791            .wrap_err("Failed to get operator signatures from operators")?;
792
793        // check if all signatures are received
794        for (idx, sigs) in operator_sigs.iter().enumerate() {
795            if sigs.len() != needed_sigs {
796                return Err(eyre::eyre!(
797                    "Not all operator sigs received from operator {}.\n Expected: {}, got: {}",
798                    idx,
799                    needed_sigs,
800                    sigs.len()
801                )
802                .into());
803            }
804        }
805        Ok(operator_sigs)
806    }
807
808    async fn create_movetx(
809        &self,
810        partial_sigs: Vec<Vec<u8>>,
811        movetx_agg_and_pub_nonces: (AggregatedNonce, Vec<PublicNonce>),
812        deposit_params: DepositParams,
813    ) -> Result<TxHandler<Signed>, Status> {
814        let mut deposit_data: DepositData = deposit_params.try_into()?;
815        let musig_partial_sigs = parser::verifier::parse_partial_sigs(partial_sigs)?;
816
817        // create move tx and calculate sighash
818        let mut move_txhandler =
819            create_move_to_vault_txhandler(&mut deposit_data, self.config.protocol_paramset())?;
820
821        let sighash = move_txhandler.calculate_script_spend_sighash_indexed(
822            0,
823            0,
824            bitcoin::TapSighashType::Default,
825        )?;
826
827        let musig_sigs_and_nonces = musig_partial_sigs
828            .into_iter()
829            .zip(movetx_agg_and_pub_nonces.1)
830            .collect::<Vec<_>>();
831
832        // aggregate partial signatures
833        let verifiers_public_keys = deposit_data.get_verifiers();
834        let final_sig = crate::musig2::aggregate_partial_signatures(
835            verifiers_public_keys,
836            None,
837            movetx_agg_and_pub_nonces.0,
838            &musig_sigs_and_nonces,
839            Message::from_digest(sighash.to_byte_array()),
840        )?;
841
842        // Put the signature in the tx
843        move_txhandler.set_p2tr_script_spend_witness(&[final_sig.as_ref()], 0, 0)?;
844
845        Ok(move_txhandler.promote()?)
846    }
847
848    async fn verify_and_save_emergency_stop_sigs(
849        &self,
850        emergency_stop_sigs: Vec<Vec<u8>>,
851        emergency_stop_agg_and_pub_nonces: (AggregatedNonce, Vec<PublicNonce>),
852        deposit_params: DepositParams,
853    ) -> Result<(), BridgeError> {
854        let mut deposit_data: DepositData = deposit_params
855            .try_into()
856            .wrap_err("Failed to convert deposit params to deposit data")?;
857        let musig_partial_sigs = parser::verifier::parse_partial_sigs(emergency_stop_sigs)
858            .wrap_err("Failed to parse emergency stop signatures")?;
859
860        // create move tx and calculate sighash
861        let move_txhandler =
862            create_move_to_vault_txhandler(&mut deposit_data, self.config.protocol_paramset())?;
863
864        let mut emergency_stop_txhandler = create_emergency_stop_txhandler(
865            &mut deposit_data,
866            &move_txhandler,
867            self.config.protocol_paramset(),
868        )?;
869
870        let sighash = emergency_stop_txhandler.calculate_script_spend_sighash_indexed(
871            0,
872            0,
873            bitcoin::TapSighashType::SinglePlusAnyoneCanPay,
874        )?;
875
876        let verifiers_public_keys = deposit_data.get_verifiers();
877
878        let musig_sigs_and_nonces = musig_partial_sigs
879            .into_iter()
880            .zip(emergency_stop_agg_and_pub_nonces.1)
881            .collect::<Vec<_>>();
882
883        let final_sig = crate::musig2::aggregate_partial_signatures(
884            verifiers_public_keys,
885            None,
886            emergency_stop_agg_and_pub_nonces.0,
887            &musig_sigs_and_nonces,
888            Message::from_digest(sighash.to_byte_array()),
889        )
890        .wrap_err("Failed to aggregate emergency stop signatures")?;
891
892        let final_sig = bitcoin::taproot::Signature {
893            signature: final_sig,
894            sighash_type: bitcoin::TapSighashType::SinglePlusAnyoneCanPay,
895        };
896
897        // insert the signature into the tx
898        emergency_stop_txhandler.set_p2tr_script_spend_witness(&[final_sig.serialize()], 0, 0)?;
899
900        let emergency_stop_tx = emergency_stop_txhandler.get_cached_tx();
901        let move_to_vault_txid = move_txhandler.get_txid();
902
903        tracing::debug!("Move to vault tx id: {}", move_to_vault_txid.to_string());
904
905        let emergency_stop_pubkey = self
906            .config
907            .emergency_stop_encryption_public_key
908            .ok_or_else(|| eyre::eyre!("Emergency stop encryption public key is not set"))?;
909        let encrypted_emergency_stop_tx = crate::encryption::encrypt_bytes(
910            emergency_stop_pubkey,
911            &bitcoin::consensus::serialize(&emergency_stop_tx),
912        )?;
913
914        self.db
915            .insert_signed_emergency_stop_tx_if_not_exists(
916                None,
917                move_to_vault_txid,
918                &encrypted_emergency_stop_tx,
919            )
920            .await?;
921
922        Ok(())
923    }
924
/// Hands an emergency stop transaction to the tx sender and returns it unchanged.
///
/// The transaction is registered through `insert_try_to_send` with `FeePayingType::RBF` and
/// `TransactionType::EmergencyStop` metadata, inside a database transaction that is committed
/// before returning.
///
/// # Errors
/// Returns a [`Status`] if the database transaction cannot be started or committed, or if
/// the tx sender rejects the transaction.
#[cfg(feature = "automation")]
pub async fn send_emergency_stop_tx(
    &self,
    tx: bitcoin::Transaction,
) -> Result<bitcoin::Transaction, Status> {
    // Only the transaction type is set in the metadata; every other field is left empty.
    let emergency_stop_metadata = TxMetadata {
        deposit_outpoint: None,
        operator_xonly_pk: None,
        round_idx: None,
        kickoff_idx: None,
        tx_type: TransactionType::EmergencyStop,
    };

    let mut dbtx = self.db.begin_transaction().await?;
    self.tx_sender
        .insert_try_to_send(
            Some(&mut dbtx),
            Some(emergency_stop_metadata),
            &tx,
            FeePayingType::RBF,
            None,
            &[],
            &[],
            &[],
            &[],
        )
        .await?;

    let commit_result = dbtx.commit().await;
    commit_result
        .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;

    Ok(tx)
}
957}
958
959#[async_trait]
960impl ClementineAggregator for AggregatorServer {
961    async fn get_compatibility_params(
962        &self,
963        _request: Request<Empty>,
964    ) -> Result<Response<CompatibilityParamsRpc>, Status> {
965        let params = self.aggregator.get_compatibility_params()?;
966        Ok(Response::new(params.try_into().map_to_status()?))
967    }
968
969    async fn get_compatibility_data_from_entities(
970        &self,
971        _request: Request<Empty>,
972    ) -> Result<Response<EntitiesCompatibilityData>, Status> {
973        let data = self
974            .aggregator
975            .get_compatibility_data_from_entities()
976            .await?;
977        Ok(Response::new(EntitiesCompatibilityData {
978            entities_compatibility_data: data,
979        }))
980    }
981
982    async fn vergen(&self, _request: Request<Empty>) -> Result<Response<VergenResponse>, Status> {
983        tracing::info!("Vergen rpc called");
984        Ok(Response::new(get_vergen_response()))
985    }
986
987    async fn get_entity_statuses(
988        &self,
989        request: Request<GetEntityStatusesRequest>,
990    ) -> Result<Response<EntityStatuses>, Status> {
991        tracing::info!("Get entity statuses rpc called");
992        let request = request.into_inner();
993        let restart_tasks = request.restart_tasks;
994
995        Ok(Response::new(EntityStatuses {
996            entity_statuses: self.aggregator.get_entity_statuses(restart_tasks).await?,
997        }))
998    }
999
1000    async fn optimistic_payout(
1001        &self,
1002        request: tonic::Request<super::OptimisticWithdrawParams>,
1003    ) -> std::result::Result<tonic::Response<super::RawSignedTx>, tonic::Status> {
1004        tracing::info!("Optimistic payout rpc called");
1005        let opt_withdraw_params = request.into_inner();
1006
1007        let withdraw_params =
1008            opt_withdraw_params
1009                .withdrawal
1010                .clone()
1011                .ok_or(Status::invalid_argument(
1012                    "Withdrawal params not found for optimistic payout",
1013                ))?;
1014        let (deposit_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
1015            parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
1016        tracing::info!("Parsed optimistic payout rpc params, deposit id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}, verification signature: {:?}", deposit_id, input_signature, input_outpoint, output_script_pubkey, output_amount, opt_withdraw_params.verification_signature);
1017
1018        // check compatibility with verifiers only
1019        self.check_compatibility_with_actors(CompatibilityCheckScope::VerifiersOnly)
1020            .await?;
1021
1022        // if the withdrawal utxo is spent, no reason to sign optimistic payout
1023        if self
1024            .rpc
1025            .is_utxo_spent(&input_outpoint)
1026            .await
1027            .map_to_status()?
1028        {
1029            return Err(Status::invalid_argument(format!(
1030                "Withdrawal utxo is already spent: {input_outpoint:?}",
1031            )));
1032        }
1033
1034        // check for some standard script pubkeys
1035        if !(output_script_pubkey.is_p2tr()
1036            || output_script_pubkey.is_p2pkh()
1037            || output_script_pubkey.is_p2sh()
1038            || output_script_pubkey.is_p2wpkh()
1039            || output_script_pubkey.is_p2wsh())
1040        {
1041            return Err(Status::invalid_argument(format!(
1042                "Output script pubkey is not a valid script pubkey: {output_script_pubkey}, must be p2tr, p2pkh, p2sh, p2wpkh, or p2wsh"
1043            )));
1044        }
1045
1046        // get which deposit the withdrawal belongs to
1047        let withdrawal = self
1048            .db
1049            .get_move_to_vault_txid_from_citrea_deposit(None, deposit_id)
1050            .await?;
1051        if let Some(move_txid) = withdrawal {
1052            // check if withdrawal utxo is correct
1053            let withdrawal_utxo = self
1054                .db
1055                .get_withdrawal_utxo_from_citrea_withdrawal(None, deposit_id)
1056                .await?;
1057            if withdrawal_utxo != input_outpoint {
1058                return Err(Status::invalid_argument(format!(
1059                    "Withdrawal utxo is not correct: {withdrawal_utxo:?} != {input_outpoint:?}",
1060                )));
1061            }
1062
1063            // Prepare input and output of the payout transaction.
1064            let withdrawal_prevout = self
1065                .rpc
1066                .get_txout_from_outpoint(&input_outpoint)
1067                .await
1068                .map_to_status()?;
1069
1070            let user_xonly_pk = withdrawal_prevout
1071                .script_pubkey
1072                .try_get_taproot_pk()
1073                .map_err(|_| {
1074                    Status::invalid_argument(format!(
1075                        "Withdrawal prevout script_pubkey is not a Taproot output: {:?}",
1076                        withdrawal_prevout.script_pubkey
1077                    ))
1078                })?;
1079
1080            let withdrawal_utxo = UTXO {
1081                outpoint: input_outpoint,
1082                txout: withdrawal_prevout,
1083            };
1084
1085            let output_txout = TxOut {
1086                value: output_amount,
1087                script_pubkey: output_script_pubkey,
1088            };
1089
1090            let deposit_data = self
1091                .db
1092                .get_deposit_data_with_move_tx(None, move_txid)
1093                .await?;
1094
1095            let mut deposit_data = deposit_data
1096                .ok_or(eyre::eyre!(
1097                    "Deposit data not found for move txid {}",
1098                    move_txid
1099                ))
1100                .map_err(BridgeError::from)?;
1101
1102            let mut opt_payout_txhandler = create_optimistic_payout_txhandler(
1103                &mut deposit_data,
1104                withdrawal_utxo,
1105                output_txout,
1106                input_signature,
1107                self.config.protocol_paramset(),
1108            )?;
1109
1110            let sighash = opt_payout_txhandler
1111                .calculate_pubkey_spend_sighash(0, input_signature.sighash_type)?;
1112
1113            let message = Message::from_digest(sighash.to_byte_array());
1114
1115            SECP.verify_schnorr(&input_signature.signature, &message, &user_xonly_pk)
1116                .map_err(|_| Status::internal("Invalid signature for optimistic payout tx. Ensure the signature uses SinglePlusAnyoneCanPay sighash type."))?;
1117
1118            // get which verifiers participated in the deposit to collect the optimistic payout tx signature
1119            let participating_verifiers = self.get_participating_verifiers(&deposit_data).await?;
1120            let verifiers_ids = participating_verifiers.ids();
1121            let (first_responses, mut nonce_streams) = {
1122                create_nonce_streams(
1123                    participating_verifiers.clone(),
1124                    1,
1125                    #[cfg(test)]
1126                    &self.config,
1127                )
1128                .await?
1129            };
1130            // collect nonces
1131            let pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
1132                .await
1133                .wrap_err("Failed to aggregate nonces for optimistic payout")
1134                .map_to_status()?;
1135            let agg_nonce = aggregate_nonces(pub_nonces.iter().collect::<Vec<_>>().as_slice())?;
1136
1137            let agg_nonce_bytes = agg_nonce.serialize().to_vec();
1138            // send the agg nonce to the verifiers to sign the optimistic payout tx
1139            let opt_payout_sign_futures = participating_verifiers
1140                .clients()
1141                .iter()
1142                .zip(first_responses)
1143                .map(|(client, first_response)| {
1144                    let mut client = client.clone();
1145                    let opt_withdraw_params = opt_withdraw_params.clone();
1146                    {
1147                        let agg_nonce_serialized = agg_nonce_bytes.clone();
1148                        async move {
1149                            let mut request = Request::new(OptimisticPayoutParams {
1150                                opt_withdrawal: Some(opt_withdraw_params),
1151                                agg_nonce: agg_nonce_serialized,
1152                                nonce_gen: Some(first_response),
1153                            });
1154                            request.set_timeout(OPTIMISTIC_PAYOUT_TIMEOUT);
1155                            client.optimistic_payout_sign(request).await
1156                        }
1157                    }
1158                })
1159                .collect::<Vec<_>>();
1160
1161            // get signatures and check for any errors
1162            let opt_payout_resps = join_all(opt_payout_sign_futures).await;
1163            let mut payout_sigs = Vec::new();
1164            let mut errors = Vec::new();
1165            for (resp, verifier_id) in opt_payout_resps
1166                .into_iter()
1167                .zip(participating_verifiers.ids())
1168            {
1169                match resp {
1170                    Ok(res) => {
1171                        payout_sigs.push(res.into_inner());
1172                    }
1173                    Err(e) => {
1174                        errors.push(format!("{verifier_id} optimistic payout sign failed: {e}"));
1175                    }
1176                }
1177            }
1178            if !errors.is_empty() {
1179                return Err(eyre::eyre!("{errors:?}").into_status());
1180            }
1181
1182            // calculate final sig
1183            // txin at index 1 is deposited utxo in movetx
1184            let sighash = opt_payout_txhandler.calculate_script_spend_sighash_indexed(
1185                1,
1186                0,
1187                bitcoin::TapSighashType::Default,
1188            )?;
1189
1190            let musig_partial_sigs = payout_sigs
1191                .into_iter()
1192                .map(|sig| {
1193                    PartialSignature::from_byte_array(
1194                        &sig.partial_sig
1195                            .try_into()
1196                            .map_err(|_| secp256k1::musig::ParseError::MalformedArg)?,
1197                    )
1198                })
1199                .collect::<Result<Vec<_>, _>>()
1200                .map_err(|e| Status::internal(format!("Failed to parse partial sig: {e:?}")))?;
1201
1202            let musig_sigs_and_nonces = musig_partial_sigs
1203                .into_iter()
1204                .zip(pub_nonces)
1205                .collect::<Vec<_>>();
1206
1207            let final_sig = bitcoin::taproot::Signature {
1208                signature: crate::musig2::aggregate_partial_signatures(
1209                    deposit_data.get_verifiers(),
1210                    None,
1211                    agg_nonce,
1212                    &musig_sigs_and_nonces,
1213                    Message::from_digest(sighash.to_byte_array()),
1214                )?,
1215                sighash_type: bitcoin::TapSighashType::Default,
1216            };
1217
1218            // set witness and send tx
1219            opt_payout_txhandler.set_p2tr_script_spend_witness(&[final_sig.serialize()], 1, 0)?;
1220            let opt_payout_txhandler = opt_payout_txhandler.promote()?;
1221            let opt_payout_tx = opt_payout_txhandler.get_cached_tx();
1222            tracing::info!(
1223                "Optimistic payout transaction created successfully for deposit id: {:?}",
1224                deposit_id
1225            );
1226
1227            #[cfg(feature = "automation")]
1228            {
1229                tracing::info!("Sending optimistic payout tx via tx_sender");
1230
1231                let mut dbtx = self.db.begin_transaction().await?;
1232                self.tx_sender
1233                    .add_tx_to_queue(
1234                        Some(&mut dbtx),
1235                        TransactionType::OptimisticPayout,
1236                        opt_payout_tx,
1237                        &[],
1238                        None,
1239                        self.config.protocol_paramset(),
1240                        None,
1241                    )
1242                    .await
1243                    .map_to_status()?;
1244                dbtx.commit().await.map_err(|e| {
1245                    Status::internal(format!(
1246                        "Failed to commit db transaction to send optimistic payout tx: {e}",
1247                    ))
1248                })?;
1249            }
1250
1251            Ok(Response::new(RawSignedTx::from(opt_payout_tx)))
1252        } else {
1253            Err(Status::not_found(format!(
1254                "Withdrawal with index {deposit_id} not found."
1255            )))
1256        }
1257    }
1258
1259    async fn internal_send_tx(
1260        &self,
1261        request: Request<clementine::SendTxRequest>,
1262    ) -> Result<Response<Empty>, Status> {
1263        #[cfg(not(feature = "automation"))]
1264        {
1265            Err(Status::unimplemented("Automation is not enabled"))
1266        }
1267        #[cfg(feature = "automation")]
1268        {
1269            let send_tx_req = request.into_inner();
1270            let fee_type = send_tx_req.fee_type();
1271            let signed_tx: bitcoin::Transaction = send_tx_req
1272                .raw_tx
1273                .ok_or(Status::invalid_argument("Missing raw_tx"))?
1274                .try_into()?;
1275            tracing::warn!(
1276                "Internal send tx rpc called with feetype: {:?}, tx hex: {}",
1277                fee_type,
1278                bitcoin::consensus::encode::serialize_hex(&signed_tx)
1279            );
1280
1281            let mut dbtx = self.db.begin_transaction().await?;
1282            self.tx_sender
1283                .insert_try_to_send(
1284                    Some(&mut dbtx),
1285                    None,
1286                    &signed_tx,
1287                    fee_type.try_into()?,
1288                    None,
1289                    &[],
1290                    &[],
1291                    &[],
1292                    &[],
1293                )
1294                .await
1295                .map_to_status()?;
1296            dbtx.commit()
1297                .await
1298                .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;
1299            Ok(Response::new(Empty {}))
1300        }
1301    }
1302
1303    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
1304    async fn setup(
1305        &self,
1306        _request: Request<Empty>,
1307    ) -> Result<Response<VerifierPublicKeys>, Status> {
1308        tracing::info!("Setup rpc called");
1309        self.check_compatibility_with_actors(CompatibilityCheckScope::Both)
1310            .await?;
1311        // Propagate Operators configurations to all verifier clients
1312        const CHANNEL_CAPACITY: usize = 1024 * 16;
1313        let (operator_params_tx, operator_params_rx) =
1314            tokio::sync::broadcast::channel(CHANNEL_CAPACITY);
1315        let operator_params_rx_handles = (0..self.get_verifier_clients().len())
1316            .map(|_| operator_params_rx.resubscribe())
1317            .collect::<Vec<_>>();
1318
1319        let operators = self.get_operator_clients().to_vec();
1320        let operator_pks = self.fetch_operator_keys().await?;
1321        let operator_ids = operator_pks
1322            .iter()
1323            .map(|key| OperatorId(*key))
1324            .collect::<Vec<_>>();
1325        let get_operator_params_chunked_handle = tokio::spawn(async move {
1326            tracing::info!(clients = operators.len(), "Collecting operator details...");
1327            try_join_all_combine_errors(operators.iter().zip(operator_ids.iter()).map(
1328                |(operator, id)| {
1329                    let mut operator = operator.clone();
1330                    let tx = operator_params_tx.clone();
1331                    async move {
1332                        let stream = operator
1333                            .get_params(Request::new(Empty {}))
1334                            .await
1335                            .wrap_err_with(|| AggregatorError::RequestFailed {
1336                                request_name: format!("Operator get params for {id}"),
1337                            })
1338                            .map_err(BridgeError::from)?
1339                            .into_inner();
1340                        tx.send(stream.try_collect::<Vec<_>>().await?)
1341                            .map_err(|e| {
1342                                BridgeError::from(eyre::eyre!(
1343                                    "Failed to read operator params for {id}: {e}"
1344                                ))
1345                            })?;
1346                        Ok::<_, Status>(())
1347                    }
1348                },
1349            ))
1350            .await
1351            .wrap_err("Failed to get operator params from operators")
1352            .map_to_status()?;
1353            Ok::<_, Status>(())
1354        });
1355
1356        let verifiers = self.get_verifier_clients().to_vec();
1357        let verifier_pks = self.fetch_verifier_keys().await?;
1358        let verifier_ids = verifier_pks
1359            .iter()
1360            .map(|key| VerifierId(*key))
1361            .collect::<Vec<_>>();
1362        let set_operator_params_handle = tokio::spawn(async move {
1363            tracing::info!("Informing verifiers of existing operators...");
1364            try_join_all_combine_errors(
1365                verifiers
1366                    .iter()
1367                    .zip(verifier_ids.iter())
1368                    .zip(operator_params_rx_handles)
1369                    .map(|((verifier, id), mut rx)| {
1370                        let verifier = verifier.clone();
1371                        async move {
1372                            collect_and_call(&mut rx, |params| {
1373                                let mut verifier = verifier.clone();
1374                                async move {
1375                                    verifier
1376                                        .set_operator(futures::stream::iter(params))
1377                                        .await
1378                                        .wrap_err_with(|| AggregatorError::RequestFailed {
1379                                            request_name: format!("Verifier set_operator for {id}"),
1380                                        })
1381                                        .map_err(BridgeError::from)?;
1382                                    Ok::<_, Status>(())
1383                                }
1384                            })
1385                            .await?;
1386                            Ok::<_, Status>(())
1387                        }
1388                    }),
1389            )
1390            .await
1391            .wrap_err("Failed to set_operator for all verifiers")
1392            .map_to_status()?;
1393            Ok::<_, Status>(())
1394        });
1395
1396        let task_outputs = timed_request(
1397            SETUP_COMPLETION_TIMEOUT,
1398            "Aggregator setup pipeline",
1399            async move {
1400                Ok::<_, BridgeError>(
1401                    futures::future::join_all([
1402                        get_operator_params_chunked_handle,
1403                        set_operator_params_handle,
1404                    ])
1405                    .await,
1406                )
1407            },
1408        )
1409        .await?;
1410
1411        let task_names = ["Get operator params", "Set operator params"];
1412        debug_assert_eq!(task_names.len(), task_outputs.len());
1413
1414        flatten_join_named_results(task_names.into_iter().zip(task_outputs.into_iter()))?;
1415
1416        Ok(Response::new(VerifierPublicKeys::from(verifier_pks)))
1417    }
1418
1419    /// Handles a new deposit request from a user. This function coordinates the signing process
1420    /// between verifiers to create a valid move transaction. It ensures a covenant using pre-signed NofN transactions.
1421    /// It also collects signatures from operators to ensure that the operators can be slashed if they act maliciously.
1422    ///
1423    /// Overview:
1424    /// 1. Receive and parse deposit parameters from user
1425    /// 2. Signs all NofN transactions with verifiers using MuSig2:
1426    ///    - Creates nonce streams with verifiers (get pub nonces for each transaction)
1427    ///    - Opens deposit signing streams with verifiers (sends aggnonces for each transaction, receives partial sigs)
1428    ///    - Opens deposit finalization streams with verifiers (sends final signatures, receives movetx signatures)
1429    /// 3. Collects signatures from operators
1430    /// 4. Waits for all tasks to complete
1431    /// 5. Returns signed move transaction
1432    ///
1433    /// The following pipelines are used to coordinate the signing process, these move the data between the verifiers and the aggregator:
1434    ///    - Nonce aggregation
1435    ///    - Nonce distribution
1436    ///    - Signature aggregation
1437    ///    - Signature distribution
1438    // #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
1439    async fn new_deposit(
1440        &self,
1441        request: Request<Deposit>,
1442    ) -> Result<Response<clementine::RawSignedTx>, Status> {
1443        tracing::info!("New deposit rpc called");
1444        self.check_compatibility_with_actors(CompatibilityCheckScope::Both)
1445            .await?;
1446
1447        timed_request(OVERALL_DEPOSIT_TIMEOUT, "Overall new deposit", async {
1448            let deposit_info: DepositInfo = request.into_inner().try_into()?;
1449            tracing::info!(
1450                "Parsed new deposit rpc params, deposit info: {:?}",
1451                deposit_info
1452            );
1453
1454            let deposit_data = DepositData {
1455                deposit: deposit_info.clone(),
1456                nofn_xonly_pk: None,
1457                actors: Actors {
1458                    verifiers: self.fetch_verifier_keys().await?,
1459                    watchtowers: vec![],
1460                    operators: self.fetch_operator_keys().await?,
1461                },
1462                security_council: self.config.security_council.clone(),
1463            };
1464            tracing::info!(
1465                "Created deposit data in new_deposit for deposit info: {:?}, deposit data: {:?}",
1466                deposit_info,
1467                deposit_data
1468            );
1469
1470            let deposit_params = deposit_data.clone().into();
1471
1472            // Collect and distribute keys needed keys from operators and watchtowers to verifiers
1473            let start = std::time::Instant::now();
1474            timed_request(
1475                KEY_DISTRIBUTION_TIMEOUT,
1476                "Key collection and distribution",
1477                self.collect_and_distribute_keys(&deposit_params),
1478            )
1479            .await?;
1480            tracing::info!("Collected and distributed keys in {:?}", start.elapsed());
1481
1482            let verifiers = self.get_participating_verifiers(&deposit_data).await?;
1483            let verifiers_ids = verifiers.ids();
1484
1485            // Generate nonce streams for all verifiers.
1486            let num_required_sigs = self.config.get_num_required_nofn_sigs(&deposit_data);
1487            let num_required_nonces = num_required_sigs as u32 + 2; // ask for +2 for the final movetx signature + emergency stop signature, but don't send it on deposit_sign stage
1488            let (first_responses, nonce_streams) =
1489                    create_nonce_streams(
1490                        verifiers.clone(),
1491                        num_required_nonces,
1492                        #[cfg(test)]
1493                        &self.config,
1494                    )
1495                    .await?;
1496
1497            // Create initial deposit session and send to verifiers
1498            let deposit_sign_session = DepositSignSession {
1499                deposit_params: Some(deposit_params.clone()),
1500                nonce_gen_first_responses: first_responses,
1501            };
1502
1503            let deposit_sign_param: VerifierDepositSignParams =
1504                    deposit_sign_session.clone().into();
1505
1506        #[allow(clippy::unused_enumerate_index)]
1507            let partial_sig_streams = timed_try_join_all(
1508                PARTIAL_SIG_STREAM_CREATION_TIMEOUT,
1509                "Partial signature stream creation",
1510                Some(verifiers.ids()),
1511                verifiers.clients().into_iter().enumerate().map(|(_idx, verifier_client)| {
1512                    let mut verifier_client = verifier_client.clone();
1513                    #[cfg(test)]
1514                    let config = self.config.clone();
1515
1516                    let deposit_sign_param =
1517                    deposit_sign_param.clone();
1518
1519                    async move {
1520                        #[cfg(test)]
1521                        config
1522                            .test_params
1523                            .timeout_params
1524                            .hook_timeout_partial_sig_stream_creation_verifier(_idx)
1525                            .await;
1526
1527                        let (tx, rx) = tokio::sync::mpsc::channel(num_required_nonces as usize + 1); // initial param + num_required_nonces nonces
1528
1529                        let stream = verifier_client
1530                            .deposit_sign(tokio_stream::wrappers::ReceiverStream::new(rx))
1531                            .await?
1532                            .into_inner();
1533
1534                        tx.send(deposit_sign_param).await.map_err(|e| {
1535                            BridgeError::from(eyre::eyre!("Failed to send deposit sign session: {e:?}"))})?;
1536
1537                        Ok::<_, BridgeError>((stream, tx))
1538                    }
1539                })
1540            )
1541            .await?;
1542
1543            // Set up deposit finalization streams
1544        #[allow(clippy::unused_enumerate_index)]
1545            let deposit_finalize_streams = verifiers.clients().into_iter().enumerate().map(
1546                    |(_idx, mut verifier)| {
1547                        let (tx, rx) = tokio::sync::mpsc::channel(num_required_nonces as usize + 1);
1548                        let receiver_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
1549                        #[cfg(test)]
1550                        let config = self.config.clone();
1551                        // start deposit_finalize with tokio spawn
1552                        let deposit_finalize_future = tokio::spawn(async move {
1553                            #[cfg(test)]
1554                            config
1555                                .test_params
1556                                .timeout_params
1557                                .hook_timeout_deposit_finalize_verifier(_idx)
1558                                .await;
1559
1560                            verifier.deposit_finalize(receiver_stream).await
1561                        });
1562
1563                        Ok::<_, BridgeError>((deposit_finalize_future, tx))
1564                    },
1565                ).collect::<Result<Vec<_>, BridgeError>>()?;
1566
1567            tracing::info!("Sending deposit finalize streams to verifiers for deposit {:?}", deposit_info);
1568
1569            let (deposit_finalize_futures, deposit_finalize_sender): (Vec<_>, Vec<_>) =
1570                deposit_finalize_streams.into_iter().unzip();
1571
1572            // Send initial finalization params
1573            let deposit_finalize_first_param: VerifierDepositFinalizeParams =
1574                deposit_sign_session.clone().into();
1575
1576            timed_try_join_all(
1577                DEPOSIT_FINALIZE_STREAM_CREATION_TIMEOUT,
1578                "Deposit finalization initial param send",
1579                Some(verifiers.ids()),
1580                deposit_finalize_sender.iter().cloned().map(|tx| {
1581                    let param = deposit_finalize_first_param.clone();
1582                    async move {
1583                        tx.send(param).await
1584                        .map_err(|e| {
1585                            BridgeError::from(eyre::eyre!(
1586                                "Failed to send deposit finalize first param: {e:?}"))
1587                        })
1588                    }
1589                })
1590            ).await?;
1591
1592
1593            let deposit_blockhash = self
1594                .rpc
1595                .get_blockhash_of_tx(&deposit_data.get_deposit_outpoint().txid)
1596                .await
1597                .map_to_status()?;
1598
1599            let verifiers_public_keys = deposit_data.get_verifiers();
1600
1601            let needed_nofn_sigs = self.config.get_num_required_nofn_sigs(&deposit_data);
1602
1603            // Create sighash stream for transaction signing
1604            let sighash_stream = Box::pin(create_nofn_sighash_stream(
1605                self.db.clone(),
1606                self.config.clone(),
1607                deposit_data.clone(),
1608                deposit_blockhash,
1609                false,
1610            ));
1611
1612            // Create channels for pipeline communication
1613            let (agg_nonce_sender, agg_nonce_receiver) = channel(num_required_nonces as usize);
1614            let (partial_sig_sender, partial_sig_receiver) = channel(num_required_nonces as usize);
1615            let (final_sig_sender, final_sig_receiver) = channel(num_required_nonces as usize);
1616
1617            // Start the nonce aggregation pipe.
1618            let nonce_agg_handle = tokio::spawn(nonce_aggregator(
1619                nonce_streams,
1620                sighash_stream,
1621                agg_nonce_sender,
1622                needed_nofn_sigs,
1623                verifiers_ids.clone(),
1624            ));
1625
1626            // Start the nonce distribution pipe.
1627            let nonce_dist_handle = tokio::spawn(nonce_distributor(
1628                agg_nonce_receiver,
1629                partial_sig_streams,
1630                partial_sig_sender,
1631                needed_nofn_sigs,
1632                verifiers_ids.clone(),
1633            ));
1634
1635            // Start the signature aggregation pipe.
1636            let sig_agg_handle = tokio::spawn(signature_aggregator(
1637                partial_sig_receiver,
1638                verifiers_public_keys,
1639                final_sig_sender,
1640                needed_nofn_sigs,
1641            ));
1642
1643            tracing::debug!("Getting signatures from operators");
1644            // Get sigs from each operator in background
1645            let operators = self.get_participating_operators(&deposit_data).await?;
1646
1647            let config_clone = self.config.clone();
1648            let operator_sigs_fut = tokio::spawn(async move {
1649                timed_request(
1650                    OPERATOR_SIGS_TIMEOUT,
1651                    "Operator signature collection",
1652                    async {
1653                        Aggregator::collect_operator_sigs(
1654                            operators,
1655                            config_clone,
1656                            deposit_sign_session,
1657                        )
1658                        .await
1659                    },
1660                )
1661                .await
1662            });
1663
1664            // Join the nonce aggregation handle to get the movetx agg nonce.
1665            let nonce_agg_handle = nonce_agg_handle
1666                .map_err(|_| Status::internal("panic when aggregating nonces"))
1667                .map(
1668                    |res| -> Result<((AggregatedNonce, Vec<PublicNonce>), (AggregatedNonce, Vec<PublicNonce>)), Status> {
1669                        res.and_then(|r| r.map_err(Into::into))
1670                    },
1671                )
1672                .shared();
1673
1674            // Start the deposit finalization pipe.
1675            let sig_dist_handle = tokio::spawn(signature_distributor(
1676                final_sig_receiver,
1677                deposit_finalize_sender.clone(),
1678                nonce_agg_handle.clone(),
1679                needed_nofn_sigs,
1680                verifiers_ids.clone(),
1681            ));
1682
1683            // Right now we collect all operator sigs then start to send them, we can do it simultaneously in the future
1684            // Need to change sig verification ordering in deposit_finalize() in verifiers so that we verify
1685            // 1st signature of all operators, then 2nd of all operators etc.
1686            let all_op_sigs = operator_sigs_fut
1687                .await
1688                .map_err(|_| BridgeError::from(eyre::eyre!("panic when collecting operator signatures")))??;
1689
1690            tracing::info!("Got all operator signatures for deposit {:?}", deposit_info);
1691
1692            // Wait for all pipeline tasks to complete
1693            // join_all should be enough here as if one fails other tasks should fail too as they are connected through streams
1694            // one should not hang if any other task fails, the others should finish
1695            // this is needed because try_join_all can potentially not return the error of the first task that failed, just the one it polled first
1696            // that returned an error
1697            let task_outputs =  timed_request(
1698                PIPELINE_COMPLETION_TIMEOUT,
1699                "MuSig2 signing pipeline",
1700                async move {
1701                    Ok::<_, BridgeError>(futures::future::join_all([nonce_dist_handle, sig_agg_handle, sig_dist_handle]).await)
1702                },
1703            )
1704            .await?;
1705
1706            let task_names = ["Nonce distribution", "Signature aggregation", "Signature distribution"];
1707
1708            debug_assert_eq!(task_names.len(), task_outputs.len());
1709
1710            flatten_join_named_results(
1711                task_names.into_iter().zip(task_outputs.into_iter()),
1712            )?;
1713            tracing::info!("All deposit_sign related tasks completed for deposit {:?}, now sending operator signatures to verifiers for verification", deposit_info);
1714
1715            tracing::debug!("Pipeline tasks completed");
1716            let verifiers_ids = verifiers.ids();
1717
1718            // send operators sigs to verifiers after all verifiers have signed
1719            let deposit_finalize_futures = timed_request(
1720                SEND_OPERATOR_SIGS_TIMEOUT,
1721                "Sending operator signatures to verifiers",
1722                async {
1723                    let send_operator_sigs: Vec<_> = deposit_finalize_sender
1724                        .iter()
1725                        .zip(verifiers_ids.iter())
1726                        .zip(deposit_finalize_futures.into_iter())
1727                        .map(|((tx, id), dep_fin_fut)| async {
1728                            for one_op_sigs in all_op_sigs.iter() {
1729                                for sig in one_op_sigs.iter() {
1730                                    let deposit_finalize_param: VerifierDepositFinalizeParams =
1731                                        sig.into();
1732
1733                                    let send = tx.send(deposit_finalize_param).await;
1734                                    match send {
1735                                        Ok(()) => (),
1736                                        Err(e) => {
1737                                            // check exact error by awaiting the future
1738                                            dep_fin_fut.await.wrap_err(format!("{} deposit finalize tokio task on aggregator returned error", id.clone()))?.wrap_err(format!("{} deposit finalize rpc call returned error", id.clone()))?;
1739                                            return Err(BridgeError::from(eyre::eyre!(format!("{} deposit finalize stream sending returned error: {:?}", id.clone(), e))));
1740                                        }
1741                                    }
1742                                }
1743                            }
1744
1745                            Ok::<_, BridgeError>(dep_fin_fut)
1746                        })
1747                        .collect();
1748                    try_join_all_combine_errors(send_operator_sigs).await
1749                },
1750            )
1751            .await.wrap_err("Failed to send operator signatures to verifiers")?;
1752
1753            tracing::info!("All operator signatures sent to verifiers for verification, now waiting to collect movetx and emergency stop tx partial signatures from verifiers for deposit {:?}", deposit_info);
1754
1755            // Collect partial signatures for move transaction
1756            let partial_sigs: Vec<(Vec<u8>, Vec<u8>)> = timed_try_join_all(
1757                    DEPOSIT_FINALIZATION_TIMEOUT,
1758                    "Deposit finalization",
1759                    Some(verifiers.ids()),
1760                    deposit_finalize_futures.into_iter().map(|fut| async move {
1761                        let inner = fut.await
1762                            .map_err(|_| BridgeError::from(eyre::eyre!("panic finishing deposit_finalize")))??
1763                            .into_inner();
1764                        Ok((inner.move_to_vault_partial_sig, inner.emergency_stop_partial_sig))
1765                    }),
1766                )
1767                .await?;
1768
1769
1770            let (move_to_vault_sigs, emergency_stop_sigs): (Vec<Vec<u8>>, Vec<Vec<u8>>) =
1771                partial_sigs.into_iter().unzip();
1772
1773            tracing::info!("Received move tx and emergency stop tx partial signatures for deposit {:?}", deposit_info);
1774
1775            // Create the final move transaction and check the signatures
1776            let (movetx_agg_nonce, emergency_stop_agg_nonce) = nonce_agg_handle.await?;
1777
1778            // Verify emergency stop signatures
1779            self.verify_and_save_emergency_stop_sigs(
1780                emergency_stop_sigs,
1781                emergency_stop_agg_nonce,
1782                deposit_params.clone(),
1783            )
1784            .await?;
1785
1786            let signed_movetx_handler = self
1787                .create_movetx(move_to_vault_sigs, movetx_agg_nonce, deposit_params)
1788                .await?;
1789
1790            let raw_signed_tx = RawSignedTx {
1791                raw_tx: bitcoin::consensus::serialize(&signed_movetx_handler.get_cached_tx()),
1792            };
1793
1794            tracing::info!("Created final move transaction for deposit {:?}", deposit_info);
1795
1796            Ok(Response::new(raw_signed_tx))
1797        })
1798        .await.map_err(Into::into)
1799    }
1800
1801    #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
1802    async fn withdraw(
1803        &self,
1804        request: Request<AggregatorWithdrawalInput>,
1805    ) -> Result<Response<AggregatorWithdrawResponse>, Status> {
1806        tracing::warn!("Withdraw rpc called");
1807        let request = request.into_inner();
1808        let (withdraw_params_with_sig, operator_xonly_pks) = (
1809            request.withdrawal.ok_or(Status::invalid_argument(
1810                "withdrawalParamsWithSig is missing",
1811            ))?,
1812            request.operator_xonly_pks,
1813        );
1814        // check compatibility with operators only
1815        self.check_compatibility_with_actors(CompatibilityCheckScope::OperatorsOnly)
1816            .await?;
1817
1818        let withdraw_params = withdraw_params_with_sig
1819            .clone()
1820            .withdrawal
1821            .ok_or(Status::invalid_argument("withdrawalParams is missing"))?;
1822
1823        // convert rpc xonly pks to bitcoin xonly pks
1824        let operator_xonly_pks_from_rpc: Vec<XOnlyPublicKey> = operator_xonly_pks
1825            .into_iter()
1826            .map(|xonly_pk| {
1827                xonly_pk.try_into().map_err(|e| {
1828                    Status::invalid_argument(format!("Failed to convert xonly public key: {e}"))
1829                })
1830            })
1831            .collect::<Result<Vec<_>, Status>>()?;
1832
1833        tracing::warn!(
1834            "Parsed withdraw rpc params, withdrawal params: {:?}, operator xonly pks: {:?}",
1835            withdraw_params,
1836            operator_xonly_pks_from_rpc
1837                .iter()
1838                .map(|pk| pk.to_string())
1839                .collect::<Vec<_>>()
1840        );
1841
1842        // parse_withdrawal_sig_params is called to check if the inputs can be parsed correctly
1843        // and check if input sighash type is SinglePlusAnyoneCanPay
1844        let (withdrawal_id, _, _, _, _) =
1845            parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
1846
1847        // check if all given operator xonly pubkeys are a valid operator xonly pubkey, to warn the caller if
1848        // something is wrong with the given operator xonly pubkeys
1849        let current_operator_xonly_pks = self.fetch_operator_keys().await?;
1850        let invalid_operator_xonly_pks = operator_xonly_pks_from_rpc
1851            .iter()
1852            .filter(|xonly_pk| !current_operator_xonly_pks.contains(xonly_pk))
1853            .collect::<Vec<_>>();
1854        if !invalid_operator_xonly_pks.is_empty() {
1855            return Err(Status::invalid_argument(format!(
1856                "Given xonly public key doesn't belong to any current operator: invalid keys: {invalid_operator_xonly_pks:?}, current operators: {current_operator_xonly_pks:?}"
1857            )));
1858        }
1859
1860        let operators = self
1861            .get_operator_clients()
1862            .iter()
1863            .zip(current_operator_xonly_pks.into_iter());
1864        let withdraw_futures = operators
1865            .filter(|(_, xonly_pk)| {
1866                // check if operator_xonly_pks is empty or contains the operator's xonly public key
1867                operator_xonly_pks_from_rpc.is_empty()
1868                    || operator_xonly_pks_from_rpc.contains(xonly_pk)
1869            })
1870            .map(|(operator, operator_xonly_pk)| {
1871                let mut operator = operator.clone();
1872                let params = withdraw_params_with_sig.clone();
1873                let mut request = Request::new(params);
1874                request.set_timeout(WITHDRAWAL_TIMEOUT);
1875                async move { (operator.withdraw(request).await, operator_xonly_pk) }
1876            });
1877
1878        // collect responses from operators and return them as a vector of strings
1879        let responses = futures::future::join_all(withdraw_futures).await;
1880        tracing::warn!(
1881            "Withdraw rpc completed successfully for withdrawal id: {}, operator xonly pks: {:?}, responses: {:?}",
1882            withdrawal_id,
1883            operator_xonly_pks_from_rpc
1884                .iter()
1885                .map(|pk| pk.to_string())
1886                .collect::<Vec<_>>(),
1887            responses,
1888        );
1889        Ok(Response::new(AggregatorWithdrawResponse {
1890            withdraw_responses: responses
1891                .into_iter()
1892                .map(|(res, xonly_pk)| match res {
1893                    Ok(withdraw_response) => OperatorWithrawalResponse {
1894                        operator_xonly_pk: Some(xonly_pk.into()),
1895                        response: Some(operator_withrawal_response::Response::RawTx(
1896                            withdraw_response.into_inner(),
1897                        )),
1898                    },
1899                    Err(e) => OperatorWithrawalResponse {
1900                        operator_xonly_pk: Some(xonly_pk.into()),
1901                        response: Some(operator_withrawal_response::Response::Error(e.to_string())),
1902                    },
1903                })
1904                .collect(),
1905        }))
1906    }
1907
1908    async fn get_nofn_aggregated_xonly_pk(
1909        &self,
1910        _: tonic::Request<super::Empty>,
1911    ) -> std::result::Result<tonic::Response<super::NofnResponse>, tonic::Status> {
1912        tracing::info!("Get nofn aggregated xonly pk rpc called");
1913        let verifier_keys = self.fetch_verifier_keys().await?;
1914        let num_verifiers = verifier_keys.len();
1915        let nofn_xonly_pk = bitcoin::XOnlyPublicKey::from_musig2_pks(verifier_keys.clone(), None)
1916            .map_err(|e| {
1917            Status::internal(format!(
1918                "Failed to aggregate verifier public keys, err: {e}, pubkeys: {verifier_keys:?}"
1919            ))
1920        })?;
1921        Ok(Response::new(super::NofnResponse {
1922            nofn_xonly_pk: nofn_xonly_pk.serialize().to_vec(),
1923            num_verifiers: num_verifiers as u32,
1924        }))
1925    }
1926
1927    async fn internal_get_emergency_stop_tx(
1928        &self,
1929        request: Request<clementine::GetEmergencyStopTxRequest>,
1930    ) -> Result<Response<clementine::GetEmergencyStopTxResponse>, Status> {
1931        tracing::warn!("Get emergency stop tx rpc called");
1932        let inner_request = request.into_inner();
1933        let txids: Vec<Txid> = inner_request
1934            .txids
1935            .into_iter()
1936            .map(|txid| {
1937                Txid::from_slice(&txid.txid).map_err(|e| {
1938                    tonic::Status::invalid_argument(format!("Failed to parse txid: {e}"))
1939                })
1940            })
1941            .collect::<Result<Vec<_>, _>>()?;
1942        tracing::warn!(
1943            "Parsed get emergency stop tx rpc params, move txids: {:?}",
1944            txids
1945                .iter()
1946                .map(|txid| txid.to_string())
1947                .collect::<Vec<_>>()
1948        );
1949
1950        let emergency_stop_txs = self.db.get_emergency_stop_txs(None, txids).await?;
1951
1952        let (txids, encrypted_emergency_stop_txs): (Vec<Txid>, Vec<Vec<u8>>) =
1953            emergency_stop_txs.into_iter().unzip();
1954
1955        Ok(Response::new(clementine::GetEmergencyStopTxResponse {
1956            txids: txids.into_iter().map(|txid| txid.into()).collect(),
1957            encrypted_emergency_stop_txs,
1958        }))
1959    }
1960
1961    async fn send_move_to_vault_tx(
1962        &self,
1963        request: Request<clementine::SendMoveTxRequest>,
1964    ) -> Result<Response<clementine::Txid>, Status> {
1965        tracing::info!("Send move to vault tx rpc called");
1966        #[cfg(not(feature = "automation"))]
1967        {
1968            let _ = request;
1969            return Err(Status::unimplemented(
1970                "Automation is disabled, cannot automatically send move to vault tx.",
1971            ));
1972        }
1973
1974        #[cfg(feature = "automation")]
1975        {
1976            use bitcoin::Amount;
1977            use std::sync::Arc;
1978
1979            use crate::builder::{
1980                address::create_taproot_address,
1981                script::{CheckSig, Multisig, SpendableScript},
1982                transaction::anchor_output,
1983            };
1984
1985            let request = request.into_inner();
1986            let movetx: bitcoin::Transaction = bitcoin::consensus::deserialize(
1987                &request
1988                    .raw_tx
1989                    .ok_or_eyre("raw_tx is required")
1990                    .map_to_status()?
1991                    .raw_tx,
1992            )
1993            .wrap_err("Failed to deserialize movetx")
1994            .map_to_status()?;
1995            let deposit_outpoint: bitcoin::OutPoint = request
1996                .deposit_outpoint
1997                .ok_or(Status::invalid_argument("deposit_outpoint is required"))?
1998                .try_into()?;
1999
2000            tracing::info!(
2001                "Parsed send move to vault tx rpc params, deposit outpoint: {:?}, movetx hex: {}",
2002                deposit_outpoint,
2003                bitcoin::consensus::encode::serialize_hex(&movetx)
2004            );
2005
2006            // check if transaction is a movetx
2007            if movetx.input.len() != 1 || movetx.output.len() != 2 {
2008                return Err(Status::invalid_argument(
2009                    "Transaction is not a movetx, input or output lengths are not correct",
2010                ));
2011            }
2012            // check output values
2013            // movetx always has 0 sat anchor output
2014            if !(movetx.output[0].value == self.config.protocol_paramset().bridge_amount
2015                && movetx.output[1].value == Amount::from_sat(0))
2016            {
2017                return Err(Status::invalid_argument(format!(
2018                    "Transaction is not a movetx, output sat values are not correct, should be ({}, 0), got ({}, {})",
2019                    self.config.protocol_paramset().bridge_amount,
2020                    movetx.output[0].value,
2021                    movetx.output[1].value,
2022                )));
2023            }
2024            // check output scriptpubkeys
2025            let verifier_keys = self.fetch_verifier_keys().await?;
2026            let nofn_xonly_pk =
2027                bitcoin::XOnlyPublicKey::from_musig2_pks(verifier_keys.clone(), None).map_err(
2028                    |e| {
2029                        Status::internal(format!(
2030                            "Failed to aggregate verifier public keys, err: {e}, pubkeys: {verifier_keys:?}"
2031                        ))
2032                    },
2033                )?;
2034            let nofn_script = Arc::new(CheckSig::new(nofn_xonly_pk));
2035            let security_council_script = Arc::new(Multisig::from_security_council(
2036                self.config.security_council.clone(),
2037            ));
2038
2039            let (addr, _) = create_taproot_address(
2040                &[
2041                    nofn_script.to_script_buf(),
2042                    security_council_script.to_script_buf(),
2043                ],
2044                None,
2045                self.config.protocol_paramset().network,
2046            );
2047            let bridge_script_pubkey = addr.script_pubkey();
2048
2049            if !(movetx.output[1].script_pubkey
2050                == anchor_output(self.config.protocol_paramset().anchor_amount()).script_pubkey
2051                && movetx.output[0].script_pubkey == bridge_script_pubkey)
2052            {
2053                return Err(Status::invalid_argument(
2054                    format!("Transaction is not a movetx, output scriptpubkeys are not correct, expected: (vault: {:?}, anchor: {:?}), got: (vault: {:?}, anchor: {:?})",
2055                    bridge_script_pubkey,
2056                    anchor_output(self.config.protocol_paramset().anchor_amount()).script_pubkey,
2057                    movetx.output[0].script_pubkey,
2058                    movetx.output[1].script_pubkey,
2059                )));
2060            }
2061
2062            let mut dbtx = self.db.begin_transaction().await?;
2063            self.tx_sender
2064                .insert_try_to_send(
2065                    Some(&mut dbtx),
2066                    Some(TxMetadata {
2067                        deposit_outpoint: Some(deposit_outpoint),
2068                        operator_xonly_pk: None,
2069                        round_idx: None,
2070                        kickoff_idx: None,
2071                        tx_type: TransactionType::MoveToVault,
2072                    }),
2073                    &movetx,
2074                    FeePayingType::CPFP,
2075                    None,
2076                    &[],
2077                    &[],
2078                    &[],
2079                    &[],
2080                )
2081                .await
2082                .map_to_status()?;
2083            dbtx.commit()
2084                .await
2085                .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;
2086
2087            Ok(Response::new(movetx.compute_txid().into()))
2088        }
2089    }
2090}
2091
2092#[cfg(test)]
2093mod tests {
2094    use crate::actor::Actor;
2095    use crate::builder;
2096    use crate::config::BridgeConfig;
2097    use crate::deposit::{BaseDepositData, DepositInfo, DepositType};
2098    use crate::musig2::AggregateFromPublicKeys;
2099    use crate::rpc::clementine::clementine_aggregator_client::ClementineAggregatorClient;
2100    use crate::rpc::clementine::{self, GetEntityStatusesRequest, SendMoveTxRequest};
2101    use crate::rpc::get_clients;
2102    use crate::servers::create_aggregator_unix_server;
2103    use crate::test::common::citrea::MockCitreaClient;
2104    use crate::test::common::tx_utils::ensure_tx_onchain;
2105    use crate::test::common::*;
2106    use bitcoin::hashes::Hash;
2107    use bitcoincore_rpc::RpcApi;
2108    use clementine_primitives::EVMAddress;
2109    use eyre::Context;
2110    use std::time::Duration;
2111    use tokio::time::sleep;
2112    use tonic::{Request, Status};
2113
2114    #[cfg(feature = "automation")]
2115    async fn perform_deposit(mut config: BridgeConfig) -> Result<(), Status> {
2116        let regtest = create_regtest_rpc(&mut config).await;
2117        let rpc = regtest.rpc();
2118
2119        let actors = create_actors::<MockCitreaClient>(&config).await;
2120        let _unused =
2121            run_single_deposit::<MockCitreaClient>(&mut config, rpc.clone(), None, &actors, None)
2122                .await?;
2123
2124        Ok(())
2125    }
2126
2127    #[tokio::test(flavor = "multi_thread")]
2128    async fn aggregator_double_deposit() {
2129        let mut config = create_test_config_with_thread_name().await;
2130        let regtest = create_regtest_rpc(&mut config).await;
2131        let rpc = regtest.rpc();
2132        let actors = create_actors::<MockCitreaClient>(&config).await;
2133        let mut aggregator = actors.get_aggregator();
2134
2135        let evm_address = EVMAddress([1u8; 20]);
2136        let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2137
2138        let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2139            .setup(tonic::Request::new(clementine::Empty {}))
2140            .await
2141            .unwrap()
2142            .into_inner()
2143            .try_into()
2144            .unwrap();
2145        sleep(Duration::from_secs(3)).await;
2146
2147        let nofn_xonly_pk =
2148            bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2149
2150        let deposit_address = builder::address::generate_deposit_address(
2151            nofn_xonly_pk,
2152            signer.address.as_unchecked(),
2153            evm_address,
2154            config.protocol_paramset().network,
2155            config.protocol_paramset().user_takes_after,
2156        )
2157        .unwrap()
2158        .0;
2159
2160        let deposit_outpoint = rpc
2161            .send_to_address(&deposit_address, config.protocol_paramset().bridge_amount)
2162            .await
2163            .unwrap();
2164        rpc.mine_blocks(18).await.unwrap();
2165
2166        let deposit_info = DepositInfo {
2167            deposit_outpoint,
2168            deposit_type: DepositType::BaseDeposit(BaseDepositData {
2169                evm_address,
2170                recovery_taproot_address: signer.address.as_unchecked().clone(),
2171            }),
2172        };
2173
2174        // Two deposits with the same values.
2175        let movetx_one = aggregator
2176            .new_deposit(clementine::Deposit::from(deposit_info.clone()))
2177            .await
2178            .unwrap()
2179            .into_inner();
2180        let movetx_one_txid: bitcoin::Txid = aggregator
2181            .send_move_to_vault_tx(SendMoveTxRequest {
2182                deposit_outpoint: Some(deposit_outpoint.into()),
2183                raw_tx: Some(movetx_one),
2184            })
2185            .await
2186            .unwrap()
2187            .into_inner()
2188            .try_into()
2189            .unwrap();
2190
2191        let movetx_two = aggregator
2192            .new_deposit(clementine::Deposit::from(deposit_info))
2193            .await
2194            .unwrap()
2195            .into_inner();
2196        let _movetx_two_txid: bitcoin::Txid = aggregator
2197            .send_move_to_vault_tx(SendMoveTxRequest {
2198                deposit_outpoint: Some(deposit_outpoint.into()),
2199                raw_tx: Some(movetx_two),
2200            })
2201            .await
2202            .unwrap()
2203            .into_inner()
2204            .try_into()
2205            .unwrap();
2206        rpc.mine_blocks(1).await.unwrap();
2207        sleep(Duration::from_secs(3)).await;
2208
2209        poll_until_condition(
2210            async || {
2211                rpc.mine_blocks(1).await.unwrap();
2212                Ok(rpc
2213                    .is_tx_on_chain(&movetx_one_txid)
2214                    .await
2215                    .unwrap_or_default())
2216            },
2217            None,
2218            None,
2219        )
2220        .await
2221        .wrap_err_with(|| eyre::eyre!("MoveTx did not land onchain"))
2222        .unwrap();
2223    }
2224
2225    #[tokio::test(flavor = "multi_thread")]
2226    async fn aggregator_deposit_movetx_lands_onchain() {
2227        let mut config = create_test_config_with_thread_name().await;
2228        let regtest = create_regtest_rpc(&mut config).await;
2229        let rpc = regtest.rpc();
2230        let actors = create_actors::<MockCitreaClient>(&config).await;
2231        let mut aggregator = actors.get_aggregator();
2232
2233        let evm_address = EVMAddress([1u8; 20]);
2234        let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2235
2236        let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2237            .setup(tonic::Request::new(clementine::Empty {}))
2238            .await
2239            .unwrap()
2240            .into_inner()
2241            .try_into()
2242            .unwrap();
2243        sleep(Duration::from_secs(3)).await;
2244
2245        let nofn_xonly_pk =
2246            bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2247
2248        let deposit_address = builder::address::generate_deposit_address(
2249            nofn_xonly_pk,
2250            signer.address.as_unchecked(),
2251            evm_address,
2252            config.protocol_paramset().network,
2253            config.protocol_paramset().user_takes_after,
2254        )
2255        .unwrap()
2256        .0;
2257
2258        let deposit_outpoint = rpc
2259            .send_to_address(&deposit_address, config.protocol_paramset().bridge_amount)
2260            .await
2261            .unwrap();
2262        rpc.mine_blocks(18).await.unwrap();
2263
2264        let deposit_info = DepositInfo {
2265            deposit_outpoint,
2266            deposit_type: DepositType::BaseDeposit(BaseDepositData {
2267                evm_address,
2268                recovery_taproot_address: signer.address.as_unchecked().clone(),
2269            }),
2270        };
2271
2272        // Generate and broadcast the move-to-vault transaction
2273        let start_time = std::time::Instant::now();
2274        let raw_move_tx = aggregator
2275            .new_deposit(clementine::Deposit::from(deposit_info))
2276            .await
2277            .unwrap()
2278            .into_inner();
2279        let end_time = std::time::Instant::now();
2280        tracing::info!("New deposit time: {:?}", end_time - start_time);
2281
2282        let movetx_txid = aggregator
2283            .send_move_to_vault_tx(SendMoveTxRequest {
2284                deposit_outpoint: Some(deposit_outpoint.into()),
2285                raw_tx: Some(raw_move_tx),
2286            })
2287            .await
2288            .unwrap()
2289            .into_inner()
2290            .try_into()
2291            .unwrap();
2292
2293        rpc.mine_blocks(1).await.unwrap();
2294        sleep(Duration::from_secs(3)).await;
2295
2296        poll_until_condition(
2297            async || {
2298                rpc.mine_blocks(1).await.unwrap();
2299                Ok(rpc.is_tx_on_chain(&movetx_txid).await.unwrap_or_default())
2300            },
2301            None,
2302            None,
2303        )
2304        .await
2305        .wrap_err_with(|| eyre::eyre!("MoveTx did not land onchain"))
2306        .unwrap();
2307    }
2308
2309    #[tokio::test]
2310    async fn aggregator_two_deposit_movetx_and_emergency_stop() {
2311        let mut config = create_test_config_with_thread_name().await;
2312        let regtest = create_regtest_rpc(&mut config).await;
2313        let rpc = regtest.rpc();
2314        let actors = create_actors::<MockCitreaClient>(&config).await;
2315        let mut aggregator = actors.get_aggregator();
2316
2317        let evm_address = EVMAddress([1u8; 20]);
2318        let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2319
2320        let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2321            .setup(tonic::Request::new(clementine::Empty {}))
2322            .await
2323            .unwrap()
2324            .into_inner()
2325            .try_into()
2326            .unwrap();
2327        sleep(Duration::from_secs(3)).await;
2328
2329        let nofn_xonly_pk =
2330            bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2331
2332        let deposit_address_0 = builder::address::generate_deposit_address(
2333            nofn_xonly_pk,
2334            signer.address.as_unchecked(),
2335            evm_address,
2336            config.protocol_paramset().network,
2337            config.protocol_paramset().user_takes_after,
2338        )
2339        .unwrap()
2340        .0;
2341
2342        let deposit_address_1 = builder::address::generate_deposit_address(
2343            nofn_xonly_pk,
2344            signer.address.as_unchecked(),
2345            evm_address,
2346            config.protocol_paramset().network,
2347            config.protocol_paramset().user_takes_after,
2348        )
2349        .unwrap()
2350        .0;
2351
2352        let deposit_outpoint_0 = rpc
2353            .send_to_address(&deposit_address_0, config.protocol_paramset().bridge_amount)
2354            .await
2355            .unwrap();
2356        rpc.mine_blocks(18).await.unwrap();
2357
2358        let deposit_outpoint_1 = rpc
2359            .send_to_address(&deposit_address_1, config.protocol_paramset().bridge_amount)
2360            .await
2361            .unwrap();
2362        rpc.mine_blocks(18).await.unwrap();
2363
2364        let deposit_info_0 = DepositInfo {
2365            deposit_outpoint: deposit_outpoint_0,
2366            deposit_type: DepositType::BaseDeposit(BaseDepositData {
2367                evm_address,
2368                recovery_taproot_address: signer.address.as_unchecked().clone(),
2369            }),
2370        };
2371
2372        let deposit_info_1 = DepositInfo {
2373            deposit_outpoint: deposit_outpoint_1,
2374            deposit_type: DepositType::BaseDeposit(BaseDepositData {
2375                evm_address,
2376                recovery_taproot_address: signer.address.as_unchecked().clone(),
2377            }),
2378        };
2379
2380        // Generate and broadcast the move-to-vault tx for the first deposit
2381        let raw_move_tx_0 = aggregator
2382            .new_deposit(clementine::Deposit::from(deposit_info_0))
2383            .await
2384            .unwrap()
2385            .into_inner();
2386        let move_txid_0: bitcoin::Txid = aggregator
2387            .send_move_to_vault_tx(SendMoveTxRequest {
2388                deposit_outpoint: Some(deposit_outpoint_0.into()),
2389                raw_tx: Some(raw_move_tx_0),
2390            })
2391            .await
2392            .unwrap()
2393            .into_inner()
2394            .try_into()
2395            .unwrap();
2396
2397        rpc.mine_blocks(1).await.unwrap();
2398        sleep(Duration::from_secs(3)).await;
2399        ensure_tx_onchain(rpc, move_txid_0)
2400            .await
2401            .expect("failed to get movetx_0 on chain");
2402
2403        // Generate and broadcast the move-to-vault tx for the second deposit
2404        let raw_move_tx_1 = aggregator
2405            .new_deposit(clementine::Deposit::from(deposit_info_1))
2406            .await
2407            .unwrap()
2408            .into_inner();
2409        let move_txid_1 = aggregator
2410            .send_move_to_vault_tx(SendMoveTxRequest {
2411                deposit_outpoint: Some(deposit_outpoint_1.into()),
2412                raw_tx: Some(raw_move_tx_1),
2413            })
2414            .await
2415            .unwrap()
2416            .into_inner()
2417            .try_into()
2418            .unwrap();
2419
2420        rpc.mine_blocks(1).await.unwrap();
2421        ensure_tx_onchain(rpc, move_txid_1)
2422            .await
2423            .expect("failed to get movetx_1 on chain");
2424        sleep(Duration::from_secs(3)).await;
2425
2426        let move_txids = vec![move_txid_0, move_txid_1];
2427
2428        tracing::debug!("Move txids: {:?}", move_txids);
2429
2430        let emergency_txid = aggregator
2431            .internal_get_emergency_stop_tx(tonic::Request::new(
2432                clementine::GetEmergencyStopTxRequest {
2433                    txids: move_txids
2434                        .iter()
2435                        .map(|txid| clementine::Txid {
2436                            txid: txid.to_byte_array().to_vec(),
2437                        })
2438                        .collect(),
2439                },
2440            ))
2441            .await
2442            .unwrap()
2443            .into_inner();
2444
2445        let decryption_priv_key =
2446            hex::decode("a80bc8cf095c2b37d4c6233114e0dd91f43d75de5602466232dbfcc1fc66c542")
2447                .expect("Failed to parse emergency stop encryption public key");
2448        let emergency_stop_tx: bitcoin::Transaction = bitcoin::consensus::deserialize(
2449            &crate::encryption::decrypt_bytes(
2450                &decryption_priv_key,
2451                &emergency_txid.encrypted_emergency_stop_txs[0],
2452            )
2453            .expect("Failed to decrypt emergency stop tx"),
2454        )
2455        .expect("Failed to deserialize");
2456
2457        rpc.send_raw_transaction(&emergency_stop_tx)
2458            .await
2459            .expect("Failed to send emergency stop tx");
2460
2461        let emergency_stop_txid = emergency_stop_tx.compute_txid();
2462        rpc.mine_blocks(1).await.unwrap();
2463
2464        poll_until_condition(
2465            async || {
2466                rpc.mine_blocks(1).await.unwrap();
2467                Ok(rpc
2468                    .is_tx_on_chain(&emergency_stop_txid)
2469                    .await
2470                    .unwrap_or_default())
2471            },
2472            None,
2473            None,
2474        )
2475        .await
2476        .wrap_err_with(|| eyre::eyre!("Emergency stop tx did not land onchain"))
2477        .unwrap();
2478    }
2479
/// Sets the `deposit_finalize_verifier_idx` timeout test parameter to index 0
/// and expects `perform_deposit` to fail with an error that mentions deposit
/// finalization from verifiers. Currently ignored.
#[cfg(feature = "automation")]
#[tokio::test]
#[ignore = "This test does not work"]
async fn aggregator_deposit_finalize_verifier_timeout() {
    // Fragment the returned error text must contain.
    let expected_fragment = "Deposit finalization from verifiers";

    let mut config = create_test_config_with_thread_name().await;
    let timeouts = &mut config.test_params.timeout_params;
    timeouts.deposit_finalize_verifier_idx = Some(0);

    let outcome = perform_deposit(config).await;
    assert!(outcome.is_err());

    let err_string = outcome.unwrap_err().to_string();
    assert!(
        err_string.contains(expected_fragment),
        "Error string was: {err_string}"
    );
}
2497
/// Sets the `key_distribution_verifier_idx` timeout test parameter to index 0
/// and expects `perform_deposit` to fail with an error that mentions verifier
/// key distribution.
#[cfg(feature = "automation")]
#[tokio::test]
async fn aggregator_deposit_key_distribution_verifier_timeout() {
    // Fragment the returned error text must contain.
    let expected_fragment = "Verifier key distribution (id:";

    let mut config = create_test_config_with_thread_name().await;
    let timeouts = &mut config.test_params.timeout_params;
    timeouts.key_distribution_verifier_idx = Some(0);

    let outcome = perform_deposit(config).await;
    assert!(outcome.is_err());

    let err_string = outcome.unwrap_err().to_string();
    assert!(
        err_string.contains(expected_fragment),
        "Error string was: {err_string}"
    );
}
2516
/// Sets the `key_collection_operator_idx` timeout test parameter to index 0
/// and expects `perform_deposit` to fail with an error that mentions operator
/// key collection.
#[cfg(feature = "automation")]
#[tokio::test]
async fn aggregator_deposit_key_distribution_operator_timeout() {
    // Fragment the returned error text must contain.
    let expected_fragment = "Operator key collection (id:";

    let mut config = create_test_config_with_thread_name().await;
    let timeouts = &mut config.test_params.timeout_params;
    timeouts.key_collection_operator_idx = Some(0);

    let outcome = perform_deposit(config).await;
    assert!(outcome.is_err());

    let err_string = outcome.unwrap_err().to_string();
    assert!(
        err_string.contains(expected_fragment),
        "Error string was: {err_string}"
    );
}
2535
/// Sets the `nonce_stream_creation_verifier_idx` timeout test parameter to
/// index 0 and expects `perform_deposit` to fail with an error that mentions
/// nonce stream creation.
#[cfg(feature = "automation")]
#[tokio::test]
async fn aggregator_deposit_nonce_stream_creation_verifier_timeout() {
    // Fragment the returned error text must contain.
    let expected_fragment = "Nonce stream creation (id:";

    let mut config = create_test_config_with_thread_name().await;
    let timeouts = &mut config.test_params.timeout_params;
    timeouts.nonce_stream_creation_verifier_idx = Some(0);

    let outcome = perform_deposit(config).await;
    assert!(outcome.is_err());

    let err_string = outcome.unwrap_err().to_string();
    assert!(
        err_string.contains(expected_fragment),
        "Error string was: {err_string}"
    );
}
2554
/// Sets the `partial_sig_stream_creation_verifier_idx` timeout test parameter
/// to index 0 and expects `perform_deposit` to fail with an error that
/// mentions partial signature stream creation.
#[cfg(feature = "automation")]
#[tokio::test]
async fn aggregator_deposit_partial_sig_stream_creation_timeout() {
    // Fragment the returned error text must contain.
    let expected_fragment = "Partial signature stream creation (id:";

    let mut config = create_test_config_with_thread_name().await;
    let timeouts = &mut config.test_params.timeout_params;
    timeouts.partial_sig_stream_creation_verifier_idx = Some(0);

    let outcome = perform_deposit(config).await;
    assert!(outcome.is_err());

    let err_string = outcome.unwrap_err().to_string();
    assert!(
        err_string.contains(expected_fragment),
        "Error string was: {err_string}"
    );
}
2573
/// Sets the `operator_sig_collection_operator_idx` timeout test parameter to
/// index 0 and expects `perform_deposit` to fail with an error that mentions
/// operator signature stream creation.
#[cfg(feature = "automation")]
#[tokio::test]
async fn aggregator_deposit_operator_sig_collection_operator_timeout() {
    // Fragment the returned error text must contain.
    // NOTE(review): the test name and parameter say "sig collection" while the
    // expected message says "stream creation" — confirm this is intended.
    let expected_fragment = "Operator signature stream creation (id:";

    let mut config = create_test_config_with_thread_name().await;
    let timeouts = &mut config.test_params.timeout_params;
    timeouts.operator_sig_collection_operator_idx = Some(0);

    let outcome = perform_deposit(config).await;
    assert!(outcome.is_err());

    let err_string = outcome.unwrap_err().to_string();
    assert!(
        err_string.contains(expected_fragment),
        "Error string was: {err_string}"
    );
}
2592
2593    #[tokio::test]
2594    async fn aggregator_get_entity_statuses() {
2595        let mut config = create_test_config_with_thread_name().await;
2596        let _regtest = create_regtest_rpc(&mut config).await;
2597
2598        let actors = create_actors::<MockCitreaClient>(&config).await;
2599        let mut aggregator = actors.get_aggregator();
2600        let status = aggregator
2601            .get_entity_statuses(Request::new(GetEntityStatusesRequest {
2602                restart_tasks: false,
2603            }))
2604            .await
2605            .unwrap()
2606            .into_inner();
2607
2608        tracing::info!("Status: {:?}", status);
2609
2610        assert_eq!(
2611            status.entity_statuses.len(),
2612            config.test_params.all_operators_secret_keys.len()
2613                + config.test_params.all_verifiers_secret_keys.len()
2614        );
2615    }
2616
2617    #[tokio::test]
2618    async fn aggregator_start_with_offline_verifier() {
2619        let mut config = create_test_config_with_thread_name().await;
2620        // Create regtest rpc
2621        let _regtest = create_regtest_rpc(&mut config).await;
2622        // random ips
2623        config.verifier_endpoints = Some(vec!["https://142.143.144.145:17001".to_string()]);
2624        config.operator_endpoints = Some(vec!["https://142.143.144.145:17002".to_string()]);
2625        // Create temporary directory for aggregator socket
2626        let socket_dir = tempfile::tempdir().unwrap();
2627        let socket_path = socket_dir.path().join("aggregator.sock");
2628
2629        tracing::info!("Creating unix aggregator server");
2630
2631        let (_, _shutdown_tx) = create_aggregator_unix_server(config.clone(), socket_path.clone())
2632            .await
2633            .unwrap();
2634
2635        tracing::info!("Created unix aggregator server");
2636
2637        let mut aggregator_client = get_clients(
2638            vec![format!("unix://{}", socket_path.display())],
2639            ClementineAggregatorClient::new,
2640            &config,
2641            false,
2642        )
2643        .await
2644        .unwrap()
2645        .pop()
2646        .unwrap();
2647
2648        tracing::info!("Got aggregator client");
2649
2650        // vergen should work
2651        assert!(aggregator_client
2652            .vergen(Request::new(clementine::Empty {}))
2653            .await
2654            .is_ok());
2655
2656        tracing::info!("After vergen");
2657
2658        // setup should give error as it can't connect to the verifier
2659        assert!(aggregator_client
2660            .setup(Request::new(clementine::Empty {}))
2661            .await
2662            .is_err());
2663
2664        tracing::info!("After setup");
2665
2666        // aggregator should still be up even after not connecting to the verifier
2667        // and should be able to get metrics
2668        tracing::info!(
2669            "Entity statuses: {:?}",
2670            aggregator_client
2671                .get_entity_statuses(Request::new(GetEntityStatusesRequest {
2672                    restart_tasks: false,
2673                }))
2674                .await
2675                .unwrap()
2676        );
2677    }
2678}