1use super::clementine::{
2 clementine_aggregator_server::ClementineAggregator, verifier_deposit_finalize_params,
3 DepositParams, Empty, VerifierDepositFinalizeParams,
4};
5use super::clementine::{
6 AggregatorWithdrawResponse, Deposit, EntityStatuses, GetEntityStatusesRequest,
7 OptimisticPayoutParams, RawSignedTx, VergenResponse, VerifierPublicKeys,
8};
9use crate::aggregator::{
10 AggregatorServer, CompatibilityCheckScope, OperatorId, ParticipatingOperators,
11 ParticipatingVerifiers, VerifierId,
12};
13use crate::bitvm_client::SECP;
14use crate::builder::sighash::SignatureInfo;
15use crate::builder::transaction::{
16 create_emergency_stop_txhandler, create_move_to_vault_txhandler,
17 create_optimistic_payout_txhandler, Signed, TxHandler,
18};
19use crate::compatibility::ActorWithConfig;
20use crate::config::BridgeConfig;
21use crate::constants::{
22 DEPOSIT_FINALIZATION_TIMEOUT, DEPOSIT_FINALIZE_STREAM_CREATION_TIMEOUT,
23 KEY_DISTRIBUTION_TIMEOUT, NONCE_STREAM_CREATION_TIMEOUT, OPERATOR_SIGS_STREAM_CREATION_TIMEOUT,
24 OPERATOR_SIGS_TIMEOUT, OPTIMISTIC_PAYOUT_TIMEOUT, OVERALL_DEPOSIT_TIMEOUT,
25 PARTIAL_SIG_STREAM_CREATION_TIMEOUT, PIPELINE_COMPLETION_TIMEOUT, SEND_OPERATOR_SIGS_TIMEOUT,
26 SETUP_COMPLETION_TIMEOUT, WITHDRAWAL_TIMEOUT,
27};
28use crate::deposit::{Actors, DepositData, DepositInfo};
29use crate::musig2::AggregateFromPublicKeys;
30use crate::rpc::clementine::{
31 operator_withrawal_response, AggregatorWithdrawalInput, CompatibilityParamsRpc,
32 EntitiesCompatibilityData, OperatorWithrawalResponse, VerifierDepositSignParams,
33};
34use crate::rpc::parser;
35#[cfg(feature = "automation")]
36use crate::tx_sender_queue::TxSenderClientQueueExt;
37use crate::utils::{
38 flatten_join_named_results, get_vergen_response, timed_request, timed_try_join_all,
39 try_join_all_combine_errors, ScriptBufExt,
40};
41use crate::utils::{FeePayingType, TxMetadata};
42use crate::{
43 aggregator::Aggregator,
44 builder::sighash::create_nofn_sighash_stream,
45 musig2::aggregate_nonces,
46 rpc::clementine::{self, DepositSignSession},
47};
48use bitcoin::hashes::Hash;
49use bitcoin::secp256k1::schnorr::Signature;
50use bitcoin::secp256k1::{Message, PublicKey};
51use bitcoin::{TapSighash, TxOut, Txid, XOnlyPublicKey};
52use clementine_errors::BridgeError;
53use clementine_errors::TransactionType;
54use clementine_errors::{ErrorExt, ResultExt};
55use clementine_primitives::UTXO;
56use eyre::{Context, OptionExt};
57use futures::future::join_all;
58use futures::{
59 stream::{BoxStream, TryStreamExt},
60 FutureExt, Stream, StreamExt, TryFutureExt,
61};
62use secp256k1::musig::{AggregatedNonce, PartialSignature, PublicNonce};
63use std::future::Future;
64use tokio::sync::mpsc::{channel, Receiver, Sender};
65use tonic::{async_trait, Request, Response, Status, Streaming};
66struct AggNonceQueueItem {
67 agg_nonce: AggregatedNonce,
68 sighash: TapSighash,
69}
70
/// Work item handed from the signature aggregation stage to the signature distributor.
struct FinalSigQueueItem {
    /// Serialized bytes of the aggregated signature, forwarded to the verifiers as-is.
    final_sig: Vec<u8>,
}
74
75use clementine_errors::AggregatorError;
76
77async fn get_next_pub_nonces(
78 nonce_streams: &mut [impl Stream<Item = Result<PublicNonce, BridgeError>>
79 + Unpin
80 + Send
81 + 'static],
82 verifiers_ids: &[VerifierId],
83) -> Result<Vec<PublicNonce>, BridgeError> {
84 try_join_all_combine_errors(nonce_streams.iter_mut().zip(verifiers_ids).map(
85 |(s, id)| async move {
86 s.next()
87 .await
88 .transpose()
89 .wrap_err(format!("Failed to get nonce from {id}"))? .ok_or_else(|| -> eyre::Report {
91 AggregatorError::InputStreamEndedEarlyUnknownSize {
92 stream_name: format!("Nonce stream {id}"),
94 }
95 .into()
96 })
97 },
98 ))
99 .await
100}
101
102async fn nonce_aggregator(
104 mut nonce_streams: Vec<
105 impl Stream<Item = Result<PublicNonce, BridgeError>> + Unpin + Send + 'static,
106 >,
107 mut sighash_stream: impl Stream<Item = Result<(TapSighash, SignatureInfo), BridgeError>>
108 + Unpin
109 + Send
110 + 'static,
111 agg_nonce_sender: Sender<(AggNonceQueueItem, Vec<PublicNonce>)>,
112 needed_nofn_sigs: usize,
113 verifiers_ids: Vec<VerifierId>,
114) -> Result<
115 (
116 (AggregatedNonce, Vec<PublicNonce>),
117 (AggregatedNonce, Vec<PublicNonce>),
118 ),
119 BridgeError,
120> {
121 let mut total_sigs = 0;
122
123 tracing::info!("Starting nonce aggregation (expecting {needed_nofn_sigs} nonces)");
124
125 if verifiers_ids.len() != nonce_streams.len() {
127 return Err(
128 eyre::eyre!("Number of verifiers ids and nonce streams must be the same").into(),
129 );
130 }
131
132 while let Some(msg) = sighash_stream.next().await {
133 let (sighash, siginfo) = msg.wrap_err("Sighash stream failed")?;
134
135 total_sigs += 1;
136
137 let pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
138 .await
139 .wrap_err_with(|| {
140 format!("Failed to get nonces from verifiers for sighash #{total_sigs} with siginfo: {siginfo:?}")
141 })?;
142
143 tracing::trace!(
144 "Received nonces for signature id {:?} in nonce_aggregator",
145 siginfo.signature_id
146 );
147
148 let agg_nonce = aggregate_nonces(pub_nonces.iter().collect::<Vec<_>>().as_slice())?;
149
150 agg_nonce_sender
151 .send((AggNonceQueueItem { agg_nonce, sighash }, pub_nonces))
152 .await
153 .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
154 stream_name: "nonce_aggregator".to_string(),
155 })?;
156
157 tracing::trace!(
158 "Sent nonces for signature id {:?} in nonce_aggregator",
159 siginfo.signature_id
160 );
161 }
162 tracing::trace!(tmp_debug = 1, "Sent {total_sigs} to agg_nonce stream");
163
164 if total_sigs != needed_nofn_sigs {
166 let err_msg = format!(
167 "Expected {needed_nofn_sigs} nofn signatures, got {total_sigs} from sighash stream",
168 );
169 tracing::error!("{err_msg}");
170 return Err(eyre::eyre!(err_msg).into());
171 }
172 let movetx_pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
174 .await
175 .wrap_err("Failed to get movetx public nonces from verifiers")?;
176
177 tracing::trace!("Received nonces for movetx in nonce_aggregator");
178
179 let move_tx_agg_nonce =
180 aggregate_nonces(movetx_pub_nonces.iter().collect::<Vec<_>>().as_slice())
181 .wrap_err("Failed to aggregate movetx nonces")?;
182
183 let emergency_stop_pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
184 .await
185 .wrap_err("Failed to get emergency stop tx public nonces from verifiers")?;
186
187 let emergency_stop_agg_nonce = aggregate_nonces(
188 emergency_stop_pub_nonces
189 .iter()
190 .collect::<Vec<_>>()
191 .as_slice(),
192 )
193 .wrap_err("Failed to aggregate emergency stop tx nonces")?;
194
195 Ok((
196 (move_tx_agg_nonce, movetx_pub_nonces),
197 (emergency_stop_agg_nonce, emergency_stop_pub_nonces),
198 ))
199}
200
201async fn nonce_distributor(
203 mut agg_nonce_receiver: Receiver<(AggNonceQueueItem, Vec<PublicNonce>)>,
204 partial_sig_streams: Vec<(
205 Streaming<clementine::PartialSig>,
206 Sender<clementine::VerifierDepositSignParams>,
207 )>,
208 partial_sig_sender: Sender<(Vec<(PartialSignature, PublicNonce)>, AggNonceQueueItem)>,
209 needed_nofn_sigs: usize,
210 verifiers_ids: Vec<VerifierId>,
211) -> Result<(), BridgeError> {
212 let mut nonce_count = 0;
213 let mut sig_count = 0;
214 let (mut partial_sig_rx, mut partial_sig_tx): (Vec<_>, Vec<_>) =
215 partial_sig_streams.into_iter().unzip();
216
217 let (queue_tx, mut queue_rx) = channel(crate::constants::DEFAULT_CHANNEL_SIZE);
218
219 if verifiers_ids.len() != partial_sig_rx.len() {
221 return Err(eyre::eyre!(
222 "Number of verifiers ids and partial sig streams must be the same"
223 )
224 .into());
225 }
226 let verifiers_ids_clone = verifiers_ids.clone();
227 let handle_1 = tokio::spawn(async move {
228 while let Some((queue_item, pub_nonces)) = agg_nonce_receiver.recv().await {
229 nonce_count += 1;
230
231 tracing::trace!(
232 "Received aggregated nonce {} in nonce_distributor",
233 nonce_count
234 );
235
236 let agg_nonce_wrapped = clementine::VerifierDepositSignParams {
237 params: Some(clementine::verifier_deposit_sign_params::Params::AggNonce(
238 queue_item.agg_nonce.serialize().to_vec(),
239 )),
240 };
241
242 try_join_all_combine_errors(
244 partial_sig_tx
245 .iter_mut()
246 .zip(verifiers_ids_clone.iter())
247 .map(|(tx, id)| {
248 let agg_nonce_wrapped = agg_nonce_wrapped.clone();
249 async move {
250 tx.send(agg_nonce_wrapped)
251 .await
252 .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
253 stream_name: format!("Partial sig {id}"),
254 })
255 .inspect_err(|e| {
256 tracing::error!(
257 "Failed to send aggregated nonce to {id}: {:?}",
258 e
259 );
260 })
261 }
262 }),
263 )
264 .await
265 .wrap_err("Failed to send aggregated nonces to verifiers")?;
266
267 queue_tx
268 .send((queue_item, pub_nonces))
269 .await
270 .wrap_err("Other end of channel closed")?;
271
272 tracing::trace!(
273 "Sent aggregated nonce {} to verifiers in nonce_distributor",
274 nonce_count
275 );
276 if nonce_count == needed_nofn_sigs {
277 break;
278 }
279 }
280 if nonce_count != needed_nofn_sigs {
281 let err_msg = format!("Expected {needed_nofn_sigs} aggregated nonces in nonce_distributor, got {nonce_count}",);
282 tracing::error!("{err_msg}");
283 return Err(eyre::eyre!(err_msg).into());
284 }
285
286 tracing::trace!(
287 tmp_debug = 1,
288 "Broadcasted {nonce_count} agg_nonces to verifiers and to the queue"
289 );
290 Ok::<(), BridgeError>(())
291 });
292
293 let handle_2 = tokio::spawn(async move {
294 while let Some((queue_item, pub_nonces)) = queue_rx.recv().await {
295 let pub_nonces_ref = pub_nonces.as_slice();
296 if pub_nonces_ref.len() != partial_sig_rx.len() {
297 return Err(eyre::eyre!(
298 "Number of public nonces {} and partial sig streams {} must be the same",
299 pub_nonces_ref.len(),
300 partial_sig_rx.len()
301 )
302 .into());
303 }
304 let partial_sigs = try_join_all_combine_errors(partial_sig_rx.iter_mut().zip(pub_nonces_ref.iter()).zip(verifiers_ids.iter()).map(
305 |((stream, pub_nonce), id)| async move {
306 let partial_sig = stream
307 .message()
308 .await
309 .wrap_err_with(|| AggregatorError::RequestFailed {
310 request_name: format!("Partial sig {sig_count} from {id}"),
311 })
312 .inspect_err(|e| {
313 tracing::error!(
314 "Failed to receive partial signature {sig_count} from {id}, an error was sent: {:?}",
315 e
316 );
317 })?
318 .ok_or_eyre(AggregatorError::InputStreamEndedEarlyUnknownSize {
319 stream_name: format!("Partial sig {sig_count} from {id} closed"),
320 }).inspect_err(|e| {
321 tracing::error!(
322 "Failed to receive partial signature {sig_count} from {id}, the stream was closed: {:?}",
323 e
324 );
325 })?;
326 let partial_sig = PartialSignature::from_byte_array(
327 &partial_sig
328 .partial_sig
329 .as_slice()
330 .try_into()
331 .wrap_err("PartialSignature must be 32 bytes")?,
332 )
333 .wrap_err(format!("Failed to parse partial signature {sig_count} from {id}"))?;
334
335 Ok::<_, BridgeError>((partial_sig, *pub_nonce))
336 },
337 ))
338 .await?;
339
340 sig_count += 1;
341
342 tracing::trace!(
343 "Received partial signature {} from verifiers in nonce_distributor",
344 sig_count
345 );
346
347 partial_sig_sender
348 .send((partial_sigs, queue_item))
349 .await
350 .map_err(|_| {
351 eyre::eyre!(AggregatorError::OutputStreamEndedEarly {
352 stream_name: "partial_sig_sender".into(),
353 })
354 })?;
355
356 tracing::trace!(
357 "Sent partial signature {} to signature_aggregator in nonce_distributor",
358 sig_count
359 );
360 }
361
362 if sig_count != needed_nofn_sigs {
363 let err_msg = format!(
364 "Expected {needed_nofn_sigs} partial signatures in nonce_distributor, got {sig_count}",
365 );
366 tracing::error!("{err_msg}");
367 return Err(eyre::eyre!(err_msg).into());
368 }
369 tracing::trace!(
370 tmp_debug = 1,
371 "Sent {sig_count} partial sig bundles to partial_sigs stream"
372 );
373
374 tracing::trace!("Finished tasks in nonce_distributor handle 2");
375 Ok::<(), BridgeError>(())
376 });
377
378 let (result_1, result_2) = tokio::join!(handle_1, handle_2);
379
380 let mut task_errors = Vec::new();
381
382 match result_1 {
383 Ok(inner_result) => {
384 if let Err(e) = inner_result {
385 task_errors.push(format!(
386 "Task returned error while distributing aggnonces: {e:#?}"
387 ));
388 }
389 }
390 Err(e) => {
391 task_errors.push(format!(
392 "Task panicked while distributing aggnonces: {e:#?}"
393 ));
394 }
395 }
396
397 match result_2 {
398 Ok(inner_result) => {
399 if let Err(e) = inner_result {
400 task_errors.push(format!(
401 "Task returned error while receiving partial sigs: {e:#?}"
402 ));
403 }
404 }
405 Err(e) => {
406 task_errors.push(format!(
407 "Task panicked while receiving partial sigs: {e:#?}"
408 ));
409 }
410 }
411
412 if !task_errors.is_empty() {
413 return Err(eyre::eyre!(format!(
414 "nonce_distributor failed with errors: {:#?}",
415 task_errors
416 ))
417 .into());
418 }
419
420 tracing::debug!("Finished tasks in nonce_distributor");
421
422 Ok(())
423}
424
425async fn signature_aggregator(
428 mut partial_sig_receiver: Receiver<(Vec<(PartialSignature, PublicNonce)>, AggNonceQueueItem)>,
429 verifiers_public_keys: Vec<PublicKey>,
430 final_sig_sender: Sender<FinalSigQueueItem>,
431 needed_nofn_sigs: usize,
432) -> Result<(), BridgeError> {
433 let mut sig_count = 0;
434 while let Some((partial_sigs, queue_item)) = partial_sig_receiver.recv().await {
435 sig_count += 1;
436 tracing::trace!(
437 "Received partial signatures {} in signature_aggregator",
438 sig_count
439 );
440
441 let final_sig = crate::musig2::aggregate_partial_signatures(
442 verifiers_public_keys.clone(),
443 None,
444 queue_item.agg_nonce,
445 &partial_sigs,
446 Message::from_digest(queue_item.sighash.to_byte_array()),
447 )?;
448
449 final_sig_sender
450 .send(FinalSigQueueItem {
451 final_sig: final_sig.serialize().to_vec(),
452 })
453 .await
454 .wrap_err_with(|| {
455 eyre::eyre!(AggregatorError::OutputStreamEndedEarly {
456 stream_name: "final_sig_sender".into(),
457 })
458 })?;
459 tracing::trace!(
460 "Sent aggregated signature {} to signature_distributor in signature_aggregator",
461 sig_count
462 );
463
464 if sig_count == needed_nofn_sigs {
465 break;
466 }
467 }
468
469 if sig_count != needed_nofn_sigs {
470 let err_msg = format!(
471 "Expected {needed_nofn_sigs} aggregated signatures in signature_aggregator, got {sig_count}",
472 );
473 tracing::error!("{err_msg}");
474 return Err(eyre::eyre!(err_msg).into());
475 }
476
477 tracing::trace!(
478 tmp_debug = 1,
479 "Sent {sig_count} aggregated signatures to final_sig stream"
480 );
481
482 Ok(())
483}
484
485async fn signature_distributor(
488 mut final_sig_receiver: Receiver<FinalSigQueueItem>,
489 deposit_finalize_sender: Vec<Sender<VerifierDepositFinalizeParams>>,
490 agg_nonce: impl Future<
491 Output = Result<
492 (
493 (AggregatedNonce, Vec<PublicNonce>),
494 (AggregatedNonce, Vec<PublicNonce>),
495 ),
496 Status,
497 >,
498 >,
499 needed_nofn_sigs: usize,
500 verifiers_ids: Vec<VerifierId>,
501) -> Result<(), BridgeError> {
502 use verifier_deposit_finalize_params::Params;
503 let mut sig_count = 0;
504 while let Some(queue_item) = final_sig_receiver.recv().await {
505 sig_count += 1;
506 tracing::trace!("Received signature {} in signature_distributor", sig_count);
507 let final_params = VerifierDepositFinalizeParams {
508 params: Some(Params::SchnorrSig(queue_item.final_sig)),
509 };
510
511 try_join_all_combine_errors(
512 deposit_finalize_sender
513 .iter()
514 .zip(verifiers_ids.iter())
515 .map(|(tx, id)| {
516 let final_params = final_params.clone();
517 async move {
518 tx.send(final_params).await.wrap_err_with(|| {
519 AggregatorError::OutputStreamEndedEarly {
520 stream_name: format!("Deposit finalize sender for {id}"),
521 }
522 })
523 }
524 }),
525 )
526 .await
527 .wrap_err(format!(
528 "Failed to send final signature {sig_count} to verifiers"
529 ))?;
530
531 tracing::trace!(
532 "Sent signature {} to verifiers in signature_distributor",
533 sig_count
534 );
535
536 if sig_count == needed_nofn_sigs {
537 break;
538 }
539 }
540
541 if sig_count != needed_nofn_sigs {
542 let err_msg = format!(
543 "Expected {needed_nofn_sigs} signatures in signature_distributor, got {sig_count}",
544 );
545 tracing::error!("{err_msg}");
546 return Err(eyre::eyre!(err_msg).into());
547 }
548
549 tracing::trace!(
550 tmp_debug = 1,
551 "Sent {sig_count} signatures to verifiers in deposit_finalize"
552 );
553
554 let (movetx_agg_nonce, emergency_stop_agg_nonce) = agg_nonce
555 .await
556 .wrap_err("Failed to get aggregated nonce for movetx and emergency stop")?;
557
558 tracing::info!("Got aggregated nonce for movetx and emergency stop in signature distributor");
559
560 for tx in &deposit_finalize_sender {
562 tx.send(VerifierDepositFinalizeParams {
563 params: Some(Params::MoveTxAggNonce(
564 movetx_agg_nonce.0.serialize().to_vec(),
565 )),
566 })
567 .await
568 .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
569 stream_name: "Deposit finalize sender (for movetx agg nonce)".to_string(),
570 })?;
571 }
572 tracing::info!("Sent movetx aggregated nonce to verifiers in signature distributor");
573
574 for tx in &deposit_finalize_sender {
576 tx.send(VerifierDepositFinalizeParams {
577 params: Some(Params::EmergencyStopAggNonce(
578 emergency_stop_agg_nonce.0.serialize().to_vec(),
579 )),
580 })
581 .await
582 .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
583 stream_name: "Deposit finalize sender (for emergency stop agg nonce)".to_string(),
584 })?;
585 }
586 tracing::info!("Sent emergency stop aggregated nonce to verifiers in signature distributor");
587
588 Ok(())
589}
590
591async fn create_nonce_streams(
599 verifiers: ParticipatingVerifiers,
600 num_nonces: u32,
601 #[cfg(test)] config: &crate::config::BridgeConfig,
602) -> Result<
603 (
604 Vec<clementine::NonceGenFirstResponse>,
605 Vec<BoxStream<'static, Result<PublicNonce, BridgeError>>>,
606 ),
607 BridgeError,
608> {
609 let mut nonce_streams = timed_try_join_all(
610 NONCE_STREAM_CREATION_TIMEOUT,
611 "Nonce stream creation",
612 Some(verifiers.ids()),
613 verifiers
614 .clients()
615 .into_iter()
616 .enumerate()
617 .map(|(idx, client)| {
618 let mut client = client.clone();
619 #[cfg(test)]
620 let config = config.clone();
621
622 async move {
623 #[cfg(test)]
624 config
625 .test_params
626 .timeout_params
627 .hook_timeout_nonce_stream_creation_verifier(idx)
628 .await;
629 let response_stream = client
630 .nonce_gen(tonic::Request::new(clementine::NonceGenRequest {
631 num_nonces,
632 }))
633 .await
634 .wrap_err_with(|| AggregatorError::RequestFailed {
635 request_name: format!("Nonce gen stream for verifier {idx}"),
636 })?;
637
638 Ok::<_, BridgeError>(response_stream.into_inner())
639 }
640 }),
641 )
642 .await?;
643
644 let first_responses: Vec<clementine::NonceGenFirstResponse> =
646 try_join_all_combine_errors(nonce_streams.iter_mut().zip(verifiers.ids()).map(
647 |(stream, id)| async move {
648 parser::verifier::parse_nonce_gen_first_response(stream)
649 .await
650 .wrap_err_with(|| format!("Failed to get initial response from {id}"))
651 },
652 ))
653 .await
654 .wrap_err("Failed to get nonce gen's initial responses from verifiers")?;
655
656 let transformed_streams = nonce_streams
657 .into_iter()
658 .zip(verifiers.ids())
659 .map(|(stream, id)| {
660 stream
661 .map(move |result| {
662 Aggregator::extract_pub_nonce(
663 result
664 .wrap_err_with(|| AggregatorError::InputStreamEndedEarlyUnknownSize {
665 stream_name: format!("Nonce gen stream for {id}"),
666 })?
667 .response,
668 )
669 })
670 .boxed()
671 })
672 .collect::<Vec<_>>();
673
674 Ok((first_responses, transformed_streams))
675}
676
677async fn collect_and_call<R, T, F, Fut>(
682 rx: &mut tokio::sync::broadcast::Receiver<Vec<T>>,
683 mut f: F,
684) -> Result<R, Status>
685where
686 R: Default,
687 T: Clone,
688 F: FnMut(Vec<T>) -> Fut,
689 Fut: Future<Output = Result<R, Status>>,
690{
691 loop {
692 match rx.recv().await {
693 Ok(params) => {
694 f(params).await?;
695 }
696 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
697 break Err(Status::internal(format!(
698 "lost {n} items due to lagging receiver"
699 )));
700 }
701 Err(tokio::sync::broadcast::error::RecvError::Closed) => break Ok(R::default()),
702 }
703 }
704}
705
706impl Aggregator {
707 fn extract_pub_nonce(
709 response: Option<clementine::nonce_gen_response::Response>,
710 ) -> Result<PublicNonce, BridgeError> {
711 match response.ok_or_eyre("NonceGen response is empty")? {
712 clementine::nonce_gen_response::Response::PubNonce(pub_nonce) => {
713 Ok(PublicNonce::from_byte_array(
714 &pub_nonce
715 .as_slice()
716 .try_into()
717 .wrap_err("PubNonce must be 66 bytes")?,
718 )
719 .wrap_err("Failed to parse pub nonce")?)
720 }
721 _ => Err(eyre::eyre!("Expected PubNonce in response").into()),
722 }
723 }
724
725 async fn collect_operator_sigs(
727 operator_clients: ParticipatingOperators,
728 config: BridgeConfig,
729 mut deposit_sign_session: DepositSignSession,
730 ) -> Result<Vec<Vec<Signature>>, BridgeError> {
731 deposit_sign_session.nonce_gen_first_responses = Vec::new(); let mut operator_sigs_streams =
733 timed_try_join_all(
735 OPERATOR_SIGS_STREAM_CREATION_TIMEOUT,
736 "Operator signature stream creation",
737 Some(operator_clients.ids()),
738 operator_clients.clients().into_iter().enumerate().map(|(idx, mut operator_client)| {
739 let sign_session = deposit_sign_session.clone();
740 #[cfg(test)]
741 let config = config.clone();
742 async move {
743 #[cfg(test)]
744 config
745 .test_params
746 .timeout_params
747 .hook_timeout_operator_sig_collection_operator(idx)
748 .await;
749 let stream = operator_client
750 .deposit_sign(tonic::Request::new(sign_session))
751 .await.wrap_err_with(|| AggregatorError::RequestFailed {
752 request_name: format!("Deposit sign stream for operator {idx}"),
753 })?;
754 Ok::<_, BridgeError>(stream.into_inner())
755 }
756 }))
757 .await?;
758
759 let deposit_data: DepositData = deposit_sign_session
760 .deposit_params
761 .clone()
762 .ok_or_else(|| eyre::eyre!("No deposit params found in deposit sign session"))?
763 .try_into()
764 .wrap_err("Failed to convert deposit params to deposit data")?;
765
766 let needed_sigs = config.get_num_required_operator_sigs(&deposit_data);
768
769 let operator_sigs =
771 try_join_all_combine_errors(operator_sigs_streams.iter_mut().enumerate().map(
772 |(idx, stream)| async move {
773 let mut sigs: Vec<Signature> = Vec::with_capacity(needed_sigs);
774 while let Some(sig) =
775 stream
776 .message()
777 .await
778 .wrap_err_with(|| AggregatorError::RequestFailed {
779 request_name: format!("Deposit sign stream for operator {idx}"),
780 })?
781 {
782 sigs.push(Signature::from_slice(&sig.schnorr_sig).wrap_err_with(|| {
783 format!("Failed to parse Schnorr signature from operator {idx}")
784 })?);
785 if sigs.len() == needed_sigs {
786 break;
787 }
788 }
789 Ok::<_, BridgeError>(sigs)
790 },
791 ))
792 .await
793 .wrap_err("Failed to get operator signatures from operators")?;
794
795 for (idx, sigs) in operator_sigs.iter().enumerate() {
797 if sigs.len() != needed_sigs {
798 return Err(eyre::eyre!(
799 "Not all operator sigs received from operator {}.\n Expected: {}, got: {}",
800 idx,
801 needed_sigs,
802 sigs.len()
803 )
804 .into());
805 }
806 }
807 Ok(operator_sigs)
808 }
809
810 async fn create_movetx(
811 &self,
812 partial_sigs: Vec<Vec<u8>>,
813 movetx_agg_and_pub_nonces: (AggregatedNonce, Vec<PublicNonce>),
814 deposit_params: DepositParams,
815 ) -> Result<TxHandler<Signed>, Status> {
816 let mut deposit_data: DepositData = deposit_params.try_into()?;
817 let musig_partial_sigs = parser::verifier::parse_partial_sigs(partial_sigs)?;
818
819 let mut move_txhandler =
821 create_move_to_vault_txhandler(&mut deposit_data, self.config.protocol_paramset())?;
822
823 let sighash = move_txhandler.calculate_script_spend_sighash_indexed(
824 0,
825 0,
826 bitcoin::TapSighashType::Default,
827 )?;
828
829 let musig_sigs_and_nonces = musig_partial_sigs
830 .into_iter()
831 .zip(movetx_agg_and_pub_nonces.1)
832 .collect::<Vec<_>>();
833
834 let verifiers_public_keys = deposit_data.get_verifiers();
836 let final_sig = crate::musig2::aggregate_partial_signatures(
837 verifiers_public_keys,
838 None,
839 movetx_agg_and_pub_nonces.0,
840 &musig_sigs_and_nonces,
841 Message::from_digest(sighash.to_byte_array()),
842 )?;
843
844 move_txhandler.set_p2tr_script_spend_witness(&[final_sig.as_ref()], 0, 0)?;
846
847 Ok(move_txhandler.promote()?)
848 }
849
850 async fn verify_and_save_emergency_stop_sigs(
851 &self,
852 emergency_stop_sigs: Vec<Vec<u8>>,
853 emergency_stop_agg_and_pub_nonces: (AggregatedNonce, Vec<PublicNonce>),
854 deposit_params: DepositParams,
855 ) -> Result<(), BridgeError> {
856 let mut deposit_data: DepositData = deposit_params
857 .try_into()
858 .wrap_err("Failed to convert deposit params to deposit data")?;
859 let musig_partial_sigs = parser::verifier::parse_partial_sigs(emergency_stop_sigs)
860 .wrap_err("Failed to parse emergency stop signatures")?;
861
862 let move_txhandler =
864 create_move_to_vault_txhandler(&mut deposit_data, self.config.protocol_paramset())?;
865
866 let mut emergency_stop_txhandler = create_emergency_stop_txhandler(
867 &mut deposit_data,
868 &move_txhandler,
869 self.config.protocol_paramset(),
870 )?;
871
872 let sighash = emergency_stop_txhandler.calculate_script_spend_sighash_indexed(
873 0,
874 0,
875 bitcoin::TapSighashType::SinglePlusAnyoneCanPay,
876 )?;
877
878 let verifiers_public_keys = deposit_data.get_verifiers();
879
880 let musig_sigs_and_nonces = musig_partial_sigs
881 .into_iter()
882 .zip(emergency_stop_agg_and_pub_nonces.1)
883 .collect::<Vec<_>>();
884
885 let final_sig = crate::musig2::aggregate_partial_signatures(
886 verifiers_public_keys,
887 None,
888 emergency_stop_agg_and_pub_nonces.0,
889 &musig_sigs_and_nonces,
890 Message::from_digest(sighash.to_byte_array()),
891 )
892 .wrap_err("Failed to aggregate emergency stop signatures")?;
893
894 let final_sig = bitcoin::taproot::Signature {
895 signature: final_sig,
896 sighash_type: bitcoin::TapSighashType::SinglePlusAnyoneCanPay,
897 };
898
899 emergency_stop_txhandler.set_p2tr_script_spend_witness(&[final_sig.serialize()], 0, 0)?;
901
902 let emergency_stop_tx = emergency_stop_txhandler.get_cached_tx();
903 let move_to_vault_txid = move_txhandler.get_txid();
904
905 tracing::debug!("Move to vault tx id: {}", move_to_vault_txid.to_string());
906
907 let emergency_stop_pubkey = self
908 .config
909 .emergency_stop_encryption_public_key
910 .ok_or_else(|| eyre::eyre!("Emergency stop encryption public key is not set"))?;
911 let encrypted_emergency_stop_tx = crate::encryption::encrypt_bytes(
912 emergency_stop_pubkey,
913 &bitcoin::consensus::serialize(&emergency_stop_tx),
914 )?;
915
916 self.db
917 .insert_signed_emergency_stop_tx_if_not_exists(
918 None,
919 move_to_vault_txid,
920 &encrypted_emergency_stop_tx,
921 )
922 .await?;
923
924 Ok(())
925 }
926
927 #[cfg(feature = "automation")]
928 pub async fn send_emergency_stop_tx(
929 &self,
930 tx: bitcoin::Transaction,
931 ) -> Result<bitcoin::Transaction, Status> {
932 let mut dbtx = self.db.begin_transaction().await?;
934 self.tx_sender
935 .insert_try_to_send(
936 &mut dbtx,
937 Some(TxMetadata {
938 deposit_outpoint: None,
939 operator_xonly_pk: None,
940 round_idx: None,
941 kickoff_idx: None,
942 tx_type: TransactionType::EmergencyStop,
943 }),
944 &tx,
945 FeePayingType::RBF,
946 None,
947 &[],
948 &[],
949 &[],
950 &[],
951 )
952 .await?;
953 dbtx.commit()
954 .await
955 .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;
956
957 Ok(tx)
958 }
959}
960
961#[async_trait]
962impl ClementineAggregator for AggregatorServer {
963 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
964 async fn get_compatibility_params(
965 &self,
966 _request: Request<Empty>,
967 ) -> Result<Response<CompatibilityParamsRpc>, Status> {
968 let params = self.aggregator.get_compatibility_params()?;
969 Ok(Response::new(params.try_into().map_to_status()?))
970 }
971
972 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
973 async fn get_compatibility_data_from_entities(
974 &self,
975 _request: Request<Empty>,
976 ) -> Result<Response<EntitiesCompatibilityData>, Status> {
977 let data = self
978 .aggregator
979 .get_compatibility_data_from_entities()
980 .await?;
981 Ok(Response::new(EntitiesCompatibilityData {
982 entities_compatibility_data: data,
983 }))
984 }
985
986 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
987 async fn vergen(&self, _request: Request<Empty>) -> Result<Response<VergenResponse>, Status> {
988 tracing::info!("Vergen rpc called");
989 Ok(Response::new(get_vergen_response()))
990 }
991 #[tracing::instrument(
992 skip_all,
993 fields(restart_tasks = %request.get_ref().restart_tasks),
994 err(level = tracing::Level::ERROR)
995 )]
996 async fn get_entity_statuses(
997 &self,
998 request: Request<GetEntityStatusesRequest>,
999 ) -> Result<Response<EntityStatuses>, Status> {
1000 tracing::debug!("Get entity statuses rpc called");
1001 let request = request.into_inner();
1002 let restart_tasks = request.restart_tasks;
1003
1004 Ok(Response::new(EntityStatuses {
1005 entity_statuses: self.aggregator.get_entity_statuses(restart_tasks).await?,
1006 }))
1007 }
1008
1009 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
1010 async fn optimistic_payout(
1011 &self,
1012 request: tonic::Request<super::OptimisticWithdrawParams>,
1013 ) -> std::result::Result<tonic::Response<super::RawSignedTx>, tonic::Status> {
1014 tracing::info!("Optimistic payout rpc called");
1015 let opt_withdraw_params = request.into_inner();
1016
1017 let withdraw_params =
1018 opt_withdraw_params
1019 .withdrawal
1020 .clone()
1021 .ok_or(Status::invalid_argument(
1022 "Withdrawal params not found for optimistic payout",
1023 ))?;
1024 let (deposit_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
1025 parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
1026 tracing::info!("Parsed optimistic payout rpc params, deposit id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}, verification signature: {:?}", deposit_id, input_signature, input_outpoint, output_script_pubkey, output_amount, opt_withdraw_params.verification_signature);
1027
1028 self.check_compatibility_with_actors(CompatibilityCheckScope::VerifiersOnly)
1030 .await?;
1031
1032 if self
1034 .rpc
1035 .is_utxo_spent(&input_outpoint)
1036 .await
1037 .map_to_status()?
1038 {
1039 return Err(Status::invalid_argument(format!(
1040 "Withdrawal utxo is already spent: {input_outpoint:?}",
1041 )));
1042 }
1043
1044 if !(output_script_pubkey.is_p2tr()
1046 || output_script_pubkey.is_p2pkh()
1047 || output_script_pubkey.is_p2sh()
1048 || output_script_pubkey.is_p2wpkh()
1049 || output_script_pubkey.is_p2wsh())
1050 {
1051 return Err(Status::invalid_argument(format!(
1052 "Output script pubkey is not a valid script pubkey: {output_script_pubkey}, must be p2tr, p2pkh, p2sh, p2wpkh, or p2wsh"
1053 )));
1054 }
1055
1056 let withdrawal = self
1058 .db
1059 .get_move_to_vault_txid_from_citrea_deposit(None, deposit_id)
1060 .await?;
1061 if let Some(move_txid) = withdrawal {
1062 let withdrawal_utxo = self
1064 .db
1065 .get_withdrawal_utxo_from_citrea_withdrawal(None, deposit_id)
1066 .await?;
1067 if withdrawal_utxo != input_outpoint {
1068 return Err(Status::invalid_argument(format!(
1069 "Withdrawal utxo is not correct: {withdrawal_utxo:?} != {input_outpoint:?}",
1070 )));
1071 }
1072
1073 let withdrawal_prevout = self
1075 .rpc
1076 .get_txout_from_outpoint(&input_outpoint)
1077 .await
1078 .map_to_status()?;
1079
1080 let user_xonly_pk = withdrawal_prevout
1081 .script_pubkey
1082 .try_get_taproot_pk()
1083 .map_err(|_| {
1084 Status::invalid_argument(format!(
1085 "Withdrawal prevout script_pubkey is not a Taproot output: {:?}",
1086 withdrawal_prevout.script_pubkey
1087 ))
1088 })?;
1089
1090 let withdrawal_utxo = UTXO {
1091 outpoint: input_outpoint,
1092 txout: withdrawal_prevout,
1093 };
1094
1095 let output_txout = TxOut {
1096 value: output_amount,
1097 script_pubkey: output_script_pubkey,
1098 };
1099
1100 let deposit_data = self
1101 .db
1102 .get_deposit_data_with_move_tx(None, move_txid)
1103 .await?;
1104
1105 let mut deposit_data = deposit_data
1106 .ok_or(eyre::eyre!(
1107 "Deposit data not found for move txid {}",
1108 move_txid
1109 ))
1110 .map_err(BridgeError::from)?;
1111
1112 let mut opt_payout_txhandler = create_optimistic_payout_txhandler(
1113 &mut deposit_data,
1114 withdrawal_utxo,
1115 output_txout,
1116 input_signature,
1117 self.config.protocol_paramset(),
1118 )?;
1119
1120 let sighash = opt_payout_txhandler
1121 .calculate_pubkey_spend_sighash(0, input_signature.sighash_type)?;
1122
1123 let message = Message::from_digest(sighash.to_byte_array());
1124
1125 SECP.verify_schnorr(&input_signature.signature, &message, &user_xonly_pk)
1126 .map_err(|_| Status::internal("Invalid signature for optimistic payout tx. Ensure the signature uses SinglePlusAnyoneCanPay sighash type."))?;
1127
1128 let participating_verifiers = self.get_participating_verifiers(&deposit_data).await?;
1130 let verifiers_ids = participating_verifiers.ids();
1131 let (first_responses, mut nonce_streams) = {
1132 create_nonce_streams(
1133 participating_verifiers.clone(),
1134 1,
1135 #[cfg(test)]
1136 &self.config,
1137 )
1138 .await?
1139 };
1140 let pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
1142 .await
1143 .wrap_err("Failed to aggregate nonces for optimistic payout")
1144 .map_to_status()?;
1145 let agg_nonce = aggregate_nonces(pub_nonces.iter().collect::<Vec<_>>().as_slice())?;
1146
1147 let agg_nonce_bytes = agg_nonce.serialize().to_vec();
1148 let opt_payout_sign_futures = participating_verifiers
1150 .clients()
1151 .iter()
1152 .zip(first_responses)
1153 .map(|(client, first_response)| {
1154 let mut client = client.clone();
1155 let opt_withdraw_params = opt_withdraw_params.clone();
1156 {
1157 let agg_nonce_serialized = agg_nonce_bytes.clone();
1158 async move {
1159 let mut request = Request::new(OptimisticPayoutParams {
1160 opt_withdrawal: Some(opt_withdraw_params),
1161 agg_nonce: agg_nonce_serialized,
1162 nonce_gen: Some(first_response),
1163 });
1164 request.set_timeout(OPTIMISTIC_PAYOUT_TIMEOUT);
1165 client.optimistic_payout_sign(request).await
1166 }
1167 }
1168 })
1169 .collect::<Vec<_>>();
1170
1171 let opt_payout_resps = join_all(opt_payout_sign_futures).await;
1173 let mut payout_sigs = Vec::new();
1174 let mut errors = Vec::new();
1175 for (resp, verifier_id) in opt_payout_resps
1176 .into_iter()
1177 .zip(participating_verifiers.ids())
1178 {
1179 match resp {
1180 Ok(res) => {
1181 payout_sigs.push(res.into_inner());
1182 }
1183 Err(e) => {
1184 errors.push(format!("{verifier_id} optimistic payout sign failed: {e}"));
1185 }
1186 }
1187 }
1188 if !errors.is_empty() {
1189 return Err(eyre::eyre!("{errors:?}").into_status());
1190 }
1191
1192 let sighash = opt_payout_txhandler.calculate_script_spend_sighash_indexed(
1195 1,
1196 0,
1197 bitcoin::TapSighashType::Default,
1198 )?;
1199
1200 let musig_partial_sigs = payout_sigs
1201 .into_iter()
1202 .map(|sig| {
1203 PartialSignature::from_byte_array(
1204 &sig.partial_sig
1205 .try_into()
1206 .map_err(|_| secp256k1::musig::ParseError::MalformedArg)?,
1207 )
1208 })
1209 .collect::<Result<Vec<_>, _>>()
1210 .map_err(|e| Status::internal(format!("Failed to parse partial sig: {e:?}")))?;
1211
1212 let musig_sigs_and_nonces = musig_partial_sigs
1213 .into_iter()
1214 .zip(pub_nonces)
1215 .collect::<Vec<_>>();
1216
1217 let final_sig = bitcoin::taproot::Signature {
1218 signature: crate::musig2::aggregate_partial_signatures(
1219 deposit_data.get_verifiers(),
1220 None,
1221 agg_nonce,
1222 &musig_sigs_and_nonces,
1223 Message::from_digest(sighash.to_byte_array()),
1224 )?,
1225 sighash_type: bitcoin::TapSighashType::Default,
1226 };
1227
1228 opt_payout_txhandler.set_p2tr_script_spend_witness(&[final_sig.serialize()], 1, 0)?;
1230 let opt_payout_txhandler = opt_payout_txhandler.promote()?;
1231 let opt_payout_tx = opt_payout_txhandler.get_cached_tx();
1232 tracing::info!(
1233 "Optimistic payout transaction created successfully for deposit id: {:?}",
1234 deposit_id
1235 );
1236
1237 #[cfg(feature = "automation")]
1238 {
1239 tracing::info!("Sending optimistic payout tx via tx_sender");
1240
1241 let mut dbtx = self.db.begin_transaction().await?;
1242 self.tx_sender
1243 .add_tx_to_queue(
1244 &mut dbtx,
1245 TransactionType::OptimisticPayout,
1246 opt_payout_tx,
1247 &[],
1248 None,
1249 self.config.protocol_paramset(),
1250 None,
1251 )
1252 .await
1253 .map_to_status()?;
1254 dbtx.commit().await.map_err(|e| {
1255 Status::internal(format!(
1256 "Failed to commit db transaction to send optimistic payout tx: {e}",
1257 ))
1258 })?;
1259 }
1260
1261 Ok(Response::new(RawSignedTx::from(opt_payout_tx)))
1262 } else {
1263 Err(Status::not_found(format!(
1264 "Withdrawal with index {deposit_id} not found."
1265 )))
1266 }
1267 }
1268
1269 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
1270 async fn internal_send_tx(
1271 &self,
1272 request: Request<clementine::SendTxRequest>,
1273 ) -> Result<Response<Empty>, Status> {
1274 #[cfg(not(feature = "automation"))]
1275 {
1276 Err(Status::unimplemented("Automation is not enabled"))
1277 }
1278 #[cfg(feature = "automation")]
1279 {
1280 let send_tx_req = request.into_inner();
1281 let fee_type = send_tx_req.fee_type();
1282 let signed_tx: bitcoin::Transaction = send_tx_req
1283 .raw_tx
1284 .ok_or(Status::invalid_argument("Missing raw_tx"))?
1285 .try_into()?;
1286 tracing::warn!(
1287 "Internal send tx rpc called with feetype: {:?}, tx hex: {}",
1288 fee_type,
1289 bitcoin::consensus::encode::serialize_hex(&signed_tx)
1290 );
1291
1292 let mut dbtx = self.db.begin_transaction().await?;
1293 self.tx_sender
1294 .insert_try_to_send(
1295 &mut dbtx,
1296 None,
1297 &signed_tx,
1298 fee_type.try_into()?,
1299 None,
1300 &[],
1301 &[],
1302 &[],
1303 &[],
1304 )
1305 .await
1306 .map_to_status()?;
1307 dbtx.commit()
1308 .await
1309 .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;
1310 Ok(Response::new(Empty {}))
1311 }
1312 }
1313
1314 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
1315 async fn setup(
1316 &self,
1317 _request: Request<Empty>,
1318 ) -> Result<Response<VerifierPublicKeys>, Status> {
1319 tracing::info!("Setup rpc called");
1320 self.check_compatibility_with_actors(CompatibilityCheckScope::Both)
1321 .await?;
1322 const CHANNEL_CAPACITY: usize = 1024 * 16;
1324 let (operator_params_tx, operator_params_rx) =
1325 tokio::sync::broadcast::channel(CHANNEL_CAPACITY);
1326 let operator_params_rx_handles = (0..self.get_verifier_clients().len())
1327 .map(|_| operator_params_rx.resubscribe())
1328 .collect::<Vec<_>>();
1329
1330 let operators = self.get_operator_clients().to_vec();
1331 let operator_pks = self.fetch_operator_keys().await?;
1332 let operator_ids = operator_pks
1333 .iter()
1334 .map(|key| OperatorId(*key))
1335 .collect::<Vec<_>>();
1336 let get_operator_params_chunked_handle = tokio::spawn(async move {
1337 tracing::info!(clients = operators.len(), "Collecting operator details...");
1338 try_join_all_combine_errors(operators.iter().zip(operator_ids.iter()).map(
1339 |(operator, id)| {
1340 let mut operator = operator.clone();
1341 let tx = operator_params_tx.clone();
1342 async move {
1343 let stream = operator
1344 .get_params(Request::new(Empty {}))
1345 .await
1346 .wrap_err_with(|| AggregatorError::RequestFailed {
1347 request_name: format!("Operator get params for {id}"),
1348 })
1349 .map_err(BridgeError::from)?
1350 .into_inner();
1351 tx.send(stream.try_collect::<Vec<_>>().await?)
1352 .map_err(|e| {
1353 BridgeError::from(eyre::eyre!(
1354 "Failed to read operator params for {id}: {e}"
1355 ))
1356 })?;
1357 Ok::<_, Status>(())
1358 }
1359 },
1360 ))
1361 .await
1362 .wrap_err("Failed to get operator params from operators")
1363 .map_to_status()?;
1364 Ok::<_, Status>(())
1365 });
1366
1367 let verifiers = self.get_verifier_clients().to_vec();
1368 let verifier_pks = self.fetch_verifier_keys().await?;
1369 let verifier_ids = verifier_pks
1370 .iter()
1371 .map(|key| VerifierId(*key))
1372 .collect::<Vec<_>>();
1373 let set_operator_params_handle = tokio::spawn(async move {
1374 tracing::info!("Informing verifiers of existing operators...");
1375 try_join_all_combine_errors(
1376 verifiers
1377 .iter()
1378 .zip(verifier_ids.iter())
1379 .zip(operator_params_rx_handles)
1380 .map(|((verifier, id), mut rx)| {
1381 let verifier = verifier.clone();
1382 async move {
1383 collect_and_call(&mut rx, |params| {
1384 let mut verifier = verifier.clone();
1385 async move {
1386 verifier
1387 .set_operator(futures::stream::iter(params))
1388 .await
1389 .wrap_err_with(|| AggregatorError::RequestFailed {
1390 request_name: format!("Verifier set_operator for {id}"),
1391 })
1392 .map_err(BridgeError::from)?;
1393 Ok::<_, Status>(())
1394 }
1395 })
1396 .await?;
1397 Ok::<_, Status>(())
1398 }
1399 }),
1400 )
1401 .await
1402 .wrap_err("Failed to set_operator for all verifiers")
1403 .map_to_status()?;
1404 Ok::<_, Status>(())
1405 });
1406
1407 let task_outputs = timed_request(
1408 SETUP_COMPLETION_TIMEOUT,
1409 "Aggregator setup pipeline",
1410 async move {
1411 Ok::<_, BridgeError>(
1412 futures::future::join_all([
1413 get_operator_params_chunked_handle,
1414 set_operator_params_handle,
1415 ])
1416 .await,
1417 )
1418 },
1419 )
1420 .await?;
1421
1422 let task_names = ["Get operator params", "Set operator params"];
1423 debug_assert_eq!(task_names.len(), task_outputs.len());
1424
1425 flatten_join_named_results(task_names.into_iter().zip(task_outputs.into_iter()))?;
1426
1427 Ok(Response::new(VerifierPublicKeys::from(verifier_pks)))
1428 }
1429
    /// Handles the `new_deposit` RPC and returns the signed move transaction as
    /// raw consensus-serialized bytes.
    ///
    /// Steps, all run inside one `timed_request` bounded by
    /// `OVERALL_DEPOSIT_TIMEOUT`:
    ///
    /// 1. Parse the request into a [`DepositInfo`] and build [`DepositData`] from
    ///    the currently fetched verifier and operator keys.
    /// 2. Collect and distribute keys (`collect_and_distribute_keys`).
    /// 3. Open nonce streams, `deposit_sign` streams and `deposit_finalize` calls
    ///    with every participating verifier.
    /// 4. Spawn the pipeline tasks `nonce_aggregator`, `nonce_distributor`,
    ///    `signature_aggregator` and `signature_distributor`, connected by
    ///    channels, while operator signatures are collected in parallel.
    /// 5. Send all operator signatures to each verifier's `deposit_finalize`
    ///    stream and collect the move-to-vault and emergency-stop partial
    ///    signatures the verifiers return.
    /// 6. Verify and save the emergency stop signatures, then build the move
    ///    transaction with `create_movetx`.
    ///
    /// # Errors
    ///
    /// Any failing step is propagated; the final error is converted into a
    /// `Status` via `Into`.
    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
    async fn new_deposit(
        &self,
        request: Request<Deposit>,
    ) -> Result<Response<clementine::RawSignedTx>, Status> {
        tracing::info!("New deposit rpc called");
        self.check_compatibility_with_actors(CompatibilityCheckScope::Both)
            .await?;

        timed_request(OVERALL_DEPOSIT_TIMEOUT, "Overall new deposit", async {
            let deposit_info: DepositInfo = request.into_inner().try_into()?;
            tracing::info!(
                "Parsed new deposit rpc params, deposit info: {:?}",
                deposit_info
            );

            // The N-of-N key is left unset here; verifiers and operators are the
            // keys fetched at the time of this call.
            let deposit_data = DepositData {
                deposit: deposit_info.clone(),
                nofn_xonly_pk: None,
                actors: Actors {
                    verifiers: self.fetch_verifier_keys().await?,
                    watchtowers: vec![],
                    operators: self.fetch_operator_keys().await?,
                },
                security_council: self.config.security_council.clone(),
            };
            tracing::info!(
                "Created deposit data in new_deposit for deposit info: {:?}, deposit data: {:?}",
                deposit_info,
                deposit_data
            );

            let deposit_params = deposit_data.clone().into();

            // Key collection/distribution gets its own timeout and is timed for
            // the log line below.
            let start = std::time::Instant::now();
            timed_request(
                KEY_DISTRIBUTION_TIMEOUT,
                "Key collection and distribution",
                self.collect_and_distribute_keys(&deposit_params),
            )
            .await?;
            tracing::info!("Collected and distributed keys in {:?}", start.elapsed());

            let verifiers = self.get_participating_verifiers(&deposit_data).await?;
            let verifiers_ids = verifiers.ids();

            let num_required_sigs = self.config.get_num_required_nofn_sigs(&deposit_data);
            // NOTE(review): the two extra nonces presumably correspond to the move
            // tx and emergency stop tx aggregated nonces that `nonce_agg_handle`
            // yields further below - confirm against `nonce_aggregator`.
            let num_required_nonces = num_required_sigs as u32 + 2;
            let (first_responses, nonce_streams) =
                create_nonce_streams(
                    verifiers.clone(),
                    num_required_nonces,
                    #[cfg(test)]
                    &self.config,
                )
                .await?;

            // The session (deposit params + each verifier's first nonce-gen
            // response) is converted below into the first message of both the
            // `deposit_sign` and `deposit_finalize` streams, and is also handed
            // to operator signature collection.
            let deposit_sign_session = DepositSignSession {
                deposit_params: Some(deposit_params.clone()),
                nonce_gen_first_responses: first_responses,
            };

            let deposit_sign_param: VerifierDepositSignParams =
                deposit_sign_session.clone().into();

            // `_idx` is only read by the test-only timeout hook, hence the allow.
            #[allow(clippy::unused_enumerate_index)]
            let partial_sig_streams = timed_try_join_all(
                PARTIAL_SIG_STREAM_CREATION_TIMEOUT,
                "Partial signature stream creation",
                Some(verifiers.ids()),
                verifiers.clients().into_iter().enumerate().map(|(_idx, verifier_client)| {
                    let mut verifier_client = verifier_client.clone();
                    #[cfg(test)]
                    let config = self.config.clone();

                    let deposit_sign_param =
                        deposit_sign_param.clone();

                    async move {
                        #[cfg(test)]
                        config
                            .test_params
                            .timeout_params
                            .hook_timeout_partial_sig_stream_creation_verifier(_idx)
                            .await;

                        // NOTE(review): capacity is the nonce count plus one,
                        // presumably to leave room for the initial param message
                        // sent right below - confirm.
                        let (tx, rx) = tokio::sync::mpsc::channel(num_required_nonces as usize + 1);
                        let stream = verifier_client
                            .deposit_sign(tokio_stream::wrappers::ReceiverStream::new(rx))
                            .await?
                            .into_inner();

                        // First message on the stream: the deposit sign params.
                        tx.send(deposit_sign_param).await.map_err(|e| {
                            BridgeError::from(eyre::eyre!("Failed to send deposit sign session: {e:?}"))})?;

                        // Hand back both halves: the response stream and the
                        // sender used later to push more messages.
                        Ok::<_, BridgeError>((stream, tx))
                    }
                })
            )
            .await?;

            // Start one `deposit_finalize` call per verifier as a spawned task;
            // the paired sender feeds that call's request stream.
            #[allow(clippy::unused_enumerate_index)]
            let deposit_finalize_streams = verifiers.clients().into_iter().enumerate().map(
                |(_idx, mut verifier)| {
                    let (tx, rx) = tokio::sync::mpsc::channel(num_required_nonces as usize + 1);
                    let receiver_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
                    #[cfg(test)]
                    let config = self.config.clone();
                    let deposit_finalize_future = tokio::spawn(async move {
                        #[cfg(test)]
                        config
                            .test_params
                            .timeout_params
                            .hook_timeout_deposit_finalize_verifier(_idx)
                            .await;

                        verifier.deposit_finalize(receiver_stream).await
                    });

                    Ok::<_, BridgeError>((deposit_finalize_future, tx))
                },
            ).collect::<Result<Vec<_>, BridgeError>>()?;

            tracing::info!("Sending deposit finalize streams to verifiers for deposit {:?}", deposit_info);

            // Split into the task handles (awaited at the very end) and the
            // senders (used to stream params, final sigs and operator sigs).
            let (deposit_finalize_futures, deposit_finalize_sender): (Vec<_>, Vec<_>) =
                deposit_finalize_streams.into_iter().unzip();

            let deposit_finalize_first_param: VerifierDepositFinalizeParams =
                deposit_sign_session.clone().into();

            // Send the first finalize message to every verifier, under its own
            // timeout.
            timed_try_join_all(
                DEPOSIT_FINALIZE_STREAM_CREATION_TIMEOUT,
                "Deposit finalization initial param send",
                Some(verifiers.ids()),
                deposit_finalize_sender.iter().cloned().map(|tx| {
                    let param = deposit_finalize_first_param.clone();
                    async move {
                        tx.send(param).await
                        .map_err(|e| {
                            BridgeError::from(eyre::eyre!(
                                "Failed to send deposit finalize first param: {e:?}"))
                        })
                    }
                })
            ).await?;


            // Block hash of the transaction that created the deposit outpoint;
            // passed to the sighash stream.
            let deposit_blockhash = self
                .rpc
                .get_blockhash_of_tx(&deposit_data.get_deposit_outpoint().txid)
                .await
                .map_to_status()?;

            let verifiers_public_keys = deposit_data.get_verifiers();

            // Same value as `num_required_sigs` above, recomputed for the
            // pipeline tasks.
            let needed_nofn_sigs = self.config.get_num_required_nofn_sigs(&deposit_data);

            let sighash_stream = Box::pin(create_nofn_sighash_stream(
                self.db.clone(),
                self.config.clone(),
                deposit_data.clone(),
                deposit_blockhash,
                false,
            ));

            // Channels linking the pipeline stages:
            // nonce_aggregator -> nonce_distributor -> signature_aggregator
            // -> signature_distributor.
            let (agg_nonce_sender, agg_nonce_receiver) = channel(num_required_nonces as usize);
            let (partial_sig_sender, partial_sig_receiver) = channel(num_required_nonces as usize);
            let (final_sig_sender, final_sig_receiver) = channel(num_required_nonces as usize);

            // Stage 1: reads the verifiers' nonce streams and the sighash stream,
            // writes to the aggregated-nonce channel.
            let nonce_agg_handle = tokio::spawn(nonce_aggregator(
                nonce_streams,
                sighash_stream,
                agg_nonce_sender,
                needed_nofn_sigs,
                verifiers_ids.clone(),
            ));

            // Stage 2: reads aggregated nonces, talks to the verifiers over the
            // `deposit_sign` streams, writes to the partial-signature channel.
            let nonce_dist_handle = tokio::spawn(nonce_distributor(
                agg_nonce_receiver,
                partial_sig_streams,
                partial_sig_sender,
                needed_nofn_sigs,
                verifiers_ids.clone(),
            ));

            // Stage 3: reads partial signatures, writes to the final-signature
            // channel.
            let sig_agg_handle = tokio::spawn(signature_aggregator(
                partial_sig_receiver,
                verifiers_public_keys,
                final_sig_sender,
                needed_nofn_sigs,
            ));

            tracing::debug!("Getting signatures from operators");
            let operators = self.get_participating_operators(&deposit_data).await?;

            // Operator signature collection runs in its own task, concurrently
            // with the pipeline above, bounded by `OPERATOR_SIGS_TIMEOUT`.
            let config_clone = self.config.clone();
            let operator_sigs_fut = tokio::spawn(async move {
                timed_request(
                    OPERATOR_SIGS_TIMEOUT,
                    "Operator signature collection",
                    async {
                        Aggregator::collect_operator_sigs(
                            operators,
                            config_clone,
                            deposit_sign_session,
                        )
                        .await
                    },
                )
                .await
            });

            // Turn the join handle into a shared future: a join error becomes an
            // internal `Status`, the task's own error is converted via `Into`.
            // Sharing lets both `signature_distributor` and the code near the end
            // of this function await the same result.
            let nonce_agg_handle = nonce_agg_handle
                .map_err(|_| Status::internal("panic when aggregating nonces"))
                .map(
                    |res| -> Result<((AggregatedNonce, Vec<PublicNonce>), (AggregatedNonce, Vec<PublicNonce>)), Status> {
                        res.and_then(|r| r.map_err(Into::into))
                    },
                )
                .shared();

            // Stage 4: reads final signatures and sends to the verifiers'
            // `deposit_finalize` senders.
            let sig_dist_handle = tokio::spawn(signature_distributor(
                final_sig_receiver,
                deposit_finalize_sender.clone(),
                nonce_agg_handle.clone(),
                needed_nofn_sigs,
                verifiers_ids.clone(),
            ));

            // First `?` handles the join error, second the task's own result.
            let all_op_sigs = operator_sigs_fut
                .await
                .map_err(|_| BridgeError::from(eyre::eyre!("panic when collecting operator signatures")))??;

            tracing::info!("Got all operator signatures for deposit {:?}", deposit_info);

            // Wait for the remaining pipeline tasks; the nonce aggregator is
            // awaited separately through its shared future.
            let task_outputs = timed_request(
                PIPELINE_COMPLETION_TIMEOUT,
                "MuSig2 signing pipeline",
                async move {
                    Ok::<_, BridgeError>(futures::future::join_all([nonce_dist_handle, sig_agg_handle, sig_dist_handle]).await)
                },
            )
            .await?;

            // Names are paired positionally with the handles joined above.
            let task_names = ["Nonce distribution", "Signature aggregation", "Signature distribution"];

            debug_assert_eq!(task_names.len(), task_outputs.len());

            flatten_join_named_results(
                task_names.into_iter().zip(task_outputs.into_iter()),
            )?;
            tracing::info!("All deposit_sign related tasks completed for deposit {:?}, now sending operator signatures to verifiers for verification", deposit_info);

            tracing::debug!("Pipeline tasks completed");
            let verifiers_ids = verifiers.ids();

            // For each verifier, stream every operator signature into its
            // `deposit_finalize` call; on success the still-pending finalize task
            // handle is returned for the next step.
            let deposit_finalize_futures = timed_request(
                SEND_OPERATOR_SIGS_TIMEOUT,
                "Sending operator signatures to verifiers",
                async {
                    let send_operator_sigs: Vec<_> = deposit_finalize_sender
                        .iter()
                        .zip(verifiers_ids.iter())
                        .zip(deposit_finalize_futures.into_iter())
                        .map(|((tx, id), dep_fin_fut)| async {
                            for one_op_sigs in all_op_sigs.iter() {
                                for sig in one_op_sigs.iter() {
                                    let deposit_finalize_param: VerifierDepositFinalizeParams =
                                        sig.into();

                                    let send = tx.send(deposit_finalize_param).await;
                                    match send {
                                        Ok(()) => (),
                                        Err(e) => {
                                            // A failed send means the receiving side is
                                            // closed. Await the finalize task first so that
                                            // its own error, if any, is reported in
                                            // preference to the bare send error.
                                            dep_fin_fut.await.wrap_err(format!("{} deposit finalize tokio task on aggregator returned error", id.clone()))?.wrap_err(format!("{} deposit finalize rpc call returned error", id.clone()))?;
                                            return Err(BridgeError::from(eyre::eyre!(format!("{} deposit finalize stream sending returned error: {:?}", id.clone(), e))));
                                        }
                                    }
                                }
                            }

                            Ok::<_, BridgeError>(dep_fin_fut)
                        })
                        .collect();
                    try_join_all_combine_errors(send_operator_sigs).await
                },
            )
            .await.wrap_err("Failed to send operator signatures to verifiers")?;

            tracing::info!("All operator signatures sent to verifiers for verification, now waiting to collect movetx and emergency stop tx partial signatures from verifiers for deposit {:?}", deposit_info);

            // Await every verifier's `deposit_finalize` response and pull out the
            // two partial signatures it carries.
            let partial_sigs: Vec<(Vec<u8>, Vec<u8>)> = timed_try_join_all(
                DEPOSIT_FINALIZATION_TIMEOUT,
                "Deposit finalization",
                Some(verifiers.ids()),
                deposit_finalize_futures.into_iter().map(|fut| async move {
                    let inner = fut.await
                        .map_err(|_| BridgeError::from(eyre::eyre!("panic finishing deposit_finalize")))??
                        .into_inner();
                    Ok((inner.move_to_vault_partial_sig, inner.emergency_stop_partial_sig))
                }),
            )
            .await?;


            let (move_to_vault_sigs, emergency_stop_sigs): (Vec<Vec<u8>>, Vec<Vec<u8>>) =
                partial_sigs.into_iter().unzip();

            tracing::info!("Received move tx and emergency stop tx partial signatures for deposit {:?}", deposit_info);

            // The nonce aggregator's result: one (aggregated nonce, public nonces)
            // pair for the move tx and one for the emergency stop tx.
            let (movetx_agg_nonce, emergency_stop_agg_nonce) = nonce_agg_handle.await?;

            // Emergency stop signatures are verified and saved before the move
            // tx is created.
            self.verify_and_save_emergency_stop_sigs(
                emergency_stop_sigs,
                emergency_stop_agg_nonce,
                deposit_params.clone(),
            )
            .await?;

            let signed_movetx_handler = self
                .create_movetx(move_to_vault_sigs, movetx_agg_nonce, deposit_params)
                .await?;

            let raw_signed_tx = RawSignedTx {
                raw_tx: bitcoin::consensus::serialize(&signed_movetx_handler.get_cached_tx()),
            };

            tracing::info!("Created final move transaction for deposit {:?}", deposit_info);

            Ok(Response::new(raw_signed_tx))
        })
        .await.map_err(Into::into)
    }
1810
1811 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
1812 async fn withdraw(
1813 &self,
1814 request: Request<AggregatorWithdrawalInput>,
1815 ) -> Result<Response<AggregatorWithdrawResponse>, Status> {
1816 tracing::warn!("Withdraw rpc called");
1817 let request = request.into_inner();
1818 let (withdraw_params_with_sig, operator_xonly_pks) = (
1819 request.withdrawal.ok_or(Status::invalid_argument(
1820 "withdrawalParamsWithSig is missing",
1821 ))?,
1822 request.operator_xonly_pks,
1823 );
1824 self.check_compatibility_with_actors(CompatibilityCheckScope::OperatorsOnly)
1826 .await?;
1827
1828 let withdraw_params = withdraw_params_with_sig
1829 .clone()
1830 .withdrawal
1831 .ok_or(Status::invalid_argument("withdrawalParams is missing"))?;
1832
1833 let operator_xonly_pks_from_rpc: Vec<XOnlyPublicKey> = operator_xonly_pks
1835 .into_iter()
1836 .map(|xonly_pk| {
1837 xonly_pk.try_into().map_err(|e| {
1838 Status::invalid_argument(format!("Failed to convert xonly public key: {e}"))
1839 })
1840 })
1841 .collect::<Result<Vec<_>, Status>>()?;
1842
1843 tracing::info!(
1844 "Parsed withdraw rpc params, withdrawal params: {:?}, operator xonly pks: {:?}",
1845 withdraw_params,
1846 operator_xonly_pks_from_rpc
1847 .iter()
1848 .map(|pk| pk.to_string())
1849 .collect::<Vec<_>>()
1850 );
1851
1852 let (withdrawal_id, _, _, _, _) =
1855 parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
1856
1857 let current_operator_xonly_pks = self.fetch_operator_keys().await?;
1860 let invalid_operator_xonly_pks = operator_xonly_pks_from_rpc
1861 .iter()
1862 .filter(|xonly_pk| !current_operator_xonly_pks.contains(xonly_pk))
1863 .collect::<Vec<_>>();
1864 if !invalid_operator_xonly_pks.is_empty() {
1865 return Err(Status::invalid_argument(format!(
1866 "Given xonly public key doesn't belong to any current operator: invalid keys: {invalid_operator_xonly_pks:?}, current operators: {current_operator_xonly_pks:?}"
1867 )));
1868 }
1869
1870 let operators = self
1871 .get_operator_clients()
1872 .iter()
1873 .zip(current_operator_xonly_pks.into_iter());
1874 let withdraw_futures = operators
1875 .filter(|(_, xonly_pk)| {
1876 operator_xonly_pks_from_rpc.is_empty()
1878 || operator_xonly_pks_from_rpc.contains(xonly_pk)
1879 })
1880 .map(|(operator, operator_xonly_pk)| {
1881 let mut operator = operator.clone();
1882 let params = withdraw_params_with_sig.clone();
1883 let mut request = Request::new(params);
1884 request.set_timeout(WITHDRAWAL_TIMEOUT);
1885 async move { (operator.withdraw(request).await, operator_xonly_pk) }
1886 });
1887
1888 let responses = futures::future::join_all(withdraw_futures).await;
1890 tracing::info!(
1891 "Withdraw rpc completed successfully for withdrawal id: {}, operator xonly pks: {:?}, responses: {:?}",
1892 withdrawal_id,
1893 operator_xonly_pks_from_rpc
1894 .iter()
1895 .map(|pk| pk.to_string())
1896 .collect::<Vec<_>>(),
1897 responses,
1898 );
1899 Ok(Response::new(AggregatorWithdrawResponse {
1900 withdraw_responses: responses
1901 .into_iter()
1902 .map(|(res, xonly_pk)| match res {
1903 Ok(withdraw_response) => OperatorWithrawalResponse {
1904 operator_xonly_pk: Some(xonly_pk.into()),
1905 response: Some(operator_withrawal_response::Response::RawTx(
1906 withdraw_response.into_inner(),
1907 )),
1908 },
1909 Err(e) => OperatorWithrawalResponse {
1910 operator_xonly_pk: Some(xonly_pk.into()),
1911 response: Some(operator_withrawal_response::Response::Error(e.to_string())),
1912 },
1913 })
1914 .collect(),
1915 }))
1916 }
1917
1918 #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR))]
1919 async fn get_nofn_aggregated_xonly_pk(
1920 &self,
1921 _request: tonic::Request<super::Empty>,
1922 ) -> std::result::Result<tonic::Response<super::NofnResponse>, tonic::Status> {
1923 tracing::info!("Get nofn aggregated xonly pk rpc called");
1924 let verifier_keys = self.fetch_verifier_keys().await?;
1925 let num_verifiers = verifier_keys.len();
1926 let nofn_xonly_pk = bitcoin::XOnlyPublicKey::from_musig2_pks(verifier_keys.clone(), None)
1927 .map_err(|e| {
1928 Status::internal(format!(
1929 "Failed to aggregate verifier public keys, err: {e}, pubkeys: {verifier_keys:?}"
1930 ))
1931 })?;
1932 Ok(Response::new(super::NofnResponse {
1933 nofn_xonly_pk: nofn_xonly_pk.serialize().to_vec(),
1934 num_verifiers: num_verifiers as u32,
1935 }))
1936 }
1937
1938 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
1939 async fn internal_get_emergency_stop_tx(
1940 &self,
1941 request: Request<clementine::GetEmergencyStopTxRequest>,
1942 ) -> Result<Response<clementine::GetEmergencyStopTxResponse>, Status> {
1943 tracing::warn!("Get emergency stop tx rpc called");
1944 let inner_request = request.into_inner();
1945 let txids: Vec<Txid> = inner_request
1946 .txids
1947 .into_iter()
1948 .map(|txid| {
1949 Txid::from_slice(&txid.txid).map_err(|e| {
1950 tonic::Status::invalid_argument(format!("Failed to parse txid: {e}"))
1951 })
1952 })
1953 .collect::<Result<Vec<_>, _>>()?;
1954 tracing::warn!(
1955 "Parsed get emergency stop tx rpc params, move txids: {:?}",
1956 txids
1957 .iter()
1958 .map(|txid| txid.to_string())
1959 .collect::<Vec<_>>()
1960 );
1961
1962 let emergency_stop_txs = self.db.get_emergency_stop_txs(None, txids).await?;
1963
1964 let (txids, encrypted_emergency_stop_txs): (Vec<Txid>, Vec<Vec<u8>>) =
1965 emergency_stop_txs.into_iter().unzip();
1966
1967 Ok(Response::new(clementine::GetEmergencyStopTxResponse {
1968 txids: txids.into_iter().map(|txid| txid.into()).collect(),
1969 encrypted_emergency_stop_txs,
1970 }))
1971 }
1972
1973 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR))]
1974 async fn send_move_to_vault_tx(
1975 &self,
1976 request: Request<clementine::SendMoveTxRequest>,
1977 ) -> Result<Response<clementine::Txid>, Status> {
1978 tracing::info!("Send move to vault tx rpc called");
1979 #[cfg(not(feature = "automation"))]
1980 {
1981 let _ = request;
1982 return Err(Status::unimplemented(
1983 "Automation is disabled, cannot automatically send move to vault tx.",
1984 ));
1985 }
1986
1987 #[cfg(feature = "automation")]
1988 {
1989 use bitcoin::Amount;
1990 use std::sync::Arc;
1991
1992 use crate::builder::{
1993 address::create_taproot_address,
1994 script::{CheckSig, Multisig, SpendableScript},
1995 transaction::anchor_output,
1996 };
1997
1998 let request = request.into_inner();
1999 let movetx: bitcoin::Transaction = bitcoin::consensus::deserialize(
2000 &request
2001 .raw_tx
2002 .ok_or_eyre("raw_tx is required")
2003 .map_to_status()?
2004 .raw_tx,
2005 )
2006 .wrap_err("Failed to deserialize movetx")
2007 .map_to_status()?;
2008 let deposit_outpoint: bitcoin::OutPoint = request
2009 .deposit_outpoint
2010 .ok_or(Status::invalid_argument("deposit_outpoint is required"))?
2011 .try_into()?;
2012
2013 tracing::info!(
2014 "Parsed send move to vault tx rpc params, deposit outpoint: {:?}, movetx hex: {}",
2015 deposit_outpoint,
2016 bitcoin::consensus::encode::serialize_hex(&movetx)
2017 );
2018
2019 if movetx.input.len() != 1 || movetx.output.len() != 2 {
2021 return Err(Status::invalid_argument(
2022 "Transaction is not a movetx, input or output lengths are not correct",
2023 ));
2024 }
2025 if !(movetx.output[0].value == self.config.protocol_paramset().bridge_amount
2028 && movetx.output[1].value == Amount::from_sat(0))
2029 {
2030 return Err(Status::invalid_argument(format!(
2031 "Transaction is not a movetx, output sat values are not correct, should be ({}, 0), got ({}, {})",
2032 self.config.protocol_paramset().bridge_amount,
2033 movetx.output[0].value,
2034 movetx.output[1].value,
2035 )));
2036 }
2037 let verifier_keys = self.fetch_verifier_keys().await?;
2039 let nofn_xonly_pk =
2040 bitcoin::XOnlyPublicKey::from_musig2_pks(verifier_keys.clone(), None).map_err(
2041 |e| {
2042 Status::internal(format!(
2043 "Failed to aggregate verifier public keys, err: {e}, pubkeys: {verifier_keys:?}"
2044 ))
2045 },
2046 )?;
2047 let nofn_script = Arc::new(CheckSig::new(nofn_xonly_pk));
2048 let security_council_script = Arc::new(Multisig::from_security_council(
2049 self.config.security_council.clone(),
2050 ));
2051
2052 let (addr, _) = create_taproot_address(
2053 &[
2054 nofn_script.to_script_buf(),
2055 security_council_script.to_script_buf(),
2056 ],
2057 None,
2058 self.config.protocol_paramset().network,
2059 );
2060 let bridge_script_pubkey = addr.script_pubkey();
2061
2062 if !(movetx.output[1].script_pubkey
2063 == anchor_output(self.config.protocol_paramset().anchor_amount()).script_pubkey
2064 && movetx.output[0].script_pubkey == bridge_script_pubkey)
2065 {
2066 return Err(Status::invalid_argument(
2067 format!("Transaction is not a movetx, output scriptpubkeys are not correct, expected: (vault: {:?}, anchor: {:?}), got: (vault: {:?}, anchor: {:?})",
2068 bridge_script_pubkey,
2069 anchor_output(self.config.protocol_paramset().anchor_amount()).script_pubkey,
2070 movetx.output[0].script_pubkey,
2071 movetx.output[1].script_pubkey,
2072 )));
2073 }
2074
2075 let mut dbtx = self.db.begin_transaction().await?;
2076 self.tx_sender
2077 .insert_try_to_send(
2078 &mut dbtx,
2079 Some(TxMetadata {
2080 deposit_outpoint: Some(deposit_outpoint),
2081 operator_xonly_pk: None,
2082 round_idx: None,
2083 kickoff_idx: None,
2084 tx_type: TransactionType::MoveToVault,
2085 }),
2086 &movetx,
2087 FeePayingType::CPFP,
2088 None,
2089 &[],
2090 &[],
2091 &[],
2092 &[],
2093 )
2094 .await
2095 .map_to_status()?;
2096 dbtx.commit()
2097 .await
2098 .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;
2099
2100 Ok(Response::new(movetx.compute_txid().into()))
2101 }
2102 }
2103}
2104
2105#[cfg(test)]
2106mod tests {
2107 use crate::actor::Actor;
2108 use crate::builder;
2109 use crate::config::BridgeConfig;
2110 use crate::deposit::{BaseDepositData, DepositInfo, DepositType};
2111 use crate::musig2::AggregateFromPublicKeys;
2112 use crate::rpc::clementine::clementine_aggregator_client::ClementineAggregatorClient;
2113 use crate::rpc::clementine::{self, GetEntityStatusesRequest, SendMoveTxRequest};
2114 use crate::rpc::get_clients;
2115 use crate::servers::create_aggregator_unix_server;
2116 use crate::test::common::citrea::MockCitreaClient;
2117 use crate::test::common::tx_utils::ensure_tx_onchain;
2118 use crate::test::common::*;
2119 use bitcoin::hashes::Hash;
2120 use bitcoincore_rpc::RpcApi;
2121 use clementine_primitives::EVMAddress;
2122 use eyre::Context;
2123 use std::time::Duration;
2124 use tokio::time::sleep;
2125 use tonic::{Request, Status};
2126
/// Test helper that runs one complete deposit with the supplied configuration.
///
/// A regtest RPC handle and a fresh set of actors are created from `config`, then
/// `run_single_deposit` is driven to completion. Whatever the deposit produces is
/// discarded: callers only care whether the flow succeeded.
///
/// # Errors
///
/// Propagates the failure of `run_single_deposit`, converted into a [`Status`] by `?`.
#[cfg(feature = "automation")]
async fn perform_deposit(mut config: BridgeConfig) -> Result<(), Status> {
    let regtest = create_regtest_rpc(&mut config).await;
    let test_actors = create_actors::<MockCitreaClient>(&config).await;

    // Kept in a named binding so the deposit artifacts live until the helper returns.
    let _deposit_outcome = run_single_deposit::<MockCitreaClient>(
        &mut config,
        regtest.rpc().clone(),
        None,
        &test_actors,
        None,
    )
    .await?;

    Ok(())
}
2139
2140 #[tokio::test(flavor = "multi_thread")]
2141 async fn aggregator_double_deposit() {
2142 let mut config = create_test_config_with_thread_name().await;
2143 let regtest = create_regtest_rpc(&mut config).await;
2144 let rpc = regtest.rpc();
2145 let actors = create_actors::<MockCitreaClient>(&config).await;
2146 let mut aggregator = actors.get_aggregator();
2147
2148 let evm_address = EVMAddress([1u8; 20]);
2149 let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2150
2151 let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2152 .setup(tonic::Request::new(clementine::Empty {}))
2153 .await
2154 .unwrap()
2155 .into_inner()
2156 .try_into()
2157 .unwrap();
2158 sleep(Duration::from_secs(3)).await;
2159
2160 let nofn_xonly_pk =
2161 bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2162
2163 let deposit_address = builder::address::generate_deposit_address(
2164 nofn_xonly_pk,
2165 signer.address.as_unchecked(),
2166 evm_address,
2167 config.protocol_paramset().network,
2168 config.protocol_paramset().user_takes_after,
2169 )
2170 .unwrap()
2171 .0;
2172
2173 let deposit_outpoint = rpc
2174 .send_to_address(&deposit_address, config.protocol_paramset().bridge_amount)
2175 .await
2176 .unwrap();
2177 rpc.mine_blocks(18).await.unwrap();
2178
2179 let deposit_info = DepositInfo {
2180 deposit_outpoint,
2181 deposit_type: DepositType::BaseDeposit(BaseDepositData {
2182 evm_address,
2183 recovery_taproot_address: signer.address.as_unchecked().clone(),
2184 }),
2185 };
2186
2187 let movetx_one = aggregator
2189 .new_deposit(clementine::Deposit::from(deposit_info.clone()))
2190 .await
2191 .unwrap()
2192 .into_inner();
2193 let movetx_one_txid: bitcoin::Txid = aggregator
2194 .send_move_to_vault_tx(SendMoveTxRequest {
2195 deposit_outpoint: Some(deposit_outpoint.into()),
2196 raw_tx: Some(movetx_one),
2197 })
2198 .await
2199 .unwrap()
2200 .into_inner()
2201 .try_into()
2202 .unwrap();
2203
2204 let movetx_two = aggregator
2205 .new_deposit(clementine::Deposit::from(deposit_info))
2206 .await
2207 .unwrap()
2208 .into_inner();
2209 let _movetx_two_txid: bitcoin::Txid = aggregator
2210 .send_move_to_vault_tx(SendMoveTxRequest {
2211 deposit_outpoint: Some(deposit_outpoint.into()),
2212 raw_tx: Some(movetx_two),
2213 })
2214 .await
2215 .unwrap()
2216 .into_inner()
2217 .try_into()
2218 .unwrap();
2219 rpc.mine_blocks(1).await.unwrap();
2220 sleep(Duration::from_secs(3)).await;
2221
2222 poll_until_condition(
2223 async || {
2224 rpc.mine_blocks(1).await.unwrap();
2225 Ok(rpc
2226 .is_tx_on_chain(&movetx_one_txid)
2227 .await
2228 .unwrap_or_default())
2229 },
2230 None,
2231 None,
2232 )
2233 .await
2234 .wrap_err_with(|| eyre::eyre!("MoveTx did not land onchain"))
2235 .unwrap();
2236 }
2237
2238 #[tokio::test(flavor = "multi_thread")]
2239 async fn aggregator_deposit_movetx_lands_onchain() {
2240 let mut config = create_test_config_with_thread_name().await;
2241 let regtest = create_regtest_rpc(&mut config).await;
2242 let rpc = regtest.rpc();
2243 let actors = create_actors::<MockCitreaClient>(&config).await;
2244 let mut aggregator = actors.get_aggregator();
2245
2246 let evm_address = EVMAddress([1u8; 20]);
2247 let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2248
2249 let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2250 .setup(tonic::Request::new(clementine::Empty {}))
2251 .await
2252 .unwrap()
2253 .into_inner()
2254 .try_into()
2255 .unwrap();
2256 sleep(Duration::from_secs(3)).await;
2257
2258 let nofn_xonly_pk =
2259 bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2260
2261 let deposit_address = builder::address::generate_deposit_address(
2262 nofn_xonly_pk,
2263 signer.address.as_unchecked(),
2264 evm_address,
2265 config.protocol_paramset().network,
2266 config.protocol_paramset().user_takes_after,
2267 )
2268 .unwrap()
2269 .0;
2270
2271 let deposit_outpoint = rpc
2272 .send_to_address(&deposit_address, config.protocol_paramset().bridge_amount)
2273 .await
2274 .unwrap();
2275 rpc.mine_blocks(18).await.unwrap();
2276
2277 let deposit_info = DepositInfo {
2278 deposit_outpoint,
2279 deposit_type: DepositType::BaseDeposit(BaseDepositData {
2280 evm_address,
2281 recovery_taproot_address: signer.address.as_unchecked().clone(),
2282 }),
2283 };
2284
2285 let start_time = std::time::Instant::now();
2287 let raw_move_tx = aggregator
2288 .new_deposit(clementine::Deposit::from(deposit_info))
2289 .await
2290 .unwrap()
2291 .into_inner();
2292 let end_time = std::time::Instant::now();
2293 tracing::info!("New deposit time: {:?}", end_time - start_time);
2294
2295 let movetx_txid = aggregator
2296 .send_move_to_vault_tx(SendMoveTxRequest {
2297 deposit_outpoint: Some(deposit_outpoint.into()),
2298 raw_tx: Some(raw_move_tx),
2299 })
2300 .await
2301 .unwrap()
2302 .into_inner()
2303 .try_into()
2304 .unwrap();
2305
2306 rpc.mine_blocks(1).await.unwrap();
2307 sleep(Duration::from_secs(3)).await;
2308
2309 poll_until_condition(
2310 async || {
2311 rpc.mine_blocks(1).await.unwrap();
2312 Ok(rpc.is_tx_on_chain(&movetx_txid).await.unwrap_or_default())
2313 },
2314 None,
2315 None,
2316 )
2317 .await
2318 .wrap_err_with(|| eyre::eyre!("MoveTx did not land onchain"))
2319 .unwrap();
2320 }
2321
2322 #[tokio::test]
2323 async fn aggregator_two_deposit_movetx_and_emergency_stop() {
2324 let mut config = create_test_config_with_thread_name().await;
2325 let regtest = create_regtest_rpc(&mut config).await;
2326 let rpc = regtest.rpc();
2327 let actors = create_actors::<MockCitreaClient>(&config).await;
2328 let mut aggregator = actors.get_aggregator();
2329
2330 let evm_address = EVMAddress([1u8; 20]);
2331 let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2332
2333 let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2334 .setup(tonic::Request::new(clementine::Empty {}))
2335 .await
2336 .unwrap()
2337 .into_inner()
2338 .try_into()
2339 .unwrap();
2340 sleep(Duration::from_secs(3)).await;
2341
2342 let nofn_xonly_pk =
2343 bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2344
2345 let deposit_address_0 = builder::address::generate_deposit_address(
2346 nofn_xonly_pk,
2347 signer.address.as_unchecked(),
2348 evm_address,
2349 config.protocol_paramset().network,
2350 config.protocol_paramset().user_takes_after,
2351 )
2352 .unwrap()
2353 .0;
2354
2355 let deposit_address_1 = builder::address::generate_deposit_address(
2356 nofn_xonly_pk,
2357 signer.address.as_unchecked(),
2358 evm_address,
2359 config.protocol_paramset().network,
2360 config.protocol_paramset().user_takes_after,
2361 )
2362 .unwrap()
2363 .0;
2364
2365 let deposit_outpoint_0 = rpc
2366 .send_to_address(&deposit_address_0, config.protocol_paramset().bridge_amount)
2367 .await
2368 .unwrap();
2369 rpc.mine_blocks(18).await.unwrap();
2370
2371 let deposit_outpoint_1 = rpc
2372 .send_to_address(&deposit_address_1, config.protocol_paramset().bridge_amount)
2373 .await
2374 .unwrap();
2375 rpc.mine_blocks(18).await.unwrap();
2376
2377 let deposit_info_0 = DepositInfo {
2378 deposit_outpoint: deposit_outpoint_0,
2379 deposit_type: DepositType::BaseDeposit(BaseDepositData {
2380 evm_address,
2381 recovery_taproot_address: signer.address.as_unchecked().clone(),
2382 }),
2383 };
2384
2385 let deposit_info_1 = DepositInfo {
2386 deposit_outpoint: deposit_outpoint_1,
2387 deposit_type: DepositType::BaseDeposit(BaseDepositData {
2388 evm_address,
2389 recovery_taproot_address: signer.address.as_unchecked().clone(),
2390 }),
2391 };
2392
2393 let raw_move_tx_0 = aggregator
2395 .new_deposit(clementine::Deposit::from(deposit_info_0))
2396 .await
2397 .unwrap()
2398 .into_inner();
2399 let move_txid_0: bitcoin::Txid = aggregator
2400 .send_move_to_vault_tx(SendMoveTxRequest {
2401 deposit_outpoint: Some(deposit_outpoint_0.into()),
2402 raw_tx: Some(raw_move_tx_0),
2403 })
2404 .await
2405 .unwrap()
2406 .into_inner()
2407 .try_into()
2408 .unwrap();
2409
2410 rpc.mine_blocks(1).await.unwrap();
2411 sleep(Duration::from_secs(3)).await;
2412 ensure_tx_onchain(rpc, move_txid_0)
2413 .await
2414 .expect("failed to get movetx_0 on chain");
2415
2416 let raw_move_tx_1 = aggregator
2418 .new_deposit(clementine::Deposit::from(deposit_info_1))
2419 .await
2420 .unwrap()
2421 .into_inner();
2422 let move_txid_1 = aggregator
2423 .send_move_to_vault_tx(SendMoveTxRequest {
2424 deposit_outpoint: Some(deposit_outpoint_1.into()),
2425 raw_tx: Some(raw_move_tx_1),
2426 })
2427 .await
2428 .unwrap()
2429 .into_inner()
2430 .try_into()
2431 .unwrap();
2432
2433 rpc.mine_blocks(1).await.unwrap();
2434 ensure_tx_onchain(rpc, move_txid_1)
2435 .await
2436 .expect("failed to get movetx_1 on chain");
2437 sleep(Duration::from_secs(3)).await;
2438
2439 let move_txids = vec![move_txid_0, move_txid_1];
2440
2441 tracing::debug!("Move txids: {:?}", move_txids);
2442
2443 let emergency_txid = aggregator
2444 .internal_get_emergency_stop_tx(tonic::Request::new(
2445 clementine::GetEmergencyStopTxRequest {
2446 txids: move_txids
2447 .iter()
2448 .map(|txid| clementine::Txid {
2449 txid: txid.to_byte_array().to_vec(),
2450 })
2451 .collect(),
2452 },
2453 ))
2454 .await
2455 .unwrap()
2456 .into_inner();
2457
2458 let decryption_priv_key =
2459 hex::decode("a80bc8cf095c2b37d4c6233114e0dd91f43d75de5602466232dbfcc1fc66c542")
2460 .expect("Failed to parse emergency stop encryption public key");
2461 let emergency_stop_tx: bitcoin::Transaction = bitcoin::consensus::deserialize(
2462 &crate::encryption::decrypt_bytes(
2463 &decryption_priv_key,
2464 &emergency_txid.encrypted_emergency_stop_txs[0],
2465 )
2466 .expect("Failed to decrypt emergency stop tx"),
2467 )
2468 .expect("Failed to deserialize");
2469
2470 rpc.send_raw_transaction(&emergency_stop_tx)
2471 .await
2472 .expect("Failed to send emergency stop tx");
2473
2474 let emergency_stop_txid = emergency_stop_tx.compute_txid();
2475 rpc.mine_blocks(1).await.unwrap();
2476
2477 poll_until_condition(
2478 async || {
2479 rpc.mine_blocks(1).await.unwrap();
2480 Ok(rpc
2481 .is_tx_on_chain(&emergency_stop_txid)
2482 .await
2483 .unwrap_or_default())
2484 },
2485 None,
2486 None,
2487 )
2488 .await
2489 .wrap_err_with(|| eyre::eyre!("Emergency stop tx did not land onchain"))
2490 .unwrap();
2491 }
2492
/// Expects a deposit to fail with a "Deposit finalization from verifiers" error when
/// `timeout_params.deposit_finalize_verifier_idx` is set to `Some(0)`.
///
/// Currently ignored (see the `ignore` reason below).
// NOTE(review): presumably the index selects which verifier is forced to time out
// during deposit finalization — confirm against the test parameter definitions.
#[cfg(feature = "automation")]
#[tokio::test]
#[ignore = "This test does not work"]
async fn aggregator_deposit_finalize_verifier_timeout() {
    let mut config = create_test_config_with_thread_name().await;
    let timeouts = &mut config.test_params.timeout_params;
    timeouts.deposit_finalize_verifier_idx = Some(0);

    let deposit_result = perform_deposit(config).await;
    assert!(deposit_result.is_err());

    let err_string = deposit_result.unwrap_err().to_string();
    let mentions_expected_step = err_string.contains("Deposit finalization from verifiers");
    assert!(mentions_expected_step, "Error string was: {err_string}");
}
2510
/// Expects a deposit to fail with a "Verifier key distribution (id:" error when
/// `timeout_params.key_distribution_verifier_idx` is set to `Some(0)`.
// NOTE(review): presumably the index selects which verifier is forced to time out
// during key distribution — confirm against the test parameter definitions.
#[cfg(feature = "automation")]
#[tokio::test]
async fn aggregator_deposit_key_distribution_verifier_timeout() {
    let mut config = create_test_config_with_thread_name().await;
    let timeouts = &mut config.test_params.timeout_params;
    timeouts.key_distribution_verifier_idx = Some(0);

    let deposit_result = perform_deposit(config).await;
    assert!(deposit_result.is_err());

    let err_string = deposit_result.unwrap_err().to_string();
    let mentions_expected_step = err_string.contains("Verifier key distribution (id:");
    assert!(mentions_expected_step, "Error string was: {err_string}");
}
2529
/// Expects a deposit to fail with an "Operator key collection (id:" error when
/// `timeout_params.key_collection_operator_idx` is set to `Some(0)`.
// NOTE(review): presumably the index selects which operator is forced to time out
// during key collection — confirm against the test parameter definitions.
#[cfg(feature = "automation")]
#[tokio::test]
async fn aggregator_deposit_key_distribution_operator_timeout() {
    let mut config = create_test_config_with_thread_name().await;
    let timeouts = &mut config.test_params.timeout_params;
    timeouts.key_collection_operator_idx = Some(0);

    let deposit_result = perform_deposit(config).await;
    assert!(deposit_result.is_err());

    let err_string = deposit_result.unwrap_err().to_string();
    let mentions_expected_step = err_string.contains("Operator key collection (id:");
    assert!(mentions_expected_step, "Error string was: {err_string}");
}
2548
/// Expects a deposit to fail with a "Nonce stream creation (id:" error when
/// `timeout_params.nonce_stream_creation_verifier_idx` is set to `Some(0)`.
// NOTE(review): presumably the index selects which verifier is forced to time out
// while its nonce stream is created — confirm against the test parameter definitions.
#[cfg(feature = "automation")]
#[tokio::test]
async fn aggregator_deposit_nonce_stream_creation_verifier_timeout() {
    let mut config = create_test_config_with_thread_name().await;
    let timeouts = &mut config.test_params.timeout_params;
    timeouts.nonce_stream_creation_verifier_idx = Some(0);

    let deposit_result = perform_deposit(config).await;
    assert!(deposit_result.is_err());

    let err_string = deposit_result.unwrap_err().to_string();
    let mentions_expected_step = err_string.contains("Nonce stream creation (id:");
    assert!(mentions_expected_step, "Error string was: {err_string}");
}
2567
/// Expects a deposit to fail with a "Partial signature stream creation (id:" error when
/// `timeout_params.partial_sig_stream_creation_verifier_idx` is set to `Some(0)`.
// NOTE(review): presumably the index selects which verifier is forced to time out while
// its partial signature stream is created — confirm against the test parameter definitions.
#[cfg(feature = "automation")]
#[tokio::test]
async fn aggregator_deposit_partial_sig_stream_creation_timeout() {
    let mut config = create_test_config_with_thread_name().await;
    let timeouts = &mut config.test_params.timeout_params;
    timeouts.partial_sig_stream_creation_verifier_idx = Some(0);

    let deposit_result = perform_deposit(config).await;
    assert!(deposit_result.is_err());

    let err_string = deposit_result.unwrap_err().to_string();
    let mentions_expected_step = err_string.contains("Partial signature stream creation (id:");
    assert!(mentions_expected_step, "Error string was: {err_string}");
}
2586
/// Expects a deposit to fail with an "Operator signature stream creation (id:" error when
/// `timeout_params.operator_sig_collection_operator_idx` is set to `Some(0)`.
// NOTE(review): presumably the index selects which operator is forced to time out while
// its signature stream is created — confirm against the test parameter definitions.
#[cfg(feature = "automation")]
#[tokio::test]
async fn aggregator_deposit_operator_sig_collection_operator_timeout() {
    let mut config = create_test_config_with_thread_name().await;
    let timeouts = &mut config.test_params.timeout_params;
    timeouts.operator_sig_collection_operator_idx = Some(0);

    let deposit_result = perform_deposit(config).await;
    assert!(deposit_result.is_err());

    let err_string = deposit_result.unwrap_err().to_string();
    let mentions_expected_step = err_string.contains("Operator signature stream creation (id:");
    assert!(mentions_expected_step, "Error string was: {err_string}");
}
2605
2606 #[tokio::test]
2607 async fn aggregator_get_entity_statuses() {
2608 let mut config = create_test_config_with_thread_name().await;
2609 let _regtest = create_regtest_rpc(&mut config).await;
2610
2611 let actors = create_actors::<MockCitreaClient>(&config).await;
2612 let mut aggregator = actors.get_aggregator();
2613 let status = aggregator
2614 .get_entity_statuses(Request::new(GetEntityStatusesRequest {
2615 restart_tasks: false,
2616 }))
2617 .await
2618 .unwrap()
2619 .into_inner();
2620
2621 tracing::info!("Status: {:?}", status);
2622
2623 assert_eq!(
2624 status.entity_statuses.len(),
2625 config.test_params.all_operators_secret_keys.len()
2626 + config.test_params.all_verifiers_secret_keys.len()
2627 );
2628 }
2629
2630 #[tokio::test]
2631 async fn aggregator_start_with_offline_verifier() {
2632 let mut config = create_test_config_with_thread_name().await;
2633 let _regtest = create_regtest_rpc(&mut config).await;
2635 config.verifier_endpoints = Some(vec!["https://142.143.144.145:17001".to_string()]);
2637 config.operator_endpoints = Some(vec!["https://142.143.144.145:17002".to_string()]);
2638 let socket_dir = tempfile::tempdir().unwrap();
2640 let socket_path = socket_dir.path().join("aggregator.sock");
2641
2642 tracing::info!("Creating unix aggregator server");
2643
2644 let (_, _shutdown_tx) = create_aggregator_unix_server(config.clone(), socket_path.clone())
2645 .await
2646 .unwrap();
2647
2648 tracing::info!("Created unix aggregator server");
2649
2650 let mut aggregator_client = get_clients(
2651 vec![format!("unix://{}", socket_path.display())],
2652 ClementineAggregatorClient::new,
2653 &config,
2654 false,
2655 )
2656 .await
2657 .unwrap()
2658 .pop()
2659 .unwrap();
2660
2661 tracing::info!("Got aggregator client");
2662
2663 assert!(aggregator_client
2665 .vergen(Request::new(clementine::Empty {}))
2666 .await
2667 .is_ok());
2668
2669 tracing::info!("After vergen");
2670
2671 assert!(aggregator_client
2673 .setup(Request::new(clementine::Empty {}))
2674 .await
2675 .is_err());
2676
2677 tracing::info!("After setup");
2678
2679 tracing::info!(
2682 "Entity statuses: {:?}",
2683 aggregator_client
2684 .get_entity_statuses(Request::new(GetEntityStatusesRequest {
2685 restart_tasks: false,
2686 }))
2687 .await
2688 .unwrap()
2689 );
2690 }
2691}