1use super::clementine::{
2 clementine_aggregator_server::ClementineAggregator, verifier_deposit_finalize_params,
3 DepositParams, Empty, VerifierDepositFinalizeParams,
4};
5use super::clementine::{
6 AggregatorWithdrawResponse, Deposit, EntityStatuses, GetEntityStatusesRequest,
7 OptimisticPayoutParams, RawSignedTx, VergenResponse, VerifierPublicKeys,
8};
9use crate::aggregator::{
10 AggregatorServer, OperatorId, ParticipatingOperators, ParticipatingVerifiers, VerifierId,
11};
12use crate::bitvm_client::SECP;
13use crate::builder::sighash::SignatureInfo;
14use crate::builder::transaction::{
15 create_emergency_stop_txhandler, create_move_to_vault_txhandler,
16 create_optimistic_payout_txhandler, Signed, TransactionType, TxHandler,
17};
18use crate::compatibility::ActorWithConfig;
19use crate::config::BridgeConfig;
20use crate::constants::{
21 DEPOSIT_FINALIZATION_TIMEOUT, DEPOSIT_FINALIZE_STREAM_CREATION_TIMEOUT,
22 KEY_DISTRIBUTION_TIMEOUT, NONCE_STREAM_CREATION_TIMEOUT, OPERATOR_SIGS_STREAM_CREATION_TIMEOUT,
23 OPERATOR_SIGS_TIMEOUT, OPTIMISTIC_PAYOUT_TIMEOUT, OVERALL_DEPOSIT_TIMEOUT,
24 PARTIAL_SIG_STREAM_CREATION_TIMEOUT, PIPELINE_COMPLETION_TIMEOUT, SEND_OPERATOR_SIGS_TIMEOUT,
25 SETUP_COMPLETION_TIMEOUT, WITHDRAWAL_TIMEOUT,
26};
27use crate::deposit::{Actors, DepositData, DepositInfo};
28use crate::errors::{ErrorExt, ResultExt};
29use crate::musig2::AggregateFromPublicKeys;
30use crate::rpc::clementine::{
31 operator_withrawal_response, AggregatorWithdrawalInput, CompatibilityParamsRpc,
32 EntitiesCompatibilityData, OperatorWithrawalResponse, VerifierDepositSignParams,
33};
34use crate::rpc::parser;
35use crate::utils::{
36 flatten_join_named_results, get_vergen_response, join_all_combine_errors, timed_request,
37 timed_try_join_all, ScriptBufExt,
38};
39use crate::utils::{FeePayingType, TxMetadata};
40use crate::UTXO;
41use crate::{
42 aggregator::Aggregator,
43 builder::sighash::create_nofn_sighash_stream,
44 errors::BridgeError,
45 musig2::aggregate_nonces,
46 rpc::clementine::{self, DepositSignSession},
47};
48use bitcoin::hashes::Hash;
49use bitcoin::secp256k1::schnorr::Signature;
50use bitcoin::secp256k1::{Message, PublicKey};
51use bitcoin::{TapSighash, TxOut, Txid, XOnlyPublicKey};
52use eyre::{Context, OptionExt};
53use futures::future::join_all;
54use futures::{
55 stream::{BoxStream, TryStreamExt},
56 FutureExt, Stream, StreamExt, TryFutureExt,
57};
58use secp256k1::musig::{AggregatedNonce, PartialSignature, PublicNonce};
59use std::future::Future;
60use tokio::sync::mpsc::{channel, Receiver, Sender};
61use tonic::{async_trait, Request, Response, Status, Streaming};
62
63struct AggNonceQueueItem {
64 agg_nonce: AggregatedNonce,
65 sighash: TapSighash,
66}
67
68#[derive(Debug, Clone)]
69struct FinalSigQueueItem {
70 final_sig: Vec<u8>,
71}
72
73#[derive(Debug, thiserror::Error)]
74pub enum AggregatorError {
75 #[error("Failed to receive from {stream_name} stream.")]
76 InputStreamEndedEarlyUnknownSize { stream_name: String },
77 #[error("Failed to send to {stream_name} stream.")]
78 OutputStreamEndedEarly { stream_name: String },
79 #[error("Failed to send request to {request_name} stream.")]
80 RequestFailed { request_name: String },
81}
82
83async fn get_next_pub_nonces(
84 nonce_streams: &mut [impl Stream<Item = Result<PublicNonce, BridgeError>>
85 + Unpin
86 + Send
87 + 'static],
88 verifiers_ids: &[VerifierId],
89) -> Result<Vec<PublicNonce>, BridgeError> {
90 join_all_combine_errors(
91 nonce_streams
92 .iter_mut()
93 .zip(verifiers_ids)
94 .map(|(s, id)| async move {
95 s.next()
96 .await
97 .transpose()
98 .wrap_err(format!("Failed to get nonce from {id}"))? .ok_or_else(|| -> eyre::Report {
100 AggregatorError::InputStreamEndedEarlyUnknownSize {
101 stream_name: format!("Nonce stream {id}"),
103 }
104 .into()
105 })
106 }),
107 )
108 .await
109}
110
111async fn nonce_aggregator(
113 mut nonce_streams: Vec<
114 impl Stream<Item = Result<PublicNonce, BridgeError>> + Unpin + Send + 'static,
115 >,
116 mut sighash_stream: impl Stream<Item = Result<(TapSighash, SignatureInfo), BridgeError>>
117 + Unpin
118 + Send
119 + 'static,
120 agg_nonce_sender: Sender<(AggNonceQueueItem, Vec<PublicNonce>)>,
121 needed_nofn_sigs: usize,
122 verifiers_ids: Vec<VerifierId>,
123) -> Result<
124 (
125 (AggregatedNonce, Vec<PublicNonce>),
126 (AggregatedNonce, Vec<PublicNonce>),
127 ),
128 BridgeError,
129> {
130 let mut total_sigs = 0;
131
132 tracing::info!("Starting nonce aggregation (expecting {needed_nofn_sigs} nonces)");
133
134 if verifiers_ids.len() != nonce_streams.len() {
136 return Err(
137 eyre::eyre!("Number of verifiers ids and nonce streams must be the same").into(),
138 );
139 }
140
141 while let Some(msg) = sighash_stream.next().await {
142 let (sighash, siginfo) = msg.wrap_err("Sighash stream failed")?;
143
144 total_sigs += 1;
145
146 let pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
147 .await
148 .wrap_err_with(|| {
149 format!("Failed to get nonces from verifiers for sighash #{total_sigs} with siginfo: {siginfo:?}")
150 })?;
151
152 tracing::trace!(
153 "Received nonces for signature id {:?} in nonce_aggregator",
154 siginfo.signature_id
155 );
156
157 let agg_nonce = aggregate_nonces(pub_nonces.iter().collect::<Vec<_>>().as_slice())?;
158
159 agg_nonce_sender
160 .send((AggNonceQueueItem { agg_nonce, sighash }, pub_nonces))
161 .await
162 .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
163 stream_name: "nonce_aggregator".to_string(),
164 })?;
165
166 tracing::trace!(
167 "Sent nonces for signature id {:?} in nonce_aggregator",
168 siginfo.signature_id
169 );
170 }
171 tracing::trace!(tmp_debug = 1, "Sent {total_sigs} to agg_nonce stream");
172
173 if total_sigs != needed_nofn_sigs {
175 let err_msg = format!(
176 "Expected {needed_nofn_sigs} nofn signatures, got {total_sigs} from sighash stream",
177 );
178 tracing::error!("{err_msg}");
179 return Err(eyre::eyre!(err_msg).into());
180 }
181 let movetx_pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
183 .await
184 .wrap_err("Failed to get movetx public nonces from verifiers")?;
185
186 tracing::trace!("Received nonces for movetx in nonce_aggregator");
187
188 let move_tx_agg_nonce =
189 aggregate_nonces(movetx_pub_nonces.iter().collect::<Vec<_>>().as_slice())
190 .wrap_err("Failed to aggregate movetx nonces")?;
191
192 let emergency_stop_pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
193 .await
194 .wrap_err("Failed to get emergency stop tx public nonces from verifiers")?;
195
196 let emergency_stop_agg_nonce = aggregate_nonces(
197 emergency_stop_pub_nonces
198 .iter()
199 .collect::<Vec<_>>()
200 .as_slice(),
201 )
202 .wrap_err("Failed to aggregate emergency stop tx nonces")?;
203
204 Ok((
205 (move_tx_agg_nonce, movetx_pub_nonces),
206 (emergency_stop_agg_nonce, emergency_stop_pub_nonces),
207 ))
208}
209
210async fn nonce_distributor(
212 mut agg_nonce_receiver: Receiver<(AggNonceQueueItem, Vec<PublicNonce>)>,
213 partial_sig_streams: Vec<(
214 Streaming<clementine::PartialSig>,
215 Sender<clementine::VerifierDepositSignParams>,
216 )>,
217 partial_sig_sender: Sender<(Vec<(PartialSignature, PublicNonce)>, AggNonceQueueItem)>,
218 needed_nofn_sigs: usize,
219 verifiers_ids: Vec<VerifierId>,
220) -> Result<(), BridgeError> {
221 let mut nonce_count = 0;
222 let mut sig_count = 0;
223 let (mut partial_sig_rx, mut partial_sig_tx): (Vec<_>, Vec<_>) =
224 partial_sig_streams.into_iter().unzip();
225
226 let (queue_tx, mut queue_rx) = channel(crate::constants::DEFAULT_CHANNEL_SIZE);
227
228 if verifiers_ids.len() != partial_sig_rx.len() {
230 return Err(eyre::eyre!(
231 "Number of verifiers ids and partial sig streams must be the same"
232 )
233 .into());
234 }
235 let verifiers_ids_clone = verifiers_ids.clone();
236 let handle_1 = tokio::spawn(async move {
237 while let Some((queue_item, pub_nonces)) = agg_nonce_receiver.recv().await {
238 nonce_count += 1;
239
240 tracing::trace!(
241 "Received aggregated nonce {} in nonce_distributor",
242 nonce_count
243 );
244
245 let agg_nonce_wrapped = clementine::VerifierDepositSignParams {
246 params: Some(clementine::verifier_deposit_sign_params::Params::AggNonce(
247 queue_item.agg_nonce.serialize().to_vec(),
248 )),
249 };
250
251 join_all_combine_errors(
253 partial_sig_tx
254 .iter_mut()
255 .zip(verifiers_ids_clone.iter())
256 .map(|(tx, id)| {
257 let agg_nonce_wrapped = agg_nonce_wrapped.clone();
258 async move {
259 tx.send(agg_nonce_wrapped)
260 .await
261 .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
262 stream_name: format!("Partial sig {id}"),
263 })
264 .inspect_err(|e| {
265 tracing::error!(
266 "Failed to send aggregated nonce to {id}: {:?}",
267 e
268 );
269 })
270 }
271 }),
272 )
273 .await
274 .wrap_err("Failed to send aggregated nonces to verifiers")?;
275
276 queue_tx
277 .send((queue_item, pub_nonces))
278 .await
279 .wrap_err("Other end of channel closed")?;
280
281 tracing::trace!(
282 "Sent aggregated nonce {} to verifiers in nonce_distributor",
283 nonce_count
284 );
285 if nonce_count == needed_nofn_sigs {
286 break;
287 }
288 }
289 if nonce_count != needed_nofn_sigs {
290 let err_msg = format!("Expected {needed_nofn_sigs} aggregated nonces in nonce_distributor, got {nonce_count}",);
291 tracing::error!("{err_msg}");
292 return Err(eyre::eyre!(err_msg).into());
293 }
294
295 tracing::trace!(
296 tmp_debug = 1,
297 "Broadcasted {nonce_count} agg_nonces to verifiers and to the queue"
298 );
299 Ok::<(), BridgeError>(())
300 });
301
302 let handle_2 = tokio::spawn(async move {
303 while let Some((queue_item, pub_nonces)) = queue_rx.recv().await {
304 let pub_nonces_ref = pub_nonces.as_slice();
305 if pub_nonces_ref.len() != partial_sig_rx.len() {
306 return Err(eyre::eyre!(
307 "Number of public nonces {} and partial sig streams {} must be the same",
308 pub_nonces_ref.len(),
309 partial_sig_rx.len()
310 )
311 .into());
312 }
313 let partial_sigs = join_all_combine_errors(partial_sig_rx.iter_mut().zip(pub_nonces_ref.iter()).zip(verifiers_ids.iter()).map(
314 |((stream, pub_nonce), id)| async move {
315 let partial_sig = stream
316 .message()
317 .await
318 .wrap_err_with(|| AggregatorError::RequestFailed {
319 request_name: format!("Partial sig {sig_count} from {id}"),
320 })
321 .inspect_err(|e| {
322 tracing::error!(
323 "Failed to receive partial signature {sig_count} from {id}, an error was sent: {:?}",
324 e
325 );
326 })?
327 .ok_or_eyre(AggregatorError::InputStreamEndedEarlyUnknownSize {
328 stream_name: format!("Partial sig {sig_count} from {id} closed"),
329 }).inspect_err(|e| {
330 tracing::error!(
331 "Failed to receive partial signature {sig_count} from {id}, the stream was closed: {:?}",
332 e
333 );
334 })?;
335 let partial_sig = PartialSignature::from_byte_array(
336 &partial_sig
337 .partial_sig
338 .as_slice()
339 .try_into()
340 .wrap_err("PartialSignature must be 32 bytes")?,
341 )
342 .wrap_err(format!("Failed to parse partial signature {sig_count} from {id}"))?;
343
344 Ok::<_, BridgeError>((partial_sig, *pub_nonce))
345 },
346 ))
347 .await?;
348
349 sig_count += 1;
350
351 tracing::trace!(
352 "Received partial signature {} from verifiers in nonce_distributor",
353 sig_count
354 );
355
356 partial_sig_sender
357 .send((partial_sigs, queue_item))
358 .await
359 .map_err(|_| {
360 eyre::eyre!(AggregatorError::OutputStreamEndedEarly {
361 stream_name: "partial_sig_sender".into(),
362 })
363 })?;
364
365 tracing::trace!(
366 "Sent partial signature {} to signature_aggregator in nonce_distributor",
367 sig_count
368 );
369 }
370
371 if sig_count != needed_nofn_sigs {
372 let err_msg = format!(
373 "Expected {needed_nofn_sigs} partial signatures in nonce_distributor, got {sig_count}",
374 );
375 tracing::error!("{err_msg}");
376 return Err(eyre::eyre!(err_msg).into());
377 }
378 tracing::trace!(
379 tmp_debug = 1,
380 "Sent {sig_count} partial sig bundles to partial_sigs stream"
381 );
382
383 tracing::trace!("Finished tasks in nonce_distributor handle 2");
384 Ok::<(), BridgeError>(())
385 });
386
387 let (result_1, result_2) = tokio::join!(handle_1, handle_2);
388
389 let mut task_errors = Vec::new();
390
391 match result_1 {
392 Ok(inner_result) => {
393 if let Err(e) = inner_result {
394 task_errors.push(format!(
395 "Task returned error while distributing aggnonces: {e:#?}"
396 ));
397 }
398 }
399 Err(e) => {
400 task_errors.push(format!(
401 "Task panicked while distributing aggnonces: {e:#?}"
402 ));
403 }
404 }
405
406 match result_2 {
407 Ok(inner_result) => {
408 if let Err(e) = inner_result {
409 task_errors.push(format!(
410 "Task returned error while receiving partial sigs: {e:#?}"
411 ));
412 }
413 }
414 Err(e) => {
415 task_errors.push(format!(
416 "Task panicked while receiving partial sigs: {e:#?}"
417 ));
418 }
419 }
420
421 if !task_errors.is_empty() {
422 return Err(eyre::eyre!(format!(
423 "nonce_distributor failed with errors: {:#?}",
424 task_errors
425 ))
426 .into());
427 }
428
429 tracing::debug!("Finished tasks in nonce_distributor");
430
431 Ok(())
432}
433
434async fn signature_aggregator(
437 mut partial_sig_receiver: Receiver<(Vec<(PartialSignature, PublicNonce)>, AggNonceQueueItem)>,
438 verifiers_public_keys: Vec<PublicKey>,
439 final_sig_sender: Sender<FinalSigQueueItem>,
440 needed_nofn_sigs: usize,
441) -> Result<(), BridgeError> {
442 let mut sig_count = 0;
443 while let Some((partial_sigs, queue_item)) = partial_sig_receiver.recv().await {
444 sig_count += 1;
445 tracing::trace!(
446 "Received partial signatures {} in signature_aggregator",
447 sig_count
448 );
449
450 let final_sig = crate::musig2::aggregate_partial_signatures(
451 verifiers_public_keys.clone(),
452 None,
453 queue_item.agg_nonce,
454 &partial_sigs,
455 Message::from_digest(queue_item.sighash.to_byte_array()),
456 )?;
457
458 final_sig_sender
459 .send(FinalSigQueueItem {
460 final_sig: final_sig.serialize().to_vec(),
461 })
462 .await
463 .wrap_err_with(|| {
464 eyre::eyre!(AggregatorError::OutputStreamEndedEarly {
465 stream_name: "final_sig_sender".into(),
466 })
467 })?;
468 tracing::trace!(
469 "Sent aggregated signature {} to signature_distributor in signature_aggregator",
470 sig_count
471 );
472
473 if sig_count == needed_nofn_sigs {
474 break;
475 }
476 }
477
478 if sig_count != needed_nofn_sigs {
479 let err_msg = format!(
480 "Expected {needed_nofn_sigs} aggregated signatures in signature_aggregator, got {sig_count}",
481 );
482 tracing::error!("{err_msg}");
483 return Err(eyre::eyre!(err_msg).into());
484 }
485
486 tracing::trace!(
487 tmp_debug = 1,
488 "Sent {sig_count} aggregated signatures to final_sig stream"
489 );
490
491 Ok(())
492}
493
494async fn signature_distributor(
497 mut final_sig_receiver: Receiver<FinalSigQueueItem>,
498 deposit_finalize_sender: Vec<Sender<VerifierDepositFinalizeParams>>,
499 agg_nonce: impl Future<
500 Output = Result<
501 (
502 (AggregatedNonce, Vec<PublicNonce>),
503 (AggregatedNonce, Vec<PublicNonce>),
504 ),
505 Status,
506 >,
507 >,
508 needed_nofn_sigs: usize,
509 verifiers_ids: Vec<VerifierId>,
510) -> Result<(), BridgeError> {
511 use verifier_deposit_finalize_params::Params;
512 let mut sig_count = 0;
513 while let Some(queue_item) = final_sig_receiver.recv().await {
514 sig_count += 1;
515 tracing::trace!("Received signature {} in signature_distributor", sig_count);
516 let final_params = VerifierDepositFinalizeParams {
517 params: Some(Params::SchnorrSig(queue_item.final_sig)),
518 };
519
520 join_all_combine_errors(
521 deposit_finalize_sender
522 .iter()
523 .zip(verifiers_ids.iter())
524 .map(|(tx, id)| {
525 let final_params = final_params.clone();
526 async move {
527 tx.send(final_params).await.wrap_err_with(|| {
528 AggregatorError::OutputStreamEndedEarly {
529 stream_name: format!("Deposit finalize sender for {id}"),
530 }
531 })
532 }
533 }),
534 )
535 .await
536 .wrap_err(format!(
537 "Failed to send final signature {sig_count} to verifiers"
538 ))?;
539
540 tracing::trace!(
541 "Sent signature {} to verifiers in signature_distributor",
542 sig_count
543 );
544
545 if sig_count == needed_nofn_sigs {
546 break;
547 }
548 }
549
550 if sig_count != needed_nofn_sigs {
551 let err_msg = format!(
552 "Expected {needed_nofn_sigs} signatures in signature_distributor, got {sig_count}",
553 );
554 tracing::error!("{err_msg}");
555 return Err(eyre::eyre!(err_msg).into());
556 }
557
558 tracing::trace!(
559 tmp_debug = 1,
560 "Sent {sig_count} signatures to verifiers in deposit_finalize"
561 );
562
563 let (movetx_agg_nonce, emergency_stop_agg_nonce) = agg_nonce
564 .await
565 .wrap_err("Failed to get aggregated nonce for movetx and emergency stop")?;
566
567 tracing::info!("Got aggregated nonce for movetx and emergency stop in signature distributor");
568
569 for tx in &deposit_finalize_sender {
571 tx.send(VerifierDepositFinalizeParams {
572 params: Some(Params::MoveTxAggNonce(
573 movetx_agg_nonce.0.serialize().to_vec(),
574 )),
575 })
576 .await
577 .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
578 stream_name: "Deposit finalize sender (for movetx agg nonce)".to_string(),
579 })?;
580 }
581 tracing::info!("Sent movetx aggregated nonce to verifiers in signature distributor");
582
583 for tx in &deposit_finalize_sender {
585 tx.send(VerifierDepositFinalizeParams {
586 params: Some(Params::EmergencyStopAggNonce(
587 emergency_stop_agg_nonce.0.serialize().to_vec(),
588 )),
589 })
590 .await
591 .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
592 stream_name: "Deposit finalize sender (for emergency stop agg nonce)".to_string(),
593 })?;
594 }
595 tracing::info!("Sent emergency stop aggregated nonce to verifiers in signature distributor");
596
597 Ok(())
598}
599
600async fn create_nonce_streams(
608 verifiers: ParticipatingVerifiers,
609 num_nonces: u32,
610 #[cfg(test)] config: &crate::config::BridgeConfig,
611) -> Result<
612 (
613 Vec<clementine::NonceGenFirstResponse>,
614 Vec<BoxStream<'static, Result<PublicNonce, BridgeError>>>,
615 ),
616 BridgeError,
617> {
618 let mut nonce_streams = timed_try_join_all(
619 NONCE_STREAM_CREATION_TIMEOUT,
620 "Nonce stream creation",
621 Some(verifiers.ids()),
622 verifiers
623 .clients()
624 .into_iter()
625 .enumerate()
626 .map(|(idx, client)| {
627 let mut client = client.clone();
628 #[cfg(test)]
629 let config = config.clone();
630
631 async move {
632 #[cfg(test)]
633 config
634 .test_params
635 .timeout_params
636 .hook_timeout_nonce_stream_creation_verifier(idx)
637 .await;
638 let response_stream = client
639 .nonce_gen(tonic::Request::new(clementine::NonceGenRequest {
640 num_nonces,
641 }))
642 .await
643 .wrap_err_with(|| AggregatorError::RequestFailed {
644 request_name: format!("Nonce gen stream for verifier {idx}"),
645 })?;
646
647 Ok::<_, BridgeError>(response_stream.into_inner())
648 }
649 }),
650 )
651 .await?;
652
653 let first_responses: Vec<clementine::NonceGenFirstResponse> =
655 join_all_combine_errors(nonce_streams.iter_mut().zip(verifiers.ids()).map(
656 |(stream, id)| async move {
657 parser::verifier::parse_nonce_gen_first_response(stream)
658 .await
659 .wrap_err_with(|| format!("Failed to get initial response from {id}"))
660 },
661 ))
662 .await
663 .wrap_err("Failed to get nonce gen's initial responses from verifiers")?;
664
665 let transformed_streams = nonce_streams
666 .into_iter()
667 .zip(verifiers.ids())
668 .map(|(stream, id)| {
669 stream
670 .map(move |result| {
671 Aggregator::extract_pub_nonce(
672 result
673 .wrap_err_with(|| AggregatorError::InputStreamEndedEarlyUnknownSize {
674 stream_name: format!("Nonce gen stream for {id}"),
675 })?
676 .response,
677 )
678 })
679 .boxed()
680 })
681 .collect::<Vec<_>>();
682
683 Ok((first_responses, transformed_streams))
684}
685
686async fn collect_and_call<R, T, F, Fut>(
691 rx: &mut tokio::sync::broadcast::Receiver<Vec<T>>,
692 mut f: F,
693) -> Result<R, Status>
694where
695 R: Default,
696 T: Clone,
697 F: FnMut(Vec<T>) -> Fut,
698 Fut: Future<Output = Result<R, Status>>,
699{
700 loop {
701 match rx.recv().await {
702 Ok(params) => {
703 f(params).await?;
704 }
705 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
706 break Err(Status::internal(format!(
707 "lost {n} items due to lagging receiver"
708 )));
709 }
710 Err(tokio::sync::broadcast::error::RecvError::Closed) => break Ok(R::default()),
711 }
712 }
713}
714
715impl Aggregator {
716 fn extract_pub_nonce(
718 response: Option<clementine::nonce_gen_response::Response>,
719 ) -> Result<PublicNonce, BridgeError> {
720 match response.ok_or_eyre("NonceGen response is empty")? {
721 clementine::nonce_gen_response::Response::PubNonce(pub_nonce) => {
722 Ok(PublicNonce::from_byte_array(
723 &pub_nonce
724 .as_slice()
725 .try_into()
726 .wrap_err("PubNonce must be 66 bytes")?,
727 )
728 .wrap_err("Failed to parse pub nonce")?)
729 }
730 _ => Err(eyre::eyre!("Expected PubNonce in response").into()),
731 }
732 }
733
734 async fn collect_operator_sigs(
736 operator_clients: ParticipatingOperators,
737 config: BridgeConfig,
738 mut deposit_sign_session: DepositSignSession,
739 ) -> Result<Vec<Vec<Signature>>, BridgeError> {
740 deposit_sign_session.nonce_gen_first_responses = Vec::new(); let mut operator_sigs_streams =
742 timed_try_join_all(
744 OPERATOR_SIGS_STREAM_CREATION_TIMEOUT,
745 "Operator signature stream creation",
746 Some(operator_clients.ids()),
747 operator_clients.clients().into_iter().enumerate().map(|(idx, mut operator_client)| {
748 let sign_session = deposit_sign_session.clone();
749 #[cfg(test)]
750 let config = config.clone();
751 async move {
752 #[cfg(test)]
753 config
754 .test_params
755 .timeout_params
756 .hook_timeout_operator_sig_collection_operator(idx)
757 .await;
758 let stream = operator_client
759 .deposit_sign(tonic::Request::new(sign_session))
760 .await.wrap_err_with(|| AggregatorError::RequestFailed {
761 request_name: format!("Deposit sign stream for operator {idx}"),
762 })?;
763 Ok::<_, BridgeError>(stream.into_inner())
764 }
765 }))
766 .await?;
767
768 let deposit_data: DepositData = deposit_sign_session
769 .deposit_params
770 .clone()
771 .ok_or_else(|| eyre::eyre!("No deposit params found in deposit sign session"))?
772 .try_into()
773 .wrap_err("Failed to convert deposit params to deposit data")?;
774
775 let needed_sigs = config.get_num_required_operator_sigs(&deposit_data);
777
778 let operator_sigs =
780 join_all_combine_errors(operator_sigs_streams.iter_mut().enumerate().map(
781 |(idx, stream)| async move {
782 let mut sigs: Vec<Signature> = Vec::with_capacity(needed_sigs);
783 while let Some(sig) =
784 stream
785 .message()
786 .await
787 .wrap_err_with(|| AggregatorError::RequestFailed {
788 request_name: format!("Deposit sign stream for operator {idx}"),
789 })?
790 {
791 sigs.push(Signature::from_slice(&sig.schnorr_sig).wrap_err_with(|| {
792 format!("Failed to parse Schnorr signature from operator {idx}")
793 })?);
794 if sigs.len() == needed_sigs {
795 break;
796 }
797 }
798 Ok::<_, BridgeError>(sigs)
799 },
800 ))
801 .await
802 .wrap_err("Failed to get operator signatures from operators")?;
803
804 for (idx, sigs) in operator_sigs.iter().enumerate() {
806 if sigs.len() != needed_sigs {
807 return Err(eyre::eyre!(
808 "Not all operator sigs received from operator {}.\n Expected: {}, got: {}",
809 idx,
810 needed_sigs,
811 sigs.len()
812 )
813 .into());
814 }
815 }
816 Ok(operator_sigs)
817 }
818
819 async fn create_movetx(
820 &self,
821 partial_sigs: Vec<Vec<u8>>,
822 movetx_agg_and_pub_nonces: (AggregatedNonce, Vec<PublicNonce>),
823 deposit_params: DepositParams,
824 ) -> Result<TxHandler<Signed>, Status> {
825 let mut deposit_data: DepositData = deposit_params.try_into()?;
826 let musig_partial_sigs = parser::verifier::parse_partial_sigs(partial_sigs)?;
827
828 let mut move_txhandler =
830 create_move_to_vault_txhandler(&mut deposit_data, self.config.protocol_paramset())?;
831
832 let sighash = move_txhandler.calculate_script_spend_sighash_indexed(
833 0,
834 0,
835 bitcoin::TapSighashType::Default,
836 )?;
837
838 let musig_sigs_and_nonces = musig_partial_sigs
839 .into_iter()
840 .zip(movetx_agg_and_pub_nonces.1)
841 .collect::<Vec<_>>();
842
843 let verifiers_public_keys = deposit_data.get_verifiers();
845 let final_sig = crate::musig2::aggregate_partial_signatures(
846 verifiers_public_keys,
847 None,
848 movetx_agg_and_pub_nonces.0,
849 &musig_sigs_and_nonces,
850 Message::from_digest(sighash.to_byte_array()),
851 )?;
852
853 move_txhandler.set_p2tr_script_spend_witness(&[final_sig.as_ref()], 0, 0)?;
855
856 Ok(move_txhandler.promote()?)
857 }
858
859 async fn verify_and_save_emergency_stop_sigs(
860 &self,
861 emergency_stop_sigs: Vec<Vec<u8>>,
862 emergency_stop_agg_and_pub_nonces: (AggregatedNonce, Vec<PublicNonce>),
863 deposit_params: DepositParams,
864 ) -> Result<(), BridgeError> {
865 let mut deposit_data: DepositData = deposit_params
866 .try_into()
867 .wrap_err("Failed to convert deposit params to deposit data")?;
868 let musig_partial_sigs = parser::verifier::parse_partial_sigs(emergency_stop_sigs)
869 .wrap_err("Failed to parse emergency stop signatures")?;
870
871 let move_txhandler =
873 create_move_to_vault_txhandler(&mut deposit_data, self.config.protocol_paramset())?;
874
875 let mut emergency_stop_txhandler = create_emergency_stop_txhandler(
876 &mut deposit_data,
877 &move_txhandler,
878 self.config.protocol_paramset(),
879 )?;
880
881 let sighash = emergency_stop_txhandler.calculate_script_spend_sighash_indexed(
882 0,
883 0,
884 bitcoin::TapSighashType::SinglePlusAnyoneCanPay,
885 )?;
886
887 let verifiers_public_keys = deposit_data.get_verifiers();
888
889 let musig_sigs_and_nonces = musig_partial_sigs
890 .into_iter()
891 .zip(emergency_stop_agg_and_pub_nonces.1)
892 .collect::<Vec<_>>();
893
894 let final_sig = crate::musig2::aggregate_partial_signatures(
895 verifiers_public_keys,
896 None,
897 emergency_stop_agg_and_pub_nonces.0,
898 &musig_sigs_and_nonces,
899 Message::from_digest(sighash.to_byte_array()),
900 )
901 .wrap_err("Failed to aggregate emergency stop signatures")?;
902
903 let final_sig = bitcoin::taproot::Signature {
904 signature: final_sig,
905 sighash_type: bitcoin::TapSighashType::SinglePlusAnyoneCanPay,
906 };
907
908 emergency_stop_txhandler.set_p2tr_script_spend_witness(&[final_sig.serialize()], 0, 0)?;
910
911 let emergency_stop_tx = emergency_stop_txhandler.get_cached_tx();
912 let move_to_vault_txid = move_txhandler.get_txid();
913
914 tracing::debug!("Move to vault tx id: {}", move_to_vault_txid.to_string());
915
916 let emergency_stop_pubkey = self
917 .config
918 .emergency_stop_encryption_public_key
919 .ok_or_else(|| eyre::eyre!("Emergency stop encryption public key is not set"))?;
920 let encrypted_emergency_stop_tx = crate::encryption::encrypt_bytes(
921 emergency_stop_pubkey,
922 &bitcoin::consensus::serialize(&emergency_stop_tx),
923 )?;
924
925 self.db
926 .insert_signed_emergency_stop_tx_if_not_exists(
927 None,
928 move_to_vault_txid,
929 &encrypted_emergency_stop_tx,
930 )
931 .await?;
932
933 Ok(())
934 }
935
936 #[cfg(feature = "automation")]
937 pub async fn send_emergency_stop_tx(
938 &self,
939 tx: bitcoin::Transaction,
940 ) -> Result<bitcoin::Transaction, Status> {
941 let mut dbtx = self.db.begin_transaction().await?;
943 self.tx_sender
944 .insert_try_to_send(
945 &mut dbtx,
946 Some(TxMetadata {
947 deposit_outpoint: None,
948 operator_xonly_pk: None,
949 round_idx: None,
950 kickoff_idx: None,
951 tx_type: TransactionType::EmergencyStop,
952 }),
953 &tx,
954 FeePayingType::RBF,
955 None,
956 &[],
957 &[],
958 &[],
959 &[],
960 )
961 .await
962 .map_err(BridgeError::from)?;
963 dbtx.commit()
964 .await
965 .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;
966
967 Ok(tx)
968 }
969}
970
971#[async_trait]
972impl ClementineAggregator for AggregatorServer {
973 async fn get_compatibility_params(
974 &self,
975 _request: Request<Empty>,
976 ) -> Result<Response<CompatibilityParamsRpc>, Status> {
977 let params = self.aggregator.get_compatibility_params()?;
978 Ok(Response::new(params.try_into().map_to_status()?))
979 }
980
981 async fn get_compatibility_data_from_entities(
982 &self,
983 _request: Request<Empty>,
984 ) -> Result<Response<EntitiesCompatibilityData>, Status> {
985 let data = self
986 .aggregator
987 .get_compatibility_data_from_entities()
988 .await?;
989 Ok(Response::new(EntitiesCompatibilityData {
990 entities_compatibility_data: data,
991 }))
992 }
993
994 async fn vergen(&self, _request: Request<Empty>) -> Result<Response<VergenResponse>, Status> {
995 tracing::info!("Vergen rpc called");
996 Ok(Response::new(get_vergen_response()))
997 }
998
999 async fn get_entity_statuses(
1000 &self,
1001 request: Request<GetEntityStatusesRequest>,
1002 ) -> Result<Response<EntityStatuses>, Status> {
1003 tracing::info!("Get entity statuses rpc called");
1004 let request = request.into_inner();
1005 let restart_tasks = request.restart_tasks;
1006
1007 Ok(Response::new(EntityStatuses {
1008 entity_statuses: self.aggregator.get_entity_statuses(restart_tasks).await?,
1009 }))
1010 }
1011
1012 async fn optimistic_payout(
1013 &self,
1014 request: tonic::Request<super::OptimisticWithdrawParams>,
1015 ) -> std::result::Result<tonic::Response<super::RawSignedTx>, tonic::Status> {
1016 tracing::info!("Optimistic payout rpc called");
1017 let opt_withdraw_params = request.into_inner();
1018
1019 let withdraw_params =
1020 opt_withdraw_params
1021 .withdrawal
1022 .clone()
1023 .ok_or(Status::invalid_argument(
1024 "Withdrawal params not found for optimistic payout",
1025 ))?;
1026 let (deposit_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
1027 parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
1028 tracing::info!("Parsed optimistic payout rpc params, deposit id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}, verification signature: {:?}", deposit_id, input_signature, input_outpoint, output_script_pubkey, output_amount, opt_withdraw_params.verification_signature);
1029
1030 self.check_compatibility_with_actors(true, false).await?;
1032
1033 if self
1035 .rpc
1036 .is_utxo_spent(&input_outpoint)
1037 .await
1038 .map_to_status()?
1039 {
1040 return Err(Status::invalid_argument(format!(
1041 "Withdrawal utxo is already spent: {input_outpoint:?}",
1042 )));
1043 }
1044
1045 if !(output_script_pubkey.is_p2tr()
1047 || output_script_pubkey.is_p2pkh()
1048 || output_script_pubkey.is_p2sh()
1049 || output_script_pubkey.is_p2wpkh()
1050 || output_script_pubkey.is_p2wsh())
1051 {
1052 return Err(Status::invalid_argument(format!(
1053 "Output script pubkey is not a valid script pubkey: {output_script_pubkey}, must be p2tr, p2pkh, p2sh, p2wpkh, or p2wsh"
1054 )));
1055 }
1056
1057 let withdrawal = self
1059 .db
1060 .get_move_to_vault_txid_from_citrea_deposit(None, deposit_id)
1061 .await?;
1062 if let Some(move_txid) = withdrawal {
1063 let withdrawal_utxo = self
1065 .db
1066 .get_withdrawal_utxo_from_citrea_withdrawal(None, deposit_id)
1067 .await?;
1068 if withdrawal_utxo != input_outpoint {
1069 return Err(Status::invalid_argument(format!(
1070 "Withdrawal utxo is not correct: {withdrawal_utxo:?} != {input_outpoint:?}",
1071 )));
1072 }
1073
1074 let withdrawal_prevout = self
1076 .rpc
1077 .get_txout_from_outpoint(&input_outpoint)
1078 .await
1079 .map_to_status()?;
1080
1081 let user_xonly_pk = withdrawal_prevout
1082 .script_pubkey
1083 .try_get_taproot_pk()
1084 .map_err(|_| {
1085 Status::invalid_argument(format!(
1086 "Withdrawal prevout script_pubkey is not a Taproot output: {:?}",
1087 withdrawal_prevout.script_pubkey
1088 ))
1089 })?;
1090
1091 let withdrawal_utxo = UTXO {
1092 outpoint: input_outpoint,
1093 txout: withdrawal_prevout,
1094 };
1095
1096 let output_txout = TxOut {
1097 value: output_amount,
1098 script_pubkey: output_script_pubkey,
1099 };
1100
1101 let deposit_data = self
1102 .db
1103 .get_deposit_data_with_move_tx(None, move_txid)
1104 .await?;
1105
1106 let mut deposit_data = deposit_data
1107 .ok_or(eyre::eyre!(
1108 "Deposit data not found for move txid {}",
1109 move_txid
1110 ))
1111 .map_err(BridgeError::from)?;
1112
1113 let mut opt_payout_txhandler = create_optimistic_payout_txhandler(
1114 &mut deposit_data,
1115 withdrawal_utxo,
1116 output_txout,
1117 input_signature,
1118 self.config.protocol_paramset(),
1119 )?;
1120
1121 let sighash = opt_payout_txhandler
1122 .calculate_pubkey_spend_sighash(0, input_signature.sighash_type)?;
1123
1124 let message = Message::from_digest(sighash.to_byte_array());
1125
1126 SECP.verify_schnorr(&input_signature.signature, &message, &user_xonly_pk)
1127 .map_err(|_| Status::internal("Invalid signature for optimistic payout tx. Ensure the signature uses SinglePlusAnyoneCanPay sighash type."))?;
1128
1129 let participating_verifiers = self.get_participating_verifiers(&deposit_data).await?;
1131 let verifiers_ids = participating_verifiers.ids();
1132 let (first_responses, mut nonce_streams) = {
1133 create_nonce_streams(
1134 participating_verifiers.clone(),
1135 1,
1136 #[cfg(test)]
1137 &self.config,
1138 )
1139 .await?
1140 };
1141 let pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
1143 .await
1144 .wrap_err("Failed to aggregate nonces for optimistic payout")
1145 .map_to_status()?;
1146 let agg_nonce = aggregate_nonces(pub_nonces.iter().collect::<Vec<_>>().as_slice())?;
1147
1148 let agg_nonce_bytes = agg_nonce.serialize().to_vec();
1149 let opt_payout_sign_futures = participating_verifiers
1151 .clients()
1152 .iter()
1153 .zip(first_responses)
1154 .map(|(client, first_response)| {
1155 let mut client = client.clone();
1156 let opt_withdraw_params = opt_withdraw_params.clone();
1157 {
1158 let agg_nonce_serialized = agg_nonce_bytes.clone();
1159 async move {
1160 let mut request = Request::new(OptimisticPayoutParams {
1161 opt_withdrawal: Some(opt_withdraw_params),
1162 agg_nonce: agg_nonce_serialized,
1163 nonce_gen: Some(first_response),
1164 });
1165 request.set_timeout(OPTIMISTIC_PAYOUT_TIMEOUT);
1166 client.optimistic_payout_sign(request).await
1167 }
1168 }
1169 })
1170 .collect::<Vec<_>>();
1171
1172 let opt_payout_resps = join_all(opt_payout_sign_futures).await;
1174 let mut payout_sigs = Vec::new();
1175 let mut errors = Vec::new();
1176 for (resp, verifier_id) in opt_payout_resps
1177 .into_iter()
1178 .zip(participating_verifiers.ids())
1179 {
1180 match resp {
1181 Ok(res) => {
1182 payout_sigs.push(res.into_inner());
1183 }
1184 Err(e) => {
1185 errors.push(format!("{verifier_id} optimistic payout sign failed: {e}"));
1186 }
1187 }
1188 }
1189 if !errors.is_empty() {
1190 return Err(eyre::eyre!("{errors:?}").into_status());
1191 }
1192
1193 let sighash = opt_payout_txhandler.calculate_script_spend_sighash_indexed(
1196 1,
1197 0,
1198 bitcoin::TapSighashType::Default,
1199 )?;
1200
1201 let musig_partial_sigs = payout_sigs
1202 .into_iter()
1203 .map(|sig| {
1204 PartialSignature::from_byte_array(
1205 &sig.partial_sig
1206 .try_into()
1207 .map_err(|_| secp256k1::musig::ParseError::MalformedArg)?,
1208 )
1209 })
1210 .collect::<Result<Vec<_>, _>>()
1211 .map_err(|e| Status::internal(format!("Failed to parse partial sig: {e:?}")))?;
1212
1213 let musig_sigs_and_nonces = musig_partial_sigs
1214 .into_iter()
1215 .zip(pub_nonces)
1216 .collect::<Vec<_>>();
1217
1218 let final_sig = bitcoin::taproot::Signature {
1219 signature: crate::musig2::aggregate_partial_signatures(
1220 deposit_data.get_verifiers(),
1221 None,
1222 agg_nonce,
1223 &musig_sigs_and_nonces,
1224 Message::from_digest(sighash.to_byte_array()),
1225 )?,
1226 sighash_type: bitcoin::TapSighashType::Default,
1227 };
1228
1229 opt_payout_txhandler.set_p2tr_script_spend_witness(&[final_sig.serialize()], 1, 0)?;
1231 let opt_payout_txhandler = opt_payout_txhandler.promote()?;
1232 let opt_payout_tx = opt_payout_txhandler.get_cached_tx();
1233 tracing::info!(
1234 "Optimistic payout transaction created successfully for deposit id: {:?}",
1235 deposit_id
1236 );
1237
1238 #[cfg(feature = "automation")]
1239 {
1240 tracing::info!("Sending optimistic payout tx via tx_sender");
1241
1242 let mut dbtx = self.db.begin_transaction().await?;
1243 self.tx_sender
1244 .add_tx_to_queue(
1245 &mut dbtx,
1246 TransactionType::OptimisticPayout,
1247 opt_payout_tx,
1248 &[],
1249 None,
1250 &self.config,
1251 None,
1252 )
1253 .await
1254 .map_to_status()?;
1255 dbtx.commit().await.map_err(|e| {
1256 Status::internal(format!(
1257 "Failed to commit db transaction to send optimistic payout tx: {e}",
1258 ))
1259 })?;
1260 }
1261
1262 Ok(Response::new(RawSignedTx::from(opt_payout_tx)))
1263 } else {
1264 Err(Status::not_found(format!(
1265 "Withdrawal with index {deposit_id} not found."
1266 )))
1267 }
1268 }
1269
1270 async fn internal_send_tx(
1271 &self,
1272 request: Request<clementine::SendTxRequest>,
1273 ) -> Result<Response<Empty>, Status> {
1274 #[cfg(not(feature = "automation"))]
1275 {
1276 Err(Status::unimplemented("Automation is not enabled"))
1277 }
1278 #[cfg(feature = "automation")]
1279 {
1280 let send_tx_req = request.into_inner();
1281 let fee_type = send_tx_req.fee_type();
1282 let signed_tx: bitcoin::Transaction = send_tx_req
1283 .raw_tx
1284 .ok_or(Status::invalid_argument("Missing raw_tx"))?
1285 .try_into()?;
1286 tracing::warn!(
1287 "Internal send tx rpc called with feetype: {:?}, tx hex: {}",
1288 fee_type,
1289 bitcoin::consensus::encode::serialize_hex(&signed_tx)
1290 );
1291
1292 let mut dbtx = self.db.begin_transaction().await?;
1293 self.tx_sender
1294 .insert_try_to_send(
1295 &mut dbtx,
1296 None,
1297 &signed_tx,
1298 fee_type.try_into()?,
1299 None,
1300 &[],
1301 &[],
1302 &[],
1303 &[],
1304 )
1305 .await
1306 .map_to_status()?;
1307 dbtx.commit()
1308 .await
1309 .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;
1310 Ok(Response::new(Empty {}))
1311 }
1312 }
1313
1314 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
1315 async fn setup(
1316 &self,
1317 _request: Request<Empty>,
1318 ) -> Result<Response<VerifierPublicKeys>, Status> {
1319 tracing::info!("Setup rpc called");
1320 self.check_compatibility_with_actors(true, true).await?;
1321 const CHANNEL_CAPACITY: usize = 1024 * 16;
1323 let (operator_params_tx, operator_params_rx) =
1324 tokio::sync::broadcast::channel(CHANNEL_CAPACITY);
1325 let operator_params_rx_handles = (0..self.get_verifier_clients().len())
1326 .map(|_| operator_params_rx.resubscribe())
1327 .collect::<Vec<_>>();
1328
1329 let operators = self.get_operator_clients().to_vec();
1330 let operator_pks = self.fetch_operator_keys().await?;
1331 let operator_ids = operator_pks
1332 .iter()
1333 .map(|key| OperatorId(*key))
1334 .collect::<Vec<_>>();
1335 let get_operator_params_chunked_handle = tokio::spawn(async move {
1336 tracing::info!(clients = operators.len(), "Collecting operator details...");
1337 join_all_combine_errors(operators.iter().zip(operator_ids.iter()).map(
1338 |(operator, id)| {
1339 let mut operator = operator.clone();
1340 let tx = operator_params_tx.clone();
1341 async move {
1342 let stream = operator
1343 .get_params(Request::new(Empty {}))
1344 .await
1345 .wrap_err_with(|| AggregatorError::RequestFailed {
1346 request_name: format!("Operator get params for {id}"),
1347 })
1348 .map_err(BridgeError::from)?
1349 .into_inner();
1350 tx.send(stream.try_collect::<Vec<_>>().await?)
1351 .map_err(|e| {
1352 BridgeError::from(eyre::eyre!(
1353 "Failed to read operator params for {id}: {e}"
1354 ))
1355 })?;
1356 Ok::<_, Status>(())
1357 }
1358 },
1359 ))
1360 .await
1361 .wrap_err("Failed to get operator params from operators")
1362 .map_to_status()?;
1363 Ok::<_, Status>(())
1364 });
1365
1366 let verifiers = self.get_verifier_clients().to_vec();
1367 let verifier_pks = self.fetch_verifier_keys().await?;
1368 let verifier_ids = verifier_pks
1369 .iter()
1370 .map(|key| VerifierId(*key))
1371 .collect::<Vec<_>>();
1372 let set_operator_params_handle = tokio::spawn(async move {
1373 tracing::info!("Informing verifiers of existing operators...");
1374 join_all_combine_errors(
1375 verifiers
1376 .iter()
1377 .zip(verifier_ids.iter())
1378 .zip(operator_params_rx_handles)
1379 .map(|((verifier, id), mut rx)| {
1380 let verifier = verifier.clone();
1381 async move {
1382 collect_and_call(&mut rx, |params| {
1383 let mut verifier = verifier.clone();
1384 async move {
1385 verifier
1386 .set_operator(futures::stream::iter(params))
1387 .await
1388 .wrap_err_with(|| AggregatorError::RequestFailed {
1389 request_name: format!("Verifier set_operator for {id}"),
1390 })
1391 .map_err(BridgeError::from)?;
1392 Ok::<_, Status>(())
1393 }
1394 })
1395 .await?;
1396 Ok::<_, Status>(())
1397 }
1398 }),
1399 )
1400 .await
1401 .wrap_err("Failed to set_operator for all verifiers")
1402 .map_to_status()?;
1403 Ok::<_, Status>(())
1404 });
1405
1406 let task_outputs = timed_request(
1407 SETUP_COMPLETION_TIMEOUT,
1408 "Aggregator setup pipeline",
1409 async move {
1410 Ok::<_, BridgeError>(
1411 futures::future::join_all([
1412 get_operator_params_chunked_handle,
1413 set_operator_params_handle,
1414 ])
1415 .await,
1416 )
1417 },
1418 )
1419 .await?;
1420
1421 let task_names = ["Get operator params", "Set operator params"];
1422 debug_assert_eq!(task_names.len(), task_outputs.len());
1423
1424 flatten_join_named_results(task_names.into_iter().zip(task_outputs.into_iter()))?;
1425
1426 Ok(Response::new(VerifierPublicKeys::from(verifier_pks)))
1427 }
1428
1429 async fn new_deposit(
1450 &self,
1451 request: Request<Deposit>,
1452 ) -> Result<Response<clementine::RawSignedTx>, Status> {
1453 tracing::info!("New deposit rpc called");
1454 self.check_compatibility_with_actors(true, true).await?;
1455
1456 timed_request(OVERALL_DEPOSIT_TIMEOUT, "Overall new deposit", async {
1457 let deposit_info: DepositInfo = request.into_inner().try_into()?;
1458 tracing::info!(
1459 "Parsed new deposit rpc params, deposit info: {:?}",
1460 deposit_info
1461 );
1462
1463 let deposit_data = DepositData {
1464 deposit: deposit_info.clone(),
1465 nofn_xonly_pk: None,
1466 actors: Actors {
1467 verifiers: self.fetch_verifier_keys().await?,
1468 watchtowers: vec![],
1469 operators: self.fetch_operator_keys().await?,
1470 },
1471 security_council: self.config.security_council.clone(),
1472 };
1473 tracing::info!(
1474 "Created deposit data in new_deposit for deposit info: {:?}, deposit data: {:?}",
1475 deposit_info,
1476 deposit_data
1477 );
1478
1479 let deposit_params = deposit_data.clone().into();
1480
1481 let start = std::time::Instant::now();
1483 timed_request(
1484 KEY_DISTRIBUTION_TIMEOUT,
1485 "Key collection and distribution",
1486 self.collect_and_distribute_keys(&deposit_params),
1487 )
1488 .await?;
1489 tracing::info!("Collected and distributed keys in {:?}", start.elapsed());
1490
1491 let verifiers = self.get_participating_verifiers(&deposit_data).await?;
1492 let verifiers_ids = verifiers.ids();
1493
1494 let num_required_sigs = self.config.get_num_required_nofn_sigs(&deposit_data);
1496 let num_required_nonces = num_required_sigs as u32 + 2; let (first_responses, nonce_streams) =
1498 create_nonce_streams(
1499 verifiers.clone(),
1500 num_required_nonces,
1501 #[cfg(test)]
1502 &self.config,
1503 )
1504 .await?;
1505
1506 let deposit_sign_session = DepositSignSession {
1508 deposit_params: Some(deposit_params.clone()),
1509 nonce_gen_first_responses: first_responses,
1510 };
1511
1512 let deposit_sign_param: VerifierDepositSignParams =
1513 deposit_sign_session.clone().into();
1514
1515 #[allow(clippy::unused_enumerate_index)]
1516 let partial_sig_streams = timed_try_join_all(
1517 PARTIAL_SIG_STREAM_CREATION_TIMEOUT,
1518 "Partial signature stream creation",
1519 Some(verifiers.ids()),
1520 verifiers.clients().into_iter().enumerate().map(|(_idx, verifier_client)| {
1521 let mut verifier_client = verifier_client.clone();
1522 #[cfg(test)]
1523 let config = self.config.clone();
1524
1525 let deposit_sign_param =
1526 deposit_sign_param.clone();
1527
1528 async move {
1529 #[cfg(test)]
1530 config
1531 .test_params
1532 .timeout_params
1533 .hook_timeout_partial_sig_stream_creation_verifier(_idx)
1534 .await;
1535
1536 let (tx, rx) = tokio::sync::mpsc::channel(num_required_nonces as usize + 1); let stream = verifier_client
1539 .deposit_sign(tokio_stream::wrappers::ReceiverStream::new(rx))
1540 .await?
1541 .into_inner();
1542
1543 tx.send(deposit_sign_param).await.map_err(|e| {
1544 BridgeError::from(eyre::eyre!("Failed to send deposit sign session: {e:?}"))})?;
1545
1546 Ok::<_, BridgeError>((stream, tx))
1547 }
1548 })
1549 )
1550 .await?;
1551
1552 #[allow(clippy::unused_enumerate_index)]
1554 let deposit_finalize_streams = verifiers.clients().into_iter().enumerate().map(
1555 |(_idx, mut verifier)| {
1556 let (tx, rx) = tokio::sync::mpsc::channel(num_required_nonces as usize + 1);
1557 let receiver_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
1558 #[cfg(test)]
1559 let config = self.config.clone();
1560 let deposit_finalize_future = tokio::spawn(async move {
1562 #[cfg(test)]
1563 config
1564 .test_params
1565 .timeout_params
1566 .hook_timeout_deposit_finalize_verifier(_idx)
1567 .await;
1568
1569 verifier.deposit_finalize(receiver_stream).await
1570 });
1571
1572 Ok::<_, BridgeError>((deposit_finalize_future, tx))
1573 },
1574 ).collect::<Result<Vec<_>, BridgeError>>()?;
1575
1576 tracing::info!("Sending deposit finalize streams to verifiers for deposit {:?}", deposit_info);
1577
1578 let (deposit_finalize_futures, deposit_finalize_sender): (Vec<_>, Vec<_>) =
1579 deposit_finalize_streams.into_iter().unzip();
1580
1581 let deposit_finalize_first_param: VerifierDepositFinalizeParams =
1583 deposit_sign_session.clone().into();
1584
1585 timed_try_join_all(
1586 DEPOSIT_FINALIZE_STREAM_CREATION_TIMEOUT,
1587 "Deposit finalization initial param send",
1588 Some(verifiers.ids()),
1589 deposit_finalize_sender.iter().cloned().map(|tx| {
1590 let param = deposit_finalize_first_param.clone();
1591 async move {
1592 tx.send(param).await
1593 .map_err(|e| {
1594 BridgeError::from(eyre::eyre!(
1595 "Failed to send deposit finalize first param: {e:?}"))
1596 })
1597 }
1598 })
1599 ).await?;
1600
1601
1602 let deposit_blockhash = self
1603 .rpc
1604 .get_blockhash_of_tx(&deposit_data.get_deposit_outpoint().txid)
1605 .await
1606 .map_to_status()?;
1607
1608 let verifiers_public_keys = deposit_data.get_verifiers();
1609
1610 let needed_nofn_sigs = self.config.get_num_required_nofn_sigs(&deposit_data);
1611
1612 let sighash_stream = Box::pin(create_nofn_sighash_stream(
1614 self.db.clone(),
1615 self.config.clone(),
1616 deposit_data.clone(),
1617 deposit_blockhash,
1618 false,
1619 ));
1620
1621 let (agg_nonce_sender, agg_nonce_receiver) = channel(num_required_nonces as usize);
1623 let (partial_sig_sender, partial_sig_receiver) = channel(num_required_nonces as usize);
1624 let (final_sig_sender, final_sig_receiver) = channel(num_required_nonces as usize);
1625
1626 let nonce_agg_handle = tokio::spawn(nonce_aggregator(
1628 nonce_streams,
1629 sighash_stream,
1630 agg_nonce_sender,
1631 needed_nofn_sigs,
1632 verifiers_ids.clone(),
1633 ));
1634
1635 let nonce_dist_handle = tokio::spawn(nonce_distributor(
1637 agg_nonce_receiver,
1638 partial_sig_streams,
1639 partial_sig_sender,
1640 needed_nofn_sigs,
1641 verifiers_ids.clone(),
1642 ));
1643
1644 let sig_agg_handle = tokio::spawn(signature_aggregator(
1646 partial_sig_receiver,
1647 verifiers_public_keys,
1648 final_sig_sender,
1649 needed_nofn_sigs,
1650 ));
1651
1652 tracing::debug!("Getting signatures from operators");
1653 let operators = self.get_participating_operators(&deposit_data).await?;
1655
1656 let config_clone = self.config.clone();
1657 let operator_sigs_fut = tokio::spawn(async move {
1658 timed_request(
1659 OPERATOR_SIGS_TIMEOUT,
1660 "Operator signature collection",
1661 async {
1662 Aggregator::collect_operator_sigs(
1663 operators,
1664 config_clone,
1665 deposit_sign_session,
1666 )
1667 .await
1668 },
1669 )
1670 .await
1671 });
1672
1673 let nonce_agg_handle = nonce_agg_handle
1675 .map_err(|_| Status::internal("panic when aggregating nonces"))
1676 .map(
1677 |res| -> Result<((AggregatedNonce, Vec<PublicNonce>), (AggregatedNonce, Vec<PublicNonce>)), Status> {
1678 res.and_then(|r| r.map_err(Into::into))
1679 },
1680 )
1681 .shared();
1682
1683 let sig_dist_handle = tokio::spawn(signature_distributor(
1685 final_sig_receiver,
1686 deposit_finalize_sender.clone(),
1687 nonce_agg_handle.clone(),
1688 needed_nofn_sigs,
1689 verifiers_ids.clone(),
1690 ));
1691
1692 let all_op_sigs = operator_sigs_fut
1696 .await
1697 .map_err(|_| BridgeError::from(eyre::eyre!("panic when collecting operator signatures")))??;
1698
1699 tracing::info!("Got all operator signatures for deposit {:?}", deposit_info);
1700
1701 let task_outputs = timed_request(
1707 PIPELINE_COMPLETION_TIMEOUT,
1708 "MuSig2 signing pipeline",
1709 async move {
1710 Ok::<_, BridgeError>(futures::future::join_all([nonce_dist_handle, sig_agg_handle, sig_dist_handle]).await)
1711 },
1712 )
1713 .await?;
1714
1715 let task_names = ["Nonce distribution", "Signature aggregation", "Signature distribution"];
1716
1717 debug_assert_eq!(task_names.len(), task_outputs.len());
1718
1719 flatten_join_named_results(
1720 task_names.into_iter().zip(task_outputs.into_iter()),
1721 )?;
1722 tracing::info!("All deposit_sign related tasks completed for deposit {:?}, now sending operator signatures to verifiers for verification", deposit_info);
1723
1724 tracing::debug!("Pipeline tasks completed");
1725 let verifiers_ids = verifiers.ids();
1726
1727 let deposit_finalize_futures = timed_request(
1729 SEND_OPERATOR_SIGS_TIMEOUT,
1730 "Sending operator signatures to verifiers",
1731 async {
1732 let send_operator_sigs: Vec<_> = deposit_finalize_sender
1733 .iter()
1734 .zip(verifiers_ids.iter())
1735 .zip(deposit_finalize_futures.into_iter())
1736 .map(|((tx, id), dep_fin_fut)| async {
1737 for one_op_sigs in all_op_sigs.iter() {
1738 for sig in one_op_sigs.iter() {
1739 let deposit_finalize_param: VerifierDepositFinalizeParams =
1740 sig.into();
1741
1742 let send = tx.send(deposit_finalize_param).await;
1743 match send {
1744 Ok(()) => (),
1745 Err(e) => {
1746 dep_fin_fut.await.wrap_err(format!("{} deposit finalize tokio task on aggregator returned error", id.clone()))?.wrap_err(format!("{} deposit finalize rpc call returned error", id.clone()))?;
1748 return Err(BridgeError::from(eyre::eyre!(format!("{} deposit finalize stream sending returned error: {:?}", id.clone(), e))));
1749 }
1750 }
1751 }
1752 }
1753
1754 Ok::<_, BridgeError>(dep_fin_fut)
1755 })
1756 .collect();
1757 join_all_combine_errors(send_operator_sigs).await
1758 },
1759 )
1760 .await.wrap_err("Failed to send operator signatures to verifiers")?;
1761
1762 tracing::info!("All operator signatures sent to verifiers for verification, now waiting to collect movetx and emergency stop tx partial signatures from verifiers for deposit {:?}", deposit_info);
1763
1764 let partial_sigs: Vec<(Vec<u8>, Vec<u8>)> = timed_try_join_all(
1766 DEPOSIT_FINALIZATION_TIMEOUT,
1767 "Deposit finalization",
1768 Some(verifiers.ids()),
1769 deposit_finalize_futures.into_iter().map(|fut| async move {
1770 let inner = fut.await
1771 .map_err(|_| BridgeError::from(eyre::eyre!("panic finishing deposit_finalize")))??
1772 .into_inner();
1773 Ok((inner.move_to_vault_partial_sig, inner.emergency_stop_partial_sig))
1774 }),
1775 )
1776 .await?;
1777
1778
1779 let (move_to_vault_sigs, emergency_stop_sigs): (Vec<Vec<u8>>, Vec<Vec<u8>>) =
1780 partial_sigs.into_iter().unzip();
1781
1782 tracing::info!("Received move tx and emergency stop tx partial signatures for deposit {:?}", deposit_info);
1783
1784 let (movetx_agg_nonce, emergency_stop_agg_nonce) = nonce_agg_handle.await?;
1786
1787 self.verify_and_save_emergency_stop_sigs(
1789 emergency_stop_sigs,
1790 emergency_stop_agg_nonce,
1791 deposit_params.clone(),
1792 )
1793 .await?;
1794
1795 let signed_movetx_handler = self
1796 .create_movetx(move_to_vault_sigs, movetx_agg_nonce, deposit_params)
1797 .await?;
1798
1799 let raw_signed_tx = RawSignedTx {
1800 raw_tx: bitcoin::consensus::serialize(&signed_movetx_handler.get_cached_tx()),
1801 };
1802
1803 tracing::info!("Created final move transaction for deposit {:?}", deposit_info);
1804
1805 Ok(Response::new(raw_signed_tx))
1806 })
1807 .await.map_err(Into::into)
1808 }
1809
1810 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
1811 async fn withdraw(
1812 &self,
1813 request: Request<AggregatorWithdrawalInput>,
1814 ) -> Result<Response<AggregatorWithdrawResponse>, Status> {
1815 tracing::warn!("Withdraw rpc called");
1816 let request = request.into_inner();
1817 let (withdraw_params_with_sig, operator_xonly_pks) = (
1818 request.withdrawal.ok_or(Status::invalid_argument(
1819 "withdrawalParamsWithSig is missing",
1820 ))?,
1821 request.operator_xonly_pks,
1822 );
1823 self.check_compatibility_with_actors(false, true).await?;
1825
1826 let withdraw_params = withdraw_params_with_sig
1827 .clone()
1828 .withdrawal
1829 .ok_or(Status::invalid_argument("withdrawalParams is missing"))?;
1830
1831 let operator_xonly_pks_from_rpc: Vec<XOnlyPublicKey> = operator_xonly_pks
1833 .into_iter()
1834 .map(|xonly_pk| {
1835 xonly_pk.try_into().map_err(|e| {
1836 Status::invalid_argument(format!("Failed to convert xonly public key: {e}"))
1837 })
1838 })
1839 .collect::<Result<Vec<_>, Status>>()?;
1840
1841 tracing::warn!(
1842 "Parsed withdraw rpc params, withdrawal params: {:?}, operator xonly pks: {:?}",
1843 withdraw_params,
1844 operator_xonly_pks_from_rpc
1845 .iter()
1846 .map(|pk| pk.to_string())
1847 .collect::<Vec<_>>()
1848 );
1849
1850 let (withdrawal_id, _, _, _, _) =
1853 parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
1854
1855 let current_operator_xonly_pks = self.fetch_operator_keys().await?;
1858 let invalid_operator_xonly_pks = operator_xonly_pks_from_rpc
1859 .iter()
1860 .filter(|xonly_pk| !current_operator_xonly_pks.contains(xonly_pk))
1861 .collect::<Vec<_>>();
1862 if !invalid_operator_xonly_pks.is_empty() {
1863 return Err(Status::invalid_argument(format!(
1864 "Given xonly public key doesn't belong to any current operator: invalid keys: {invalid_operator_xonly_pks:?}, current operators: {current_operator_xonly_pks:?}"
1865 )));
1866 }
1867
1868 let operators = self
1869 .get_operator_clients()
1870 .iter()
1871 .zip(current_operator_xonly_pks.into_iter());
1872 let withdraw_futures = operators
1873 .filter(|(_, xonly_pk)| {
1874 operator_xonly_pks_from_rpc.is_empty()
1876 || operator_xonly_pks_from_rpc.contains(xonly_pk)
1877 })
1878 .map(|(operator, operator_xonly_pk)| {
1879 let mut operator = operator.clone();
1880 let params = withdraw_params_with_sig.clone();
1881 let mut request = Request::new(params);
1882 request.set_timeout(WITHDRAWAL_TIMEOUT);
1883 async move { (operator.withdraw(request).await, operator_xonly_pk) }
1884 });
1885
1886 let responses = futures::future::join_all(withdraw_futures).await;
1888 tracing::warn!(
1889 "Withdraw rpc completed successfully for withdrawal id: {}, operator xonly pks: {:?}, responses: {:?}",
1890 withdrawal_id,
1891 operator_xonly_pks_from_rpc
1892 .iter()
1893 .map(|pk| pk.to_string())
1894 .collect::<Vec<_>>(),
1895 responses,
1896 );
1897 Ok(Response::new(AggregatorWithdrawResponse {
1898 withdraw_responses: responses
1899 .into_iter()
1900 .map(|(res, xonly_pk)| match res {
1901 Ok(withdraw_response) => OperatorWithrawalResponse {
1902 operator_xonly_pk: Some(xonly_pk.into()),
1903 response: Some(operator_withrawal_response::Response::RawTx(
1904 withdraw_response.into_inner(),
1905 )),
1906 },
1907 Err(e) => OperatorWithrawalResponse {
1908 operator_xonly_pk: Some(xonly_pk.into()),
1909 response: Some(operator_withrawal_response::Response::Error(e.to_string())),
1910 },
1911 })
1912 .collect(),
1913 }))
1914 }
1915
1916 async fn get_nofn_aggregated_xonly_pk(
1917 &self,
1918 _: tonic::Request<super::Empty>,
1919 ) -> std::result::Result<tonic::Response<super::NofnResponse>, tonic::Status> {
1920 tracing::info!("Get nofn aggregated xonly pk rpc called");
1921 let verifier_keys = self.fetch_verifier_keys().await?;
1922 let num_verifiers = verifier_keys.len();
1923 let nofn_xonly_pk = bitcoin::XOnlyPublicKey::from_musig2_pks(verifier_keys.clone(), None)
1924 .map_err(|e| {
1925 Status::internal(format!(
1926 "Failed to aggregate verifier public keys, err: {e}, pubkeys: {verifier_keys:?}"
1927 ))
1928 })?;
1929 Ok(Response::new(super::NofnResponse {
1930 nofn_xonly_pk: nofn_xonly_pk.serialize().to_vec(),
1931 num_verifiers: num_verifiers as u32,
1932 }))
1933 }
1934
1935 async fn internal_get_emergency_stop_tx(
1936 &self,
1937 request: Request<clementine::GetEmergencyStopTxRequest>,
1938 ) -> Result<Response<clementine::GetEmergencyStopTxResponse>, Status> {
1939 tracing::warn!("Get emergency stop tx rpc called");
1940 let inner_request = request.into_inner();
1941 let txids: Vec<Txid> = inner_request
1942 .txids
1943 .into_iter()
1944 .map(|txid| {
1945 Txid::from_slice(&txid.txid).map_err(|e| {
1946 tonic::Status::invalid_argument(format!("Failed to parse txid: {e}"))
1947 })
1948 })
1949 .collect::<Result<Vec<_>, _>>()?;
1950 tracing::warn!(
1951 "Parsed get emergency stop tx rpc params, move txids: {:?}",
1952 txids
1953 .iter()
1954 .map(|txid| txid.to_string())
1955 .collect::<Vec<_>>()
1956 );
1957
1958 let emergency_stop_txs = self.db.get_emergency_stop_txs(None, txids).await?;
1959
1960 let (txids, encrypted_emergency_stop_txs): (Vec<Txid>, Vec<Vec<u8>>) =
1961 emergency_stop_txs.into_iter().unzip();
1962
1963 Ok(Response::new(clementine::GetEmergencyStopTxResponse {
1964 txids: txids.into_iter().map(|txid| txid.into()).collect(),
1965 encrypted_emergency_stop_txs,
1966 }))
1967 }
1968
1969 async fn send_move_to_vault_tx(
1970 &self,
1971 request: Request<clementine::SendMoveTxRequest>,
1972 ) -> Result<Response<clementine::Txid>, Status> {
1973 tracing::info!("Send move to vault tx rpc called");
1974 #[cfg(not(feature = "automation"))]
1975 {
1976 let _ = request;
1977 return Err(Status::unimplemented(
1978 "Automation is disabled, cannot automatically send move to vault tx.",
1979 ));
1980 }
1981
1982 #[cfg(feature = "automation")]
1983 {
1984 use bitcoin::Amount;
1985 use std::sync::Arc;
1986
1987 use crate::builder::{
1988 address::create_taproot_address,
1989 script::{CheckSig, Multisig, SpendableScript},
1990 transaction::anchor_output,
1991 };
1992
1993 let request = request.into_inner();
1994 let movetx: bitcoin::Transaction = bitcoin::consensus::deserialize(
1995 &request
1996 .raw_tx
1997 .ok_or_eyre("raw_tx is required")
1998 .map_to_status()?
1999 .raw_tx,
2000 )
2001 .wrap_err("Failed to deserialize movetx")
2002 .map_to_status()?;
2003 let deposit_outpoint: bitcoin::OutPoint = request
2004 .deposit_outpoint
2005 .ok_or(Status::invalid_argument("deposit_outpoint is required"))?
2006 .try_into()?;
2007
2008 tracing::info!(
2009 "Parsed send move to vault tx rpc params, deposit outpoint: {:?}, movetx hex: {}",
2010 deposit_outpoint,
2011 bitcoin::consensus::encode::serialize_hex(&movetx)
2012 );
2013
2014 if movetx.input.len() != 1 || movetx.output.len() != 2 {
2016 return Err(Status::invalid_argument(
2017 "Transaction is not a movetx, input or output lengths are not correct",
2018 ));
2019 }
2020 if !(movetx.output[0].value == self.config.protocol_paramset().bridge_amount
2023 && movetx.output[1].value == Amount::from_sat(0))
2024 {
2025 return Err(Status::invalid_argument(format!(
2026 "Transaction is not a movetx, output sat values are not correct, should be ({}, 0), got ({}, {})",
2027 self.config.protocol_paramset().bridge_amount,
2028 movetx.output[0].value,
2029 movetx.output[1].value,
2030 )));
2031 }
2032 let verifier_keys = self.fetch_verifier_keys().await?;
2034 let nofn_xonly_pk =
2035 bitcoin::XOnlyPublicKey::from_musig2_pks(verifier_keys.clone(), None).map_err(
2036 |e| {
2037 Status::internal(format!(
2038 "Failed to aggregate verifier public keys, err: {e}, pubkeys: {verifier_keys:?}"
2039 ))
2040 },
2041 )?;
2042 let nofn_script = Arc::new(CheckSig::new(nofn_xonly_pk));
2043 let security_council_script = Arc::new(Multisig::from_security_council(
2044 self.config.security_council.clone(),
2045 ));
2046
2047 let (addr, _) = create_taproot_address(
2048 &[
2049 nofn_script.to_script_buf(),
2050 security_council_script.to_script_buf(),
2051 ],
2052 None,
2053 self.config.protocol_paramset().network,
2054 );
2055 let bridge_script_pubkey = addr.script_pubkey();
2056
2057 if !(movetx.output[1].script_pubkey
2058 == anchor_output(self.config.protocol_paramset().anchor_amount()).script_pubkey
2059 && movetx.output[0].script_pubkey == bridge_script_pubkey)
2060 {
2061 return Err(Status::invalid_argument(
2062 format!("Transaction is not a movetx, output scriptpubkeys are not correct, expected: (vault: {:?}, anchor: {:?}), got: (vault: {:?}, anchor: {:?})",
2063 bridge_script_pubkey,
2064 anchor_output(self.config.protocol_paramset().anchor_amount()).script_pubkey,
2065 movetx.output[0].script_pubkey,
2066 movetx.output[1].script_pubkey,
2067 )));
2068 }
2069
2070 let mut dbtx = self.db.begin_transaction().await?;
2071 self.tx_sender
2072 .insert_try_to_send(
2073 &mut dbtx,
2074 Some(TxMetadata {
2075 deposit_outpoint: Some(deposit_outpoint),
2076 operator_xonly_pk: None,
2077 round_idx: None,
2078 kickoff_idx: None,
2079 tx_type: TransactionType::MoveToVault,
2080 }),
2081 &movetx,
2082 FeePayingType::CPFP,
2083 None,
2084 &[],
2085 &[],
2086 &[],
2087 &[],
2088 )
2089 .await
2090 .map_to_status()?;
2091 dbtx.commit()
2092 .await
2093 .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;
2094
2095 Ok(Response::new(movetx.compute_txid().into()))
2096 }
2097 }
2098}
2099
2100#[cfg(test)]
2101mod tests {
2102 use crate::actor::Actor;
2103 use crate::config::BridgeConfig;
2104 use crate::deposit::{BaseDepositData, DepositInfo, DepositType};
2105 use crate::musig2::AggregateFromPublicKeys;
2106 use crate::rpc::clementine::clementine_aggregator_client::ClementineAggregatorClient;
2107 use crate::rpc::clementine::{self, GetEntityStatusesRequest, SendMoveTxRequest};
2108 use crate::rpc::get_clients;
2109 use crate::servers::create_aggregator_unix_server;
2110 use crate::test::common::citrea::MockCitreaClient;
2111 use crate::test::common::tx_utils::ensure_tx_onchain;
2112 use crate::test::common::*;
2113 use crate::{builder, EVMAddress};
2114 use bitcoin::hashes::Hash;
2115 use bitcoincore_rpc::RpcApi;
2116 use eyre::Context;
2117 use std::time::Duration;
2118 use tokio::time::sleep;
2119 use tonic::{Request, Status};
2120
2121 #[cfg(feature = "automation")]
2122 async fn perform_deposit(mut config: BridgeConfig) -> Result<(), Status> {
2123 let regtest = create_regtest_rpc(&mut config).await;
2124 let rpc = regtest.rpc();
2125
2126 let actors = create_actors::<MockCitreaClient>(&config).await;
2127 let _unused =
2128 run_single_deposit::<MockCitreaClient>(&mut config, rpc.clone(), None, &actors, None)
2129 .await?;
2130
2131 Ok(())
2132 }
2133
2134 #[tokio::test(flavor = "multi_thread")]
2135 async fn aggregator_double_deposit() {
2136 let mut config = create_test_config_with_thread_name().await;
2137 let regtest = create_regtest_rpc(&mut config).await;
2138 let rpc = regtest.rpc();
2139 let actors = create_actors::<MockCitreaClient>(&config).await;
2140 let mut aggregator = actors.get_aggregator();
2141
2142 let evm_address = EVMAddress([1u8; 20]);
2143 let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2144
2145 let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2146 .setup(tonic::Request::new(clementine::Empty {}))
2147 .await
2148 .unwrap()
2149 .into_inner()
2150 .try_into()
2151 .unwrap();
2152 sleep(Duration::from_secs(3)).await;
2153
2154 let nofn_xonly_pk =
2155 bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2156
2157 let deposit_address = builder::address::generate_deposit_address(
2158 nofn_xonly_pk,
2159 signer.address.as_unchecked(),
2160 evm_address,
2161 config.protocol_paramset().network,
2162 config.protocol_paramset().user_takes_after,
2163 )
2164 .unwrap()
2165 .0;
2166
2167 let deposit_outpoint = rpc
2168 .send_to_address(&deposit_address, config.protocol_paramset().bridge_amount)
2169 .await
2170 .unwrap();
2171 rpc.mine_blocks(18).await.unwrap();
2172
2173 let deposit_info = DepositInfo {
2174 deposit_outpoint,
2175 deposit_type: DepositType::BaseDeposit(BaseDepositData {
2176 evm_address,
2177 recovery_taproot_address: signer.address.as_unchecked().clone(),
2178 }),
2179 };
2180
2181 let movetx_one = aggregator
2183 .new_deposit(clementine::Deposit::from(deposit_info.clone()))
2184 .await
2185 .unwrap()
2186 .into_inner();
2187 let movetx_one_txid: bitcoin::Txid = aggregator
2188 .send_move_to_vault_tx(SendMoveTxRequest {
2189 deposit_outpoint: Some(deposit_outpoint.into()),
2190 raw_tx: Some(movetx_one),
2191 })
2192 .await
2193 .unwrap()
2194 .into_inner()
2195 .try_into()
2196 .unwrap();
2197
2198 let movetx_two = aggregator
2199 .new_deposit(clementine::Deposit::from(deposit_info))
2200 .await
2201 .unwrap()
2202 .into_inner();
2203 let _movetx_two_txid: bitcoin::Txid = aggregator
2204 .send_move_to_vault_tx(SendMoveTxRequest {
2205 deposit_outpoint: Some(deposit_outpoint.into()),
2206 raw_tx: Some(movetx_two),
2207 })
2208 .await
2209 .unwrap()
2210 .into_inner()
2211 .try_into()
2212 .unwrap();
2213 rpc.mine_blocks(1).await.unwrap();
2214 sleep(Duration::from_secs(3)).await;
2215
2216 poll_until_condition(
2217 async || {
2218 rpc.mine_blocks(1).await.unwrap();
2219 Ok(rpc
2220 .is_tx_on_chain(&movetx_one_txid)
2221 .await
2222 .unwrap_or_default())
2223 },
2224 None,
2225 None,
2226 )
2227 .await
2228 .wrap_err_with(|| eyre::eyre!("MoveTx did not land onchain"))
2229 .unwrap();
2230 }
2231
2232 #[tokio::test(flavor = "multi_thread")]
2233 async fn aggregator_deposit_movetx_lands_onchain() {
2234 let mut config = create_test_config_with_thread_name().await;
2235 let regtest = create_regtest_rpc(&mut config).await;
2236 let rpc = regtest.rpc();
2237 let actors = create_actors::<MockCitreaClient>(&config).await;
2238 let mut aggregator = actors.get_aggregator();
2239
2240 let evm_address = EVMAddress([1u8; 20]);
2241 let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2242
2243 let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2244 .setup(tonic::Request::new(clementine::Empty {}))
2245 .await
2246 .unwrap()
2247 .into_inner()
2248 .try_into()
2249 .unwrap();
2250 sleep(Duration::from_secs(3)).await;
2251
2252 let nofn_xonly_pk =
2253 bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2254
2255 let deposit_address = builder::address::generate_deposit_address(
2256 nofn_xonly_pk,
2257 signer.address.as_unchecked(),
2258 evm_address,
2259 config.protocol_paramset().network,
2260 config.protocol_paramset().user_takes_after,
2261 )
2262 .unwrap()
2263 .0;
2264
2265 let deposit_outpoint = rpc
2266 .send_to_address(&deposit_address, config.protocol_paramset().bridge_amount)
2267 .await
2268 .unwrap();
2269 rpc.mine_blocks(18).await.unwrap();
2270
2271 let deposit_info = DepositInfo {
2272 deposit_outpoint,
2273 deposit_type: DepositType::BaseDeposit(BaseDepositData {
2274 evm_address,
2275 recovery_taproot_address: signer.address.as_unchecked().clone(),
2276 }),
2277 };
2278
2279 let start_time = std::time::Instant::now();
2281 let raw_move_tx = aggregator
2282 .new_deposit(clementine::Deposit::from(deposit_info))
2283 .await
2284 .unwrap()
2285 .into_inner();
2286 let end_time = std::time::Instant::now();
2287 tracing::info!("New deposit time: {:?}", end_time - start_time);
2288
2289 let movetx_txid = aggregator
2290 .send_move_to_vault_tx(SendMoveTxRequest {
2291 deposit_outpoint: Some(deposit_outpoint.into()),
2292 raw_tx: Some(raw_move_tx),
2293 })
2294 .await
2295 .unwrap()
2296 .into_inner()
2297 .try_into()
2298 .unwrap();
2299
2300 rpc.mine_blocks(1).await.unwrap();
2301 sleep(Duration::from_secs(3)).await;
2302
2303 poll_until_condition(
2304 async || {
2305 rpc.mine_blocks(1).await.unwrap();
2306 Ok(rpc.is_tx_on_chain(&movetx_txid).await.unwrap_or_default())
2307 },
2308 None,
2309 None,
2310 )
2311 .await
2312 .wrap_err_with(|| eyre::eyre!("MoveTx did not land onchain"))
2313 .unwrap();
2314 }
2315
2316 #[tokio::test]
2317 async fn aggregator_two_deposit_movetx_and_emergency_stop() {
2318 let mut config = create_test_config_with_thread_name().await;
2319 let regtest = create_regtest_rpc(&mut config).await;
2320 let rpc = regtest.rpc();
2321 let actors = create_actors::<MockCitreaClient>(&config).await;
2322 let mut aggregator = actors.get_aggregator();
2323
2324 let evm_address = EVMAddress([1u8; 20]);
2325 let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2326
2327 let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2328 .setup(tonic::Request::new(clementine::Empty {}))
2329 .await
2330 .unwrap()
2331 .into_inner()
2332 .try_into()
2333 .unwrap();
2334 sleep(Duration::from_secs(3)).await;
2335
2336 let nofn_xonly_pk =
2337 bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2338
2339 let deposit_address_0 = builder::address::generate_deposit_address(
2340 nofn_xonly_pk,
2341 signer.address.as_unchecked(),
2342 evm_address,
2343 config.protocol_paramset().network,
2344 config.protocol_paramset().user_takes_after,
2345 )
2346 .unwrap()
2347 .0;
2348
2349 let deposit_address_1 = builder::address::generate_deposit_address(
2350 nofn_xonly_pk,
2351 signer.address.as_unchecked(),
2352 evm_address,
2353 config.protocol_paramset().network,
2354 config.protocol_paramset().user_takes_after,
2355 )
2356 .unwrap()
2357 .0;
2358
2359 let deposit_outpoint_0 = rpc
2360 .send_to_address(&deposit_address_0, config.protocol_paramset().bridge_amount)
2361 .await
2362 .unwrap();
2363 rpc.mine_blocks(18).await.unwrap();
2364
2365 let deposit_outpoint_1 = rpc
2366 .send_to_address(&deposit_address_1, config.protocol_paramset().bridge_amount)
2367 .await
2368 .unwrap();
2369 rpc.mine_blocks(18).await.unwrap();
2370
2371 let deposit_info_0 = DepositInfo {
2372 deposit_outpoint: deposit_outpoint_0,
2373 deposit_type: DepositType::BaseDeposit(BaseDepositData {
2374 evm_address,
2375 recovery_taproot_address: signer.address.as_unchecked().clone(),
2376 }),
2377 };
2378
2379 let deposit_info_1 = DepositInfo {
2380 deposit_outpoint: deposit_outpoint_1,
2381 deposit_type: DepositType::BaseDeposit(BaseDepositData {
2382 evm_address,
2383 recovery_taproot_address: signer.address.as_unchecked().clone(),
2384 }),
2385 };
2386
2387 let raw_move_tx_0 = aggregator
2389 .new_deposit(clementine::Deposit::from(deposit_info_0))
2390 .await
2391 .unwrap()
2392 .into_inner();
2393 let move_txid_0: bitcoin::Txid = aggregator
2394 .send_move_to_vault_tx(SendMoveTxRequest {
2395 deposit_outpoint: Some(deposit_outpoint_0.into()),
2396 raw_tx: Some(raw_move_tx_0),
2397 })
2398 .await
2399 .unwrap()
2400 .into_inner()
2401 .try_into()
2402 .unwrap();
2403
2404 rpc.mine_blocks(1).await.unwrap();
2405 sleep(Duration::from_secs(3)).await;
2406 ensure_tx_onchain(rpc, move_txid_0)
2407 .await
2408 .expect("failed to get movetx_0 on chain");
2409
2410 let raw_move_tx_1 = aggregator
2412 .new_deposit(clementine::Deposit::from(deposit_info_1))
2413 .await
2414 .unwrap()
2415 .into_inner();
2416 let move_txid_1 = aggregator
2417 .send_move_to_vault_tx(SendMoveTxRequest {
2418 deposit_outpoint: Some(deposit_outpoint_1.into()),
2419 raw_tx: Some(raw_move_tx_1),
2420 })
2421 .await
2422 .unwrap()
2423 .into_inner()
2424 .try_into()
2425 .unwrap();
2426
2427 rpc.mine_blocks(1).await.unwrap();
2428 ensure_tx_onchain(rpc, move_txid_1)
2429 .await
2430 .expect("failed to get movetx_1 on chain");
2431 sleep(Duration::from_secs(3)).await;
2432
2433 let move_txids = vec![move_txid_0, move_txid_1];
2434
2435 tracing::debug!("Move txids: {:?}", move_txids);
2436
2437 let emergency_txid = aggregator
2438 .internal_get_emergency_stop_tx(tonic::Request::new(
2439 clementine::GetEmergencyStopTxRequest {
2440 txids: move_txids
2441 .iter()
2442 .map(|txid| clementine::Txid {
2443 txid: txid.to_byte_array().to_vec(),
2444 })
2445 .collect(),
2446 },
2447 ))
2448 .await
2449 .unwrap()
2450 .into_inner();
2451
2452 let decryption_priv_key =
2453 hex::decode("a80bc8cf095c2b37d4c6233114e0dd91f43d75de5602466232dbfcc1fc66c542")
2454 .expect("Failed to parse emergency stop encryption public key");
2455 let emergency_stop_tx: bitcoin::Transaction = bitcoin::consensus::deserialize(
2456 &crate::encryption::decrypt_bytes(
2457 &decryption_priv_key,
2458 &emergency_txid.encrypted_emergency_stop_txs[0],
2459 )
2460 .expect("Failed to decrypt emergency stop tx"),
2461 )
2462 .expect("Failed to deserialize");
2463
2464 rpc.send_raw_transaction(&emergency_stop_tx)
2465 .await
2466 .expect("Failed to send emergency stop tx");
2467
2468 let emergency_stop_txid = emergency_stop_tx.compute_txid();
2469 rpc.mine_blocks(1).await.unwrap();
2470
2471 poll_until_condition(
2472 async || {
2473 rpc.mine_blocks(1).await.unwrap();
2474 Ok(rpc
2475 .is_tx_on_chain(&emergency_stop_txid)
2476 .await
2477 .unwrap_or_default())
2478 },
2479 None,
2480 None,
2481 )
2482 .await
2483 .wrap_err_with(|| eyre::eyre!("Emergency stop tx did not land onchain"))
2484 .unwrap();
2485 }
2486
2487 #[cfg(feature = "automation")]
2488 #[tokio::test]
2489 #[ignore = "This test does not work"]
2490 async fn aggregator_deposit_finalize_verifier_timeout() {
2491 let mut config = create_test_config_with_thread_name().await;
2492 config
2493 .test_params
2494 .timeout_params
2495 .deposit_finalize_verifier_idx = Some(0);
2496 let res = perform_deposit(config).await;
2497 assert!(res.is_err());
2498 let err_string = res.unwrap_err().to_string();
2499 assert!(
2500 err_string.contains("Deposit finalization from verifiers"),
2501 "Error string was: {err_string}"
2502 );
2503 }
2504
2505 #[cfg(feature = "automation")]
2506 #[tokio::test]
2507 async fn aggregator_deposit_key_distribution_verifier_timeout() {
2508 let mut config = create_test_config_with_thread_name().await;
2509 config
2510 .test_params
2511 .timeout_params
2512 .key_distribution_verifier_idx = Some(0);
2513
2514 let res = perform_deposit(config).await;
2515
2516 assert!(res.is_err());
2517 let err_string = res.unwrap_err().to_string();
2518 assert!(
2519 err_string.contains("Verifier key distribution (id:"),
2520 "Error string was: {err_string}"
2521 );
2522 }
2523
2524 #[cfg(feature = "automation")]
2525 #[tokio::test]
2526 async fn aggregator_deposit_key_distribution_operator_timeout() {
2527 let mut config = create_test_config_with_thread_name().await;
2528 config
2529 .test_params
2530 .timeout_params
2531 .key_collection_operator_idx = Some(0);
2532
2533 let res = perform_deposit(config).await;
2534
2535 assert!(res.is_err());
2536 let err_string = res.unwrap_err().to_string();
2537 assert!(
2538 err_string.contains("Operator key collection (id:"),
2539 "Error string was: {err_string}"
2540 );
2541 }
2542
2543 #[cfg(feature = "automation")]
2544 #[tokio::test]
2545 async fn aggregator_deposit_nonce_stream_creation_verifier_timeout() {
2546 let mut config = create_test_config_with_thread_name().await;
2547 config
2548 .test_params
2549 .timeout_params
2550 .nonce_stream_creation_verifier_idx = Some(0);
2551
2552 let res = perform_deposit(config).await;
2553
2554 assert!(res.is_err());
2555 let err_string = res.unwrap_err().to_string();
2556 assert!(
2557 err_string.contains("Nonce stream creation (id:"),
2558 "Error string was: {err_string}"
2559 );
2560 }
2561
2562 #[cfg(feature = "automation")]
2563 #[tokio::test]
2564 async fn aggregator_deposit_partial_sig_stream_creation_timeout() {
2565 let mut config = create_test_config_with_thread_name().await;
2566 config
2567 .test_params
2568 .timeout_params
2569 .partial_sig_stream_creation_verifier_idx = Some(0);
2570
2571 let res = perform_deposit(config).await;
2572
2573 assert!(res.is_err());
2574 let err_string = res.unwrap_err().to_string();
2575 assert!(
2576 err_string.contains("Partial signature stream creation (id:"),
2577 "Error string was: {err_string}"
2578 );
2579 }
2580
2581 #[cfg(feature = "automation")]
2582 #[tokio::test]
2583 async fn aggregator_deposit_operator_sig_collection_operator_timeout() {
2584 let mut config = create_test_config_with_thread_name().await;
2585 config
2586 .test_params
2587 .timeout_params
2588 .operator_sig_collection_operator_idx = Some(0);
2589
2590 let res = perform_deposit(config).await;
2591
2592 assert!(res.is_err());
2593 let err_string = res.unwrap_err().to_string();
2594 assert!(
2595 err_string.contains("Operator signature stream creation (id:"),
2596 "Error string was: {err_string}"
2597 );
2598 }
2599
2600 #[tokio::test]
2601 async fn aggregator_get_entity_statuses() {
2602 let mut config = create_test_config_with_thread_name().await;
2603 let _regtest = create_regtest_rpc(&mut config).await;
2604
2605 let actors = create_actors::<MockCitreaClient>(&config).await;
2606 let mut aggregator = actors.get_aggregator();
2607 let status = aggregator
2608 .get_entity_statuses(Request::new(GetEntityStatusesRequest {
2609 restart_tasks: false,
2610 }))
2611 .await
2612 .unwrap()
2613 .into_inner();
2614
2615 tracing::info!("Status: {:?}", status);
2616
2617 assert_eq!(
2618 status.entity_statuses.len(),
2619 config.test_params.all_operators_secret_keys.len()
2620 + config.test_params.all_verifiers_secret_keys.len()
2621 );
2622 }
2623
2624 #[tokio::test]
2625 async fn aggregator_start_with_offline_verifier() {
2626 let mut config = create_test_config_with_thread_name().await;
2627 let _regtest = create_regtest_rpc(&mut config).await;
2629 config.verifier_endpoints = Some(vec!["https://142.143.144.145:17001".to_string()]);
2631 config.operator_endpoints = Some(vec!["https://142.143.144.145:17002".to_string()]);
2632 let socket_dir = tempfile::tempdir().unwrap();
2634 let socket_path = socket_dir.path().join("aggregator.sock");
2635
2636 tracing::info!("Creating unix aggregator server");
2637
2638 let (_, _shutdown_tx) = create_aggregator_unix_server(config.clone(), socket_path.clone())
2639 .await
2640 .unwrap();
2641
2642 tracing::info!("Created unix aggregator server");
2643
2644 let mut aggregator_client = get_clients(
2645 vec![format!("unix://{}", socket_path.display())],
2646 ClementineAggregatorClient::new,
2647 &config,
2648 false,
2649 )
2650 .await
2651 .unwrap()
2652 .pop()
2653 .unwrap();
2654
2655 tracing::info!("Got aggregator client");
2656
2657 assert!(aggregator_client
2659 .vergen(Request::new(clementine::Empty {}))
2660 .await
2661 .is_ok());
2662
2663 tracing::info!("After vergen");
2664
2665 assert!(aggregator_client
2667 .setup(Request::new(clementine::Empty {}))
2668 .await
2669 .is_err());
2670
2671 tracing::info!("After setup");
2672
2673 tracing::info!(
2676 "Entity statuses: {:?}",
2677 aggregator_client
2678 .get_entity_statuses(Request::new(GetEntityStatusesRequest {
2679 restart_tasks: false,
2680 }))
2681 .await
2682 .unwrap()
2683 );
2684 }
2685}