1use super::clementine::{
2 clementine_aggregator_server::ClementineAggregator, verifier_deposit_finalize_params,
3 DepositParams, Empty, VerifierDepositFinalizeParams,
4};
5use super::clementine::{
6 AggregatorWithdrawResponse, Deposit, EntityStatuses, GetEntityStatusesRequest,
7 OptimisticPayoutParams, RawSignedTx, VergenResponse, VerifierPublicKeys,
8};
9use crate::aggregator::{
10 AggregatorServer, CompatibilityCheckScope, OperatorId, ParticipatingOperators,
11 ParticipatingVerifiers, VerifierId,
12};
13use crate::bitvm_client::SECP;
14use crate::builder::sighash::SignatureInfo;
15use crate::builder::transaction::{
16 create_emergency_stop_txhandler, create_move_to_vault_txhandler,
17 create_optimistic_payout_txhandler, Signed, TxHandler,
18};
19use crate::compatibility::ActorWithConfig;
20use crate::config::BridgeConfig;
21use crate::constants::{
22 DEPOSIT_FINALIZATION_TIMEOUT, DEPOSIT_FINALIZE_STREAM_CREATION_TIMEOUT,
23 KEY_DISTRIBUTION_TIMEOUT, NONCE_STREAM_CREATION_TIMEOUT, OPERATOR_SIGS_STREAM_CREATION_TIMEOUT,
24 OPERATOR_SIGS_TIMEOUT, OPTIMISTIC_PAYOUT_TIMEOUT, OVERALL_DEPOSIT_TIMEOUT,
25 PARTIAL_SIG_STREAM_CREATION_TIMEOUT, PIPELINE_COMPLETION_TIMEOUT, SEND_OPERATOR_SIGS_TIMEOUT,
26 SETUP_COMPLETION_TIMEOUT, WITHDRAWAL_TIMEOUT,
27};
28use crate::deposit::{Actors, DepositData, DepositInfo};
29use crate::musig2::AggregateFromPublicKeys;
30use crate::rpc::clementine::{
31 operator_withrawal_response, AggregatorWithdrawalInput, CompatibilityParamsRpc,
32 EntitiesCompatibilityData, OperatorWithrawalResponse, VerifierDepositSignParams,
33};
34use crate::rpc::parser;
35use crate::utils::{
36 flatten_join_named_results, get_vergen_response, timed_request, timed_try_join_all,
37 try_join_all_combine_errors, ScriptBufExt,
38};
39use crate::utils::{FeePayingType, TxMetadata};
40use crate::{
41 aggregator::Aggregator,
42 builder::sighash::create_nofn_sighash_stream,
43 musig2::aggregate_nonces,
44 rpc::clementine::{self, DepositSignSession},
45};
46use bitcoin::hashes::Hash;
47use bitcoin::secp256k1::schnorr::Signature;
48use bitcoin::secp256k1::{Message, PublicKey};
49use bitcoin::{TapSighash, TxOut, Txid, XOnlyPublicKey};
50use clementine_errors::BridgeError;
51use clementine_errors::TransactionType;
52use clementine_errors::{ErrorExt, ResultExt};
53use clementine_primitives::UTXO;
54use eyre::{Context, OptionExt};
55use futures::future::join_all;
56use futures::{
57 stream::{BoxStream, TryStreamExt},
58 FutureExt, Stream, StreamExt, TryFutureExt,
59};
60use secp256k1::musig::{AggregatedNonce, PartialSignature, PublicNonce};
61use std::future::Future;
62use tokio::sync::mpsc::{channel, Receiver, Sender};
63use tonic::{async_trait, Request, Response, Status, Streaming};
/// Work item passed between the stages of the deposit signing pipeline:
/// an aggregated nonce together with the sighash it belongs to.
struct AggNonceQueueItem {
    /// Aggregate of the public nonces collected from the verifiers for one
    /// signing round.
    agg_nonce: AggregatedNonce,
    /// Sighash of the round; later used as the message when the partial
    /// signatures are aggregated.
    sighash: TapSighash,
}
68
/// Work item carrying one fully aggregated signature to the distribution stage.
struct FinalSigQueueItem {
    /// Serialized bytes of the aggregated signature.
    final_sig: Vec<u8>,
}
72
73use clementine_errors::AggregatorError;
74
75async fn get_next_pub_nonces(
76 nonce_streams: &mut [impl Stream<Item = Result<PublicNonce, BridgeError>>
77 + Unpin
78 + Send
79 + 'static],
80 verifiers_ids: &[VerifierId],
81) -> Result<Vec<PublicNonce>, BridgeError> {
82 try_join_all_combine_errors(nonce_streams.iter_mut().zip(verifiers_ids).map(
83 |(s, id)| async move {
84 s.next()
85 .await
86 .transpose()
87 .wrap_err(format!("Failed to get nonce from {id}"))? .ok_or_else(|| -> eyre::Report {
89 AggregatorError::InputStreamEndedEarlyUnknownSize {
90 stream_name: format!("Nonce stream {id}"),
92 }
93 .into()
94 })
95 },
96 ))
97 .await
98}
99
100async fn nonce_aggregator(
102 mut nonce_streams: Vec<
103 impl Stream<Item = Result<PublicNonce, BridgeError>> + Unpin + Send + 'static,
104 >,
105 mut sighash_stream: impl Stream<Item = Result<(TapSighash, SignatureInfo), BridgeError>>
106 + Unpin
107 + Send
108 + 'static,
109 agg_nonce_sender: Sender<(AggNonceQueueItem, Vec<PublicNonce>)>,
110 needed_nofn_sigs: usize,
111 verifiers_ids: Vec<VerifierId>,
112) -> Result<
113 (
114 (AggregatedNonce, Vec<PublicNonce>),
115 (AggregatedNonce, Vec<PublicNonce>),
116 ),
117 BridgeError,
118> {
119 let mut total_sigs = 0;
120
121 tracing::info!("Starting nonce aggregation (expecting {needed_nofn_sigs} nonces)");
122
123 if verifiers_ids.len() != nonce_streams.len() {
125 return Err(
126 eyre::eyre!("Number of verifiers ids and nonce streams must be the same").into(),
127 );
128 }
129
130 while let Some(msg) = sighash_stream.next().await {
131 let (sighash, siginfo) = msg.wrap_err("Sighash stream failed")?;
132
133 total_sigs += 1;
134
135 let pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
136 .await
137 .wrap_err_with(|| {
138 format!("Failed to get nonces from verifiers for sighash #{total_sigs} with siginfo: {siginfo:?}")
139 })?;
140
141 tracing::trace!(
142 "Received nonces for signature id {:?} in nonce_aggregator",
143 siginfo.signature_id
144 );
145
146 let agg_nonce = aggregate_nonces(pub_nonces.iter().collect::<Vec<_>>().as_slice())?;
147
148 agg_nonce_sender
149 .send((AggNonceQueueItem { agg_nonce, sighash }, pub_nonces))
150 .await
151 .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
152 stream_name: "nonce_aggregator".to_string(),
153 })?;
154
155 tracing::trace!(
156 "Sent nonces for signature id {:?} in nonce_aggregator",
157 siginfo.signature_id
158 );
159 }
160 tracing::trace!(tmp_debug = 1, "Sent {total_sigs} to agg_nonce stream");
161
162 if total_sigs != needed_nofn_sigs {
164 let err_msg = format!(
165 "Expected {needed_nofn_sigs} nofn signatures, got {total_sigs} from sighash stream",
166 );
167 tracing::error!("{err_msg}");
168 return Err(eyre::eyre!(err_msg).into());
169 }
170 let movetx_pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
172 .await
173 .wrap_err("Failed to get movetx public nonces from verifiers")?;
174
175 tracing::trace!("Received nonces for movetx in nonce_aggregator");
176
177 let move_tx_agg_nonce =
178 aggregate_nonces(movetx_pub_nonces.iter().collect::<Vec<_>>().as_slice())
179 .wrap_err("Failed to aggregate movetx nonces")?;
180
181 let emergency_stop_pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
182 .await
183 .wrap_err("Failed to get emergency stop tx public nonces from verifiers")?;
184
185 let emergency_stop_agg_nonce = aggregate_nonces(
186 emergency_stop_pub_nonces
187 .iter()
188 .collect::<Vec<_>>()
189 .as_slice(),
190 )
191 .wrap_err("Failed to aggregate emergency stop tx nonces")?;
192
193 Ok((
194 (move_tx_agg_nonce, movetx_pub_nonces),
195 (emergency_stop_agg_nonce, emergency_stop_pub_nonces),
196 ))
197}
198
/// Distributes aggregated nonces to the verifiers and collects the partial
/// signatures they answer with.
///
/// Two tasks are spawned and joined:
///
/// 1. The first task reads `(AggNonceQueueItem, pub_nonces)` pairs from
///    `agg_nonce_receiver`, sends the serialized aggregated nonce to every
///    verifier through the `Sender` half of `partial_sig_streams`, and then
///    forwards the pair to an internal queue. It stops after
///    `needed_nofn_sigs` items.
/// 2. The second task pops pairs from that internal queue, reads one
///    `PartialSig` message from every verifier through the `Streaming` half of
///    `partial_sig_streams`, parses it, and forwards the resulting
///    `(partial signature, public nonce)` list plus the queue item to
///    `partial_sig_sender`.
///
/// Streams, senders, public nonces and `verifiers_ids` are all matched by
/// position.
///
/// # Errors
///
/// Fails if the number of ids and streams differ, or if either task fails.
/// The errors of both tasks are collected and reported together in a single
/// error.
async fn nonce_distributor(
    mut agg_nonce_receiver: Receiver<(AggNonceQueueItem, Vec<PublicNonce>)>,
    partial_sig_streams: Vec<(
        Streaming<clementine::PartialSig>,
        Sender<clementine::VerifierDepositSignParams>,
    )>,
    partial_sig_sender: Sender<(Vec<(PartialSignature, PublicNonce)>, AggNonceQueueItem)>,
    needed_nofn_sigs: usize,
    verifiers_ids: Vec<VerifierId>,
) -> Result<(), BridgeError> {
    // Both counters are moved into the spawned tasks below: `nonce_count` is
    // used only by the first task, `sig_count` only by the second.
    let mut nonce_count = 0;
    let mut sig_count = 0;
    // Split each (incoming stream, outgoing sender) pair so that the two halves
    // can be owned by different tasks.
    let (mut partial_sig_rx, mut partial_sig_tx): (Vec<_>, Vec<_>) =
        partial_sig_streams.into_iter().unzip();

    // Internal queue connecting the first task (producer) to the second (consumer).
    let (queue_tx, mut queue_rx) = channel(crate::constants::DEFAULT_CHANNEL_SIZE);

    // Ids and streams are zipped positionally below, so their counts must match.
    if verifiers_ids.len() != partial_sig_rx.len() {
        return Err(eyre::eyre!(
            "Number of verifiers ids and partial sig streams must be the same"
        )
        .into());
    }
    // Each task needs its own copy of the ids.
    let verifiers_ids_clone = verifiers_ids.clone();
    // Task 1: broadcast aggregated nonces to the verifiers and enqueue them.
    let handle_1 = tokio::spawn(async move {
        while let Some((queue_item, pub_nonces)) = agg_nonce_receiver.recv().await {
            nonce_count += 1;

            tracing::trace!(
                "Received aggregated nonce {} in nonce_distributor",
                nonce_count
            );

            let agg_nonce_wrapped = clementine::VerifierDepositSignParams {
                params: Some(clementine::verifier_deposit_sign_params::Params::AggNonce(
                    queue_item.agg_nonce.serialize().to_vec(),
                )),
            };

            // Send the same message to every verifier; each gets its own clone.
            try_join_all_combine_errors(
                partial_sig_tx
                    .iter_mut()
                    .zip(verifiers_ids_clone.iter())
                    .map(|(tx, id)| {
                        let agg_nonce_wrapped = agg_nonce_wrapped.clone();
                        async move {
                            tx.send(agg_nonce_wrapped)
                                .await
                                .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
                                    stream_name: format!("Partial sig {id}"),
                                })
                                .inspect_err(|e| {
                                    tracing::error!(
                                        "Failed to send aggregated nonce to {id}: {:?}",
                                        e
                                    );
                                })
                        }
                    }),
            )
            .await
            .wrap_err("Failed to send aggregated nonces to verifiers")?;

            // Only after all verifiers were notified is the item handed to task 2.
            queue_tx
                .send((queue_item, pub_nonces))
                .await
                .wrap_err("Other end of channel closed")?;

            tracing::trace!(
                "Sent aggregated nonce {} to verifiers in nonce_distributor",
                nonce_count
            );
            // Stop once the expected number of nonces has been handled, even if
            // the incoming channel is still open.
            if nonce_count == needed_nofn_sigs {
                break;
            }
        }
        // Reached when the incoming channel closed before enough nonces arrived.
        if nonce_count != needed_nofn_sigs {
            let err_msg = format!("Expected {needed_nofn_sigs} aggregated nonces in nonce_distributor, got {nonce_count}",);
            tracing::error!("{err_msg}");
            return Err(eyre::eyre!(err_msg).into());
        }

        tracing::trace!(
            tmp_debug = 1,
            "Broadcasted {nonce_count} agg_nonces to verifiers and to the queue"
        );
        Ok::<(), BridgeError>(())
    });

    // Task 2: collect one partial signature per verifier for every queued item.
    // `queue_tx` is owned by task 1, so this loop ends once task 1 has finished
    // and the queue has been drained.
    let handle_2 = tokio::spawn(async move {
        while let Some((queue_item, pub_nonces)) = queue_rx.recv().await {
            let pub_nonces_ref = pub_nonces.as_slice();
            // Nonces and streams are zipped positionally below.
            if pub_nonces_ref.len() != partial_sig_rx.len() {
                return Err(eyre::eyre!(
                    "Number of public nonces {} and partial sig streams {} must be the same",
                    pub_nonces_ref.len(),
                    partial_sig_rx.len()
                )
                .into());
            }
            // NOTE: `sig_count` is incremented only after this round succeeds,
            // so the messages built inside the closure use a zero-based index.
            let partial_sigs = try_join_all_combine_errors(partial_sig_rx.iter_mut().zip(pub_nonces_ref.iter()).zip(verifiers_ids.iter()).map(
                |((stream, pub_nonce), id)| async move {
                    // First `?`: the stream reported an error.
                    // Second `?`: the stream ended without a message.
                    let partial_sig = stream
                        .message()
                        .await
                        .wrap_err_with(|| AggregatorError::RequestFailed {
                            request_name: format!("Partial sig {sig_count} from {id}"),
                        })
                        .inspect_err(|e| {
                            tracing::error!(
                                "Failed to receive partial signature {sig_count} from {id}, an error was sent: {:?}",
                                e
                            );
                        })?
                        .ok_or_eyre(AggregatorError::InputStreamEndedEarlyUnknownSize {
                            stream_name: format!("Partial sig {sig_count} from {id} closed"),
                        }).inspect_err(|e| {
                            tracing::error!(
                                "Failed to receive partial signature {sig_count} from {id}, the stream was closed: {:?}",
                                e
                            );
                        })?;
                    // Convert the raw bytes into a fixed-size array and parse them.
                    let partial_sig = PartialSignature::from_byte_array(
                        &partial_sig
                            .partial_sig
                            .as_slice()
                            .try_into()
                            .wrap_err("PartialSignature must be 32 bytes")?,
                    )
                    .wrap_err(format!("Failed to parse partial signature {sig_count} from {id}"))?;

                    // Pair the partial signature with the public nonce of the same verifier.
                    Ok::<_, BridgeError>((partial_sig, *pub_nonce))
                },
            ))
            .await?;

            sig_count += 1;

            tracing::trace!(
                "Received partial signature {} from verifiers in nonce_distributor",
                sig_count
            );

            partial_sig_sender
                .send((partial_sigs, queue_item))
                .await
                .map_err(|_| {
                    eyre::eyre!(AggregatorError::OutputStreamEndedEarly {
                        stream_name: "partial_sig_sender".into(),
                    })
                })?;

            tracing::trace!(
                "Sent partial signature {} to signature_aggregator in nonce_distributor",
                sig_count
            );
        }

        // The queue closed; make sure every expected round was processed.
        if sig_count != needed_nofn_sigs {
            let err_msg = format!(
                "Expected {needed_nofn_sigs} partial signatures in nonce_distributor, got {sig_count}",
            );
            tracing::error!("{err_msg}");
            return Err(eyre::eyre!(err_msg).into());
        }
        tracing::trace!(
            tmp_debug = 1,
            "Sent {sig_count} partial sig bundles to partial_sigs stream"
        );

        tracing::trace!("Finished tasks in nonce_distributor handle 2");
        Ok::<(), BridgeError>(())
    });

    // Wait for both tasks so that the errors of both can be reported together.
    let (result_1, result_2) = tokio::join!(handle_1, handle_2);

    let mut task_errors = Vec::new();

    // Outer `Err` is a join error of the spawned task (reported as a panic);
    // inner `Err` is an error returned by the task body itself.
    match result_1 {
        Ok(inner_result) => {
            if let Err(e) = inner_result {
                task_errors.push(format!(
                    "Task returned error while distributing aggnonces: {e:#?}"
                ));
            }
        }
        Err(e) => {
            task_errors.push(format!(
                "Task panicked while distributing aggnonces: {e:#?}"
            ));
        }
    }

    match result_2 {
        Ok(inner_result) => {
            if let Err(e) = inner_result {
                task_errors.push(format!(
                    "Task returned error while receiving partial sigs: {e:#?}"
                ));
            }
        }
        Err(e) => {
            task_errors.push(format!(
                "Task panicked while receiving partial sigs: {e:#?}"
            ));
        }
    }

    if !task_errors.is_empty() {
        return Err(eyre::eyre!(format!(
            "nonce_distributor failed with errors: {:#?}",
            task_errors
        ))
        .into());
    }

    tracing::debug!("Finished tasks in nonce_distributor");

    Ok(())
}
422
423async fn signature_aggregator(
426 mut partial_sig_receiver: Receiver<(Vec<(PartialSignature, PublicNonce)>, AggNonceQueueItem)>,
427 verifiers_public_keys: Vec<PublicKey>,
428 final_sig_sender: Sender<FinalSigQueueItem>,
429 needed_nofn_sigs: usize,
430) -> Result<(), BridgeError> {
431 let mut sig_count = 0;
432 while let Some((partial_sigs, queue_item)) = partial_sig_receiver.recv().await {
433 sig_count += 1;
434 tracing::trace!(
435 "Received partial signatures {} in signature_aggregator",
436 sig_count
437 );
438
439 let final_sig = crate::musig2::aggregate_partial_signatures(
440 verifiers_public_keys.clone(),
441 None,
442 queue_item.agg_nonce,
443 &partial_sigs,
444 Message::from_digest(queue_item.sighash.to_byte_array()),
445 )?;
446
447 final_sig_sender
448 .send(FinalSigQueueItem {
449 final_sig: final_sig.serialize().to_vec(),
450 })
451 .await
452 .wrap_err_with(|| {
453 eyre::eyre!(AggregatorError::OutputStreamEndedEarly {
454 stream_name: "final_sig_sender".into(),
455 })
456 })?;
457 tracing::trace!(
458 "Sent aggregated signature {} to signature_distributor in signature_aggregator",
459 sig_count
460 );
461
462 if sig_count == needed_nofn_sigs {
463 break;
464 }
465 }
466
467 if sig_count != needed_nofn_sigs {
468 let err_msg = format!(
469 "Expected {needed_nofn_sigs} aggregated signatures in signature_aggregator, got {sig_count}",
470 );
471 tracing::error!("{err_msg}");
472 return Err(eyre::eyre!(err_msg).into());
473 }
474
475 tracing::trace!(
476 tmp_debug = 1,
477 "Sent {sig_count} aggregated signatures to final_sig stream"
478 );
479
480 Ok(())
481}
482
483async fn signature_distributor(
486 mut final_sig_receiver: Receiver<FinalSigQueueItem>,
487 deposit_finalize_sender: Vec<Sender<VerifierDepositFinalizeParams>>,
488 agg_nonce: impl Future<
489 Output = Result<
490 (
491 (AggregatedNonce, Vec<PublicNonce>),
492 (AggregatedNonce, Vec<PublicNonce>),
493 ),
494 Status,
495 >,
496 >,
497 needed_nofn_sigs: usize,
498 verifiers_ids: Vec<VerifierId>,
499) -> Result<(), BridgeError> {
500 use verifier_deposit_finalize_params::Params;
501 let mut sig_count = 0;
502 while let Some(queue_item) = final_sig_receiver.recv().await {
503 sig_count += 1;
504 tracing::trace!("Received signature {} in signature_distributor", sig_count);
505 let final_params = VerifierDepositFinalizeParams {
506 params: Some(Params::SchnorrSig(queue_item.final_sig)),
507 };
508
509 try_join_all_combine_errors(
510 deposit_finalize_sender
511 .iter()
512 .zip(verifiers_ids.iter())
513 .map(|(tx, id)| {
514 let final_params = final_params.clone();
515 async move {
516 tx.send(final_params).await.wrap_err_with(|| {
517 AggregatorError::OutputStreamEndedEarly {
518 stream_name: format!("Deposit finalize sender for {id}"),
519 }
520 })
521 }
522 }),
523 )
524 .await
525 .wrap_err(format!(
526 "Failed to send final signature {sig_count} to verifiers"
527 ))?;
528
529 tracing::trace!(
530 "Sent signature {} to verifiers in signature_distributor",
531 sig_count
532 );
533
534 if sig_count == needed_nofn_sigs {
535 break;
536 }
537 }
538
539 if sig_count != needed_nofn_sigs {
540 let err_msg = format!(
541 "Expected {needed_nofn_sigs} signatures in signature_distributor, got {sig_count}",
542 );
543 tracing::error!("{err_msg}");
544 return Err(eyre::eyre!(err_msg).into());
545 }
546
547 tracing::trace!(
548 tmp_debug = 1,
549 "Sent {sig_count} signatures to verifiers in deposit_finalize"
550 );
551
552 let (movetx_agg_nonce, emergency_stop_agg_nonce) = agg_nonce
553 .await
554 .wrap_err("Failed to get aggregated nonce for movetx and emergency stop")?;
555
556 tracing::info!("Got aggregated nonce for movetx and emergency stop in signature distributor");
557
558 for tx in &deposit_finalize_sender {
560 tx.send(VerifierDepositFinalizeParams {
561 params: Some(Params::MoveTxAggNonce(
562 movetx_agg_nonce.0.serialize().to_vec(),
563 )),
564 })
565 .await
566 .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
567 stream_name: "Deposit finalize sender (for movetx agg nonce)".to_string(),
568 })?;
569 }
570 tracing::info!("Sent movetx aggregated nonce to verifiers in signature distributor");
571
572 for tx in &deposit_finalize_sender {
574 tx.send(VerifierDepositFinalizeParams {
575 params: Some(Params::EmergencyStopAggNonce(
576 emergency_stop_agg_nonce.0.serialize().to_vec(),
577 )),
578 })
579 .await
580 .wrap_err_with(|| AggregatorError::OutputStreamEndedEarly {
581 stream_name: "Deposit finalize sender (for emergency stop agg nonce)".to_string(),
582 })?;
583 }
584 tracing::info!("Sent emergency stop aggregated nonce to verifiers in signature distributor");
585
586 Ok(())
587}
588
589async fn create_nonce_streams(
597 verifiers: ParticipatingVerifiers,
598 num_nonces: u32,
599 #[cfg(test)] config: &crate::config::BridgeConfig,
600) -> Result<
601 (
602 Vec<clementine::NonceGenFirstResponse>,
603 Vec<BoxStream<'static, Result<PublicNonce, BridgeError>>>,
604 ),
605 BridgeError,
606> {
607 let mut nonce_streams = timed_try_join_all(
608 NONCE_STREAM_CREATION_TIMEOUT,
609 "Nonce stream creation",
610 Some(verifiers.ids()),
611 verifiers
612 .clients()
613 .into_iter()
614 .enumerate()
615 .map(|(idx, client)| {
616 let mut client = client.clone();
617 #[cfg(test)]
618 let config = config.clone();
619
620 async move {
621 #[cfg(test)]
622 config
623 .test_params
624 .timeout_params
625 .hook_timeout_nonce_stream_creation_verifier(idx)
626 .await;
627 let response_stream = client
628 .nonce_gen(tonic::Request::new(clementine::NonceGenRequest {
629 num_nonces,
630 }))
631 .await
632 .wrap_err_with(|| AggregatorError::RequestFailed {
633 request_name: format!("Nonce gen stream for verifier {idx}"),
634 })?;
635
636 Ok::<_, BridgeError>(response_stream.into_inner())
637 }
638 }),
639 )
640 .await?;
641
642 let first_responses: Vec<clementine::NonceGenFirstResponse> =
644 try_join_all_combine_errors(nonce_streams.iter_mut().zip(verifiers.ids()).map(
645 |(stream, id)| async move {
646 parser::verifier::parse_nonce_gen_first_response(stream)
647 .await
648 .wrap_err_with(|| format!("Failed to get initial response from {id}"))
649 },
650 ))
651 .await
652 .wrap_err("Failed to get nonce gen's initial responses from verifiers")?;
653
654 let transformed_streams = nonce_streams
655 .into_iter()
656 .zip(verifiers.ids())
657 .map(|(stream, id)| {
658 stream
659 .map(move |result| {
660 Aggregator::extract_pub_nonce(
661 result
662 .wrap_err_with(|| AggregatorError::InputStreamEndedEarlyUnknownSize {
663 stream_name: format!("Nonce gen stream for {id}"),
664 })?
665 .response,
666 )
667 })
668 .boxed()
669 })
670 .collect::<Vec<_>>();
671
672 Ok((first_responses, transformed_streams))
673}
674
675async fn collect_and_call<R, T, F, Fut>(
680 rx: &mut tokio::sync::broadcast::Receiver<Vec<T>>,
681 mut f: F,
682) -> Result<R, Status>
683where
684 R: Default,
685 T: Clone,
686 F: FnMut(Vec<T>) -> Fut,
687 Fut: Future<Output = Result<R, Status>>,
688{
689 loop {
690 match rx.recv().await {
691 Ok(params) => {
692 f(params).await?;
693 }
694 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
695 break Err(Status::internal(format!(
696 "lost {n} items due to lagging receiver"
697 )));
698 }
699 Err(tokio::sync::broadcast::error::RecvError::Closed) => break Ok(R::default()),
700 }
701 }
702}
703
704impl Aggregator {
705 fn extract_pub_nonce(
707 response: Option<clementine::nonce_gen_response::Response>,
708 ) -> Result<PublicNonce, BridgeError> {
709 match response.ok_or_eyre("NonceGen response is empty")? {
710 clementine::nonce_gen_response::Response::PubNonce(pub_nonce) => {
711 Ok(PublicNonce::from_byte_array(
712 &pub_nonce
713 .as_slice()
714 .try_into()
715 .wrap_err("PubNonce must be 66 bytes")?,
716 )
717 .wrap_err("Failed to parse pub nonce")?)
718 }
719 _ => Err(eyre::eyre!("Expected PubNonce in response").into()),
720 }
721 }
722
723 async fn collect_operator_sigs(
725 operator_clients: ParticipatingOperators,
726 config: BridgeConfig,
727 mut deposit_sign_session: DepositSignSession,
728 ) -> Result<Vec<Vec<Signature>>, BridgeError> {
729 deposit_sign_session.nonce_gen_first_responses = Vec::new(); let mut operator_sigs_streams =
731 timed_try_join_all(
733 OPERATOR_SIGS_STREAM_CREATION_TIMEOUT,
734 "Operator signature stream creation",
735 Some(operator_clients.ids()),
736 operator_clients.clients().into_iter().enumerate().map(|(idx, mut operator_client)| {
737 let sign_session = deposit_sign_session.clone();
738 #[cfg(test)]
739 let config = config.clone();
740 async move {
741 #[cfg(test)]
742 config
743 .test_params
744 .timeout_params
745 .hook_timeout_operator_sig_collection_operator(idx)
746 .await;
747 let stream = operator_client
748 .deposit_sign(tonic::Request::new(sign_session))
749 .await.wrap_err_with(|| AggregatorError::RequestFailed {
750 request_name: format!("Deposit sign stream for operator {idx}"),
751 })?;
752 Ok::<_, BridgeError>(stream.into_inner())
753 }
754 }))
755 .await?;
756
757 let deposit_data: DepositData = deposit_sign_session
758 .deposit_params
759 .clone()
760 .ok_or_else(|| eyre::eyre!("No deposit params found in deposit sign session"))?
761 .try_into()
762 .wrap_err("Failed to convert deposit params to deposit data")?;
763
764 let needed_sigs = config.get_num_required_operator_sigs(&deposit_data);
766
767 let operator_sigs =
769 try_join_all_combine_errors(operator_sigs_streams.iter_mut().enumerate().map(
770 |(idx, stream)| async move {
771 let mut sigs: Vec<Signature> = Vec::with_capacity(needed_sigs);
772 while let Some(sig) =
773 stream
774 .message()
775 .await
776 .wrap_err_with(|| AggregatorError::RequestFailed {
777 request_name: format!("Deposit sign stream for operator {idx}"),
778 })?
779 {
780 sigs.push(Signature::from_slice(&sig.schnorr_sig).wrap_err_with(|| {
781 format!("Failed to parse Schnorr signature from operator {idx}")
782 })?);
783 if sigs.len() == needed_sigs {
784 break;
785 }
786 }
787 Ok::<_, BridgeError>(sigs)
788 },
789 ))
790 .await
791 .wrap_err("Failed to get operator signatures from operators")?;
792
793 for (idx, sigs) in operator_sigs.iter().enumerate() {
795 if sigs.len() != needed_sigs {
796 return Err(eyre::eyre!(
797 "Not all operator sigs received from operator {}.\n Expected: {}, got: {}",
798 idx,
799 needed_sigs,
800 sigs.len()
801 )
802 .into());
803 }
804 }
805 Ok(operator_sigs)
806 }
807
808 async fn create_movetx(
809 &self,
810 partial_sigs: Vec<Vec<u8>>,
811 movetx_agg_and_pub_nonces: (AggregatedNonce, Vec<PublicNonce>),
812 deposit_params: DepositParams,
813 ) -> Result<TxHandler<Signed>, Status> {
814 let mut deposit_data: DepositData = deposit_params.try_into()?;
815 let musig_partial_sigs = parser::verifier::parse_partial_sigs(partial_sigs)?;
816
817 let mut move_txhandler =
819 create_move_to_vault_txhandler(&mut deposit_data, self.config.protocol_paramset())?;
820
821 let sighash = move_txhandler.calculate_script_spend_sighash_indexed(
822 0,
823 0,
824 bitcoin::TapSighashType::Default,
825 )?;
826
827 let musig_sigs_and_nonces = musig_partial_sigs
828 .into_iter()
829 .zip(movetx_agg_and_pub_nonces.1)
830 .collect::<Vec<_>>();
831
832 let verifiers_public_keys = deposit_data.get_verifiers();
834 let final_sig = crate::musig2::aggregate_partial_signatures(
835 verifiers_public_keys,
836 None,
837 movetx_agg_and_pub_nonces.0,
838 &musig_sigs_and_nonces,
839 Message::from_digest(sighash.to_byte_array()),
840 )?;
841
842 move_txhandler.set_p2tr_script_spend_witness(&[final_sig.as_ref()], 0, 0)?;
844
845 Ok(move_txhandler.promote()?)
846 }
847
848 async fn verify_and_save_emergency_stop_sigs(
849 &self,
850 emergency_stop_sigs: Vec<Vec<u8>>,
851 emergency_stop_agg_and_pub_nonces: (AggregatedNonce, Vec<PublicNonce>),
852 deposit_params: DepositParams,
853 ) -> Result<(), BridgeError> {
854 let mut deposit_data: DepositData = deposit_params
855 .try_into()
856 .wrap_err("Failed to convert deposit params to deposit data")?;
857 let musig_partial_sigs = parser::verifier::parse_partial_sigs(emergency_stop_sigs)
858 .wrap_err("Failed to parse emergency stop signatures")?;
859
860 let move_txhandler =
862 create_move_to_vault_txhandler(&mut deposit_data, self.config.protocol_paramset())?;
863
864 let mut emergency_stop_txhandler = create_emergency_stop_txhandler(
865 &mut deposit_data,
866 &move_txhandler,
867 self.config.protocol_paramset(),
868 )?;
869
870 let sighash = emergency_stop_txhandler.calculate_script_spend_sighash_indexed(
871 0,
872 0,
873 bitcoin::TapSighashType::SinglePlusAnyoneCanPay,
874 )?;
875
876 let verifiers_public_keys = deposit_data.get_verifiers();
877
878 let musig_sigs_and_nonces = musig_partial_sigs
879 .into_iter()
880 .zip(emergency_stop_agg_and_pub_nonces.1)
881 .collect::<Vec<_>>();
882
883 let final_sig = crate::musig2::aggregate_partial_signatures(
884 verifiers_public_keys,
885 None,
886 emergency_stop_agg_and_pub_nonces.0,
887 &musig_sigs_and_nonces,
888 Message::from_digest(sighash.to_byte_array()),
889 )
890 .wrap_err("Failed to aggregate emergency stop signatures")?;
891
892 let final_sig = bitcoin::taproot::Signature {
893 signature: final_sig,
894 sighash_type: bitcoin::TapSighashType::SinglePlusAnyoneCanPay,
895 };
896
897 emergency_stop_txhandler.set_p2tr_script_spend_witness(&[final_sig.serialize()], 0, 0)?;
899
900 let emergency_stop_tx = emergency_stop_txhandler.get_cached_tx();
901 let move_to_vault_txid = move_txhandler.get_txid();
902
903 tracing::debug!("Move to vault tx id: {}", move_to_vault_txid.to_string());
904
905 let emergency_stop_pubkey = self
906 .config
907 .emergency_stop_encryption_public_key
908 .ok_or_else(|| eyre::eyre!("Emergency stop encryption public key is not set"))?;
909 let encrypted_emergency_stop_tx = crate::encryption::encrypt_bytes(
910 emergency_stop_pubkey,
911 &bitcoin::consensus::serialize(&emergency_stop_tx),
912 )?;
913
914 self.db
915 .insert_signed_emergency_stop_tx_if_not_exists(
916 None,
917 move_to_vault_txid,
918 &encrypted_emergency_stop_tx,
919 )
920 .await?;
921
922 Ok(())
923 }
924
925 #[cfg(feature = "automation")]
926 pub async fn send_emergency_stop_tx(
927 &self,
928 tx: bitcoin::Transaction,
929 ) -> Result<bitcoin::Transaction, Status> {
930 let mut dbtx = self.db.begin_transaction().await?;
932 self.tx_sender
933 .insert_try_to_send(
934 Some(&mut dbtx),
935 Some(TxMetadata {
936 deposit_outpoint: None,
937 operator_xonly_pk: None,
938 round_idx: None,
939 kickoff_idx: None,
940 tx_type: TransactionType::EmergencyStop,
941 }),
942 &tx,
943 FeePayingType::RBF,
944 None,
945 &[],
946 &[],
947 &[],
948 &[],
949 )
950 .await?;
951 dbtx.commit()
952 .await
953 .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;
954
955 Ok(tx)
956 }
957}
958
959#[async_trait]
960impl ClementineAggregator for AggregatorServer {
961 async fn get_compatibility_params(
962 &self,
963 _request: Request<Empty>,
964 ) -> Result<Response<CompatibilityParamsRpc>, Status> {
965 let params = self.aggregator.get_compatibility_params()?;
966 Ok(Response::new(params.try_into().map_to_status()?))
967 }
968
969 async fn get_compatibility_data_from_entities(
970 &self,
971 _request: Request<Empty>,
972 ) -> Result<Response<EntitiesCompatibilityData>, Status> {
973 let data = self
974 .aggregator
975 .get_compatibility_data_from_entities()
976 .await?;
977 Ok(Response::new(EntitiesCompatibilityData {
978 entities_compatibility_data: data,
979 }))
980 }
981
982 async fn vergen(&self, _request: Request<Empty>) -> Result<Response<VergenResponse>, Status> {
983 tracing::info!("Vergen rpc called");
984 Ok(Response::new(get_vergen_response()))
985 }
986
987 async fn get_entity_statuses(
988 &self,
989 request: Request<GetEntityStatusesRequest>,
990 ) -> Result<Response<EntityStatuses>, Status> {
991 tracing::info!("Get entity statuses rpc called");
992 let request = request.into_inner();
993 let restart_tasks = request.restart_tasks;
994
995 Ok(Response::new(EntityStatuses {
996 entity_statuses: self.aggregator.get_entity_statuses(restart_tasks).await?,
997 }))
998 }
999
1000 async fn optimistic_payout(
1001 &self,
1002 request: tonic::Request<super::OptimisticWithdrawParams>,
1003 ) -> std::result::Result<tonic::Response<super::RawSignedTx>, tonic::Status> {
1004 tracing::info!("Optimistic payout rpc called");
1005 let opt_withdraw_params = request.into_inner();
1006
1007 let withdraw_params =
1008 opt_withdraw_params
1009 .withdrawal
1010 .clone()
1011 .ok_or(Status::invalid_argument(
1012 "Withdrawal params not found for optimistic payout",
1013 ))?;
1014 let (deposit_id, input_signature, input_outpoint, output_script_pubkey, output_amount) =
1015 parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
1016 tracing::info!("Parsed optimistic payout rpc params, deposit id: {:?}, input signature: {:?}, input outpoint: {:?}, output script pubkey: {:?}, output amount: {:?}, verification signature: {:?}", deposit_id, input_signature, input_outpoint, output_script_pubkey, output_amount, opt_withdraw_params.verification_signature);
1017
1018 self.check_compatibility_with_actors(CompatibilityCheckScope::VerifiersOnly)
1020 .await?;
1021
1022 if self
1024 .rpc
1025 .is_utxo_spent(&input_outpoint)
1026 .await
1027 .map_to_status()?
1028 {
1029 return Err(Status::invalid_argument(format!(
1030 "Withdrawal utxo is already spent: {input_outpoint:?}",
1031 )));
1032 }
1033
1034 if !(output_script_pubkey.is_p2tr()
1036 || output_script_pubkey.is_p2pkh()
1037 || output_script_pubkey.is_p2sh()
1038 || output_script_pubkey.is_p2wpkh()
1039 || output_script_pubkey.is_p2wsh())
1040 {
1041 return Err(Status::invalid_argument(format!(
1042 "Output script pubkey is not a valid script pubkey: {output_script_pubkey}, must be p2tr, p2pkh, p2sh, p2wpkh, or p2wsh"
1043 )));
1044 }
1045
1046 let withdrawal = self
1048 .db
1049 .get_move_to_vault_txid_from_citrea_deposit(None, deposit_id)
1050 .await?;
1051 if let Some(move_txid) = withdrawal {
1052 let withdrawal_utxo = self
1054 .db
1055 .get_withdrawal_utxo_from_citrea_withdrawal(None, deposit_id)
1056 .await?;
1057 if withdrawal_utxo != input_outpoint {
1058 return Err(Status::invalid_argument(format!(
1059 "Withdrawal utxo is not correct: {withdrawal_utxo:?} != {input_outpoint:?}",
1060 )));
1061 }
1062
1063 let withdrawal_prevout = self
1065 .rpc
1066 .get_txout_from_outpoint(&input_outpoint)
1067 .await
1068 .map_to_status()?;
1069
1070 let user_xonly_pk = withdrawal_prevout
1071 .script_pubkey
1072 .try_get_taproot_pk()
1073 .map_err(|_| {
1074 Status::invalid_argument(format!(
1075 "Withdrawal prevout script_pubkey is not a Taproot output: {:?}",
1076 withdrawal_prevout.script_pubkey
1077 ))
1078 })?;
1079
1080 let withdrawal_utxo = UTXO {
1081 outpoint: input_outpoint,
1082 txout: withdrawal_prevout,
1083 };
1084
1085 let output_txout = TxOut {
1086 value: output_amount,
1087 script_pubkey: output_script_pubkey,
1088 };
1089
1090 let deposit_data = self
1091 .db
1092 .get_deposit_data_with_move_tx(None, move_txid)
1093 .await?;
1094
1095 let mut deposit_data = deposit_data
1096 .ok_or(eyre::eyre!(
1097 "Deposit data not found for move txid {}",
1098 move_txid
1099 ))
1100 .map_err(BridgeError::from)?;
1101
1102 let mut opt_payout_txhandler = create_optimistic_payout_txhandler(
1103 &mut deposit_data,
1104 withdrawal_utxo,
1105 output_txout,
1106 input_signature,
1107 self.config.protocol_paramset(),
1108 )?;
1109
1110 let sighash = opt_payout_txhandler
1111 .calculate_pubkey_spend_sighash(0, input_signature.sighash_type)?;
1112
1113 let message = Message::from_digest(sighash.to_byte_array());
1114
1115 SECP.verify_schnorr(&input_signature.signature, &message, &user_xonly_pk)
1116 .map_err(|_| Status::internal("Invalid signature for optimistic payout tx. Ensure the signature uses SinglePlusAnyoneCanPay sighash type."))?;
1117
1118 let participating_verifiers = self.get_participating_verifiers(&deposit_data).await?;
1120 let verifiers_ids = participating_verifiers.ids();
1121 let (first_responses, mut nonce_streams) = {
1122 create_nonce_streams(
1123 participating_verifiers.clone(),
1124 1,
1125 #[cfg(test)]
1126 &self.config,
1127 )
1128 .await?
1129 };
1130 let pub_nonces = get_next_pub_nonces(&mut nonce_streams, &verifiers_ids)
1132 .await
1133 .wrap_err("Failed to aggregate nonces for optimistic payout")
1134 .map_to_status()?;
1135 let agg_nonce = aggregate_nonces(pub_nonces.iter().collect::<Vec<_>>().as_slice())?;
1136
1137 let agg_nonce_bytes = agg_nonce.serialize().to_vec();
1138 let opt_payout_sign_futures = participating_verifiers
1140 .clients()
1141 .iter()
1142 .zip(first_responses)
1143 .map(|(client, first_response)| {
1144 let mut client = client.clone();
1145 let opt_withdraw_params = opt_withdraw_params.clone();
1146 {
1147 let agg_nonce_serialized = agg_nonce_bytes.clone();
1148 async move {
1149 let mut request = Request::new(OptimisticPayoutParams {
1150 opt_withdrawal: Some(opt_withdraw_params),
1151 agg_nonce: agg_nonce_serialized,
1152 nonce_gen: Some(first_response),
1153 });
1154 request.set_timeout(OPTIMISTIC_PAYOUT_TIMEOUT);
1155 client.optimistic_payout_sign(request).await
1156 }
1157 }
1158 })
1159 .collect::<Vec<_>>();
1160
1161 let opt_payout_resps = join_all(opt_payout_sign_futures).await;
1163 let mut payout_sigs = Vec::new();
1164 let mut errors = Vec::new();
1165 for (resp, verifier_id) in opt_payout_resps
1166 .into_iter()
1167 .zip(participating_verifiers.ids())
1168 {
1169 match resp {
1170 Ok(res) => {
1171 payout_sigs.push(res.into_inner());
1172 }
1173 Err(e) => {
1174 errors.push(format!("{verifier_id} optimistic payout sign failed: {e}"));
1175 }
1176 }
1177 }
1178 if !errors.is_empty() {
1179 return Err(eyre::eyre!("{errors:?}").into_status());
1180 }
1181
1182 let sighash = opt_payout_txhandler.calculate_script_spend_sighash_indexed(
1185 1,
1186 0,
1187 bitcoin::TapSighashType::Default,
1188 )?;
1189
1190 let musig_partial_sigs = payout_sigs
1191 .into_iter()
1192 .map(|sig| {
1193 PartialSignature::from_byte_array(
1194 &sig.partial_sig
1195 .try_into()
1196 .map_err(|_| secp256k1::musig::ParseError::MalformedArg)?,
1197 )
1198 })
1199 .collect::<Result<Vec<_>, _>>()
1200 .map_err(|e| Status::internal(format!("Failed to parse partial sig: {e:?}")))?;
1201
1202 let musig_sigs_and_nonces = musig_partial_sigs
1203 .into_iter()
1204 .zip(pub_nonces)
1205 .collect::<Vec<_>>();
1206
1207 let final_sig = bitcoin::taproot::Signature {
1208 signature: crate::musig2::aggregate_partial_signatures(
1209 deposit_data.get_verifiers(),
1210 None,
1211 agg_nonce,
1212 &musig_sigs_and_nonces,
1213 Message::from_digest(sighash.to_byte_array()),
1214 )?,
1215 sighash_type: bitcoin::TapSighashType::Default,
1216 };
1217
1218 opt_payout_txhandler.set_p2tr_script_spend_witness(&[final_sig.serialize()], 1, 0)?;
1220 let opt_payout_txhandler = opt_payout_txhandler.promote()?;
1221 let opt_payout_tx = opt_payout_txhandler.get_cached_tx();
1222 tracing::info!(
1223 "Optimistic payout transaction created successfully for deposit id: {:?}",
1224 deposit_id
1225 );
1226
1227 #[cfg(feature = "automation")]
1228 {
1229 tracing::info!("Sending optimistic payout tx via tx_sender");
1230
1231 let mut dbtx = self.db.begin_transaction().await?;
1232 self.tx_sender
1233 .add_tx_to_queue(
1234 Some(&mut dbtx),
1235 TransactionType::OptimisticPayout,
1236 opt_payout_tx,
1237 &[],
1238 None,
1239 self.config.protocol_paramset(),
1240 None,
1241 )
1242 .await
1243 .map_to_status()?;
1244 dbtx.commit().await.map_err(|e| {
1245 Status::internal(format!(
1246 "Failed to commit db transaction to send optimistic payout tx: {e}",
1247 ))
1248 })?;
1249 }
1250
1251 Ok(Response::new(RawSignedTx::from(opt_payout_tx)))
1252 } else {
1253 Err(Status::not_found(format!(
1254 "Withdrawal with index {deposit_id} not found."
1255 )))
1256 }
1257 }
1258
1259 async fn internal_send_tx(
1260 &self,
1261 request: Request<clementine::SendTxRequest>,
1262 ) -> Result<Response<Empty>, Status> {
1263 #[cfg(not(feature = "automation"))]
1264 {
1265 Err(Status::unimplemented("Automation is not enabled"))
1266 }
1267 #[cfg(feature = "automation")]
1268 {
1269 let send_tx_req = request.into_inner();
1270 let fee_type = send_tx_req.fee_type();
1271 let signed_tx: bitcoin::Transaction = send_tx_req
1272 .raw_tx
1273 .ok_or(Status::invalid_argument("Missing raw_tx"))?
1274 .try_into()?;
1275 tracing::warn!(
1276 "Internal send tx rpc called with feetype: {:?}, tx hex: {}",
1277 fee_type,
1278 bitcoin::consensus::encode::serialize_hex(&signed_tx)
1279 );
1280
1281 let mut dbtx = self.db.begin_transaction().await?;
1282 self.tx_sender
1283 .insert_try_to_send(
1284 Some(&mut dbtx),
1285 None,
1286 &signed_tx,
1287 fee_type.try_into()?,
1288 None,
1289 &[],
1290 &[],
1291 &[],
1292 &[],
1293 )
1294 .await
1295 .map_to_status()?;
1296 dbtx.commit()
1297 .await
1298 .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;
1299 Ok(Response::new(Empty {}))
1300 }
1301 }
1302
    /// Runs the aggregator setup flow: collects the parameters of every operator, relays
    /// them to every verifier, and returns the verifiers' public keys.
    ///
    /// Two tasks are spawned and joined under `SETUP_COMPLETION_TIMEOUT`:
    /// - one calls `get_params` on each operator, collects the whole response stream and
    ///   publishes it on a broadcast channel;
    /// - the other lets each verifier consume its own receiver of that channel and
    ///   forwards what it receives through `set_operator`.
    ///
    /// # Errors
    ///
    /// Returns an error status if the compatibility check fails, if operator or verifier
    /// keys cannot be fetched, if `timed_request` reports an error for the joined tasks,
    /// or if either task returns an error.
    #[tracing::instrument(skip_all, err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
    async fn setup(
        &self,
        _request: Request<Empty>,
    ) -> Result<Response<VerifierPublicKeys>, Status> {
        tracing::info!("Setup rpc called");
        self.check_compatibility_with_actors(CompatibilityCheckScope::Both)
            .await?;
        // Capacity of the broadcast channel that carries operator parameters.
        const CHANNEL_CAPACITY: usize = 1024 * 16;
        let (operator_params_tx, operator_params_rx) =
            tokio::sync::broadcast::channel(CHANNEL_CAPACITY);
        // One receiver per verifier client, created before the sending task is spawned.
        let operator_params_rx_handles = (0..self.get_verifier_clients().len())
            .map(|_| operator_params_rx.resubscribe())
            .collect::<Vec<_>>();

        let operators = self.get_operator_clients().to_vec();
        let operator_pks = self.fetch_operator_keys().await?;
        // Operator ids are only used to label errors below.
        let operator_ids = operator_pks
            .iter()
            .map(|key| OperatorId(*key))
            .collect::<Vec<_>>();
        // Producer task: fetch params from every operator and broadcast them.
        let get_operator_params_chunked_handle = tokio::spawn(async move {
            tracing::info!(clients = operators.len(), "Collecting operator details...");
            try_join_all_combine_errors(operators.iter().zip(operator_ids.iter()).map(
                |(operator, id)| {
                    let mut operator = operator.clone();
                    let tx = operator_params_tx.clone();
                    async move {
                        let stream = operator
                            .get_params(Request::new(Empty {}))
                            .await
                            .wrap_err_with(|| AggregatorError::RequestFailed {
                                request_name: format!("Operator get params for {id}"),
                            })
                            .map_err(BridgeError::from)?
                            .into_inner();
                        // The full response stream is collected into a Vec and sent as a
                        // single broadcast message per operator.
                        // NOTE(review): the error text below says "read" although it is
                        // produced when the broadcast send fails - confirm the wording.
                        tx.send(stream.try_collect::<Vec<_>>().await?)
                            .map_err(|e| {
                                BridgeError::from(eyre::eyre!(
                                    "Failed to read operator params for {id}: {e}"
                                ))
                            })?;
                        Ok::<_, Status>(())
                    }
                },
            ))
            .await
            .wrap_err("Failed to get operator params from operators")
            .map_to_status()?;
            Ok::<_, Status>(())
        });

        let verifiers = self.get_verifier_clients().to_vec();
        let verifier_pks = self.fetch_verifier_keys().await?;
        // Verifier ids are only used to label errors below.
        let verifier_ids = verifier_pks
            .iter()
            .map(|key| VerifierId(*key))
            .collect::<Vec<_>>();
        // Consumer task: every verifier gets its own receiver and is informed through
        // `set_operator`.
        let set_operator_params_handle = tokio::spawn(async move {
            tracing::info!("Informing verifiers of existing operators...");
            try_join_all_combine_errors(
                verifiers
                    .iter()
                    .zip(verifier_ids.iter())
                    .zip(operator_params_rx_handles)
                    .map(|((verifier, id), mut rx)| {
                        let verifier = verifier.clone();
                        async move {
                            // `collect_and_call` is defined outside this view; presumably
                            // it invokes the closure for each message received on `rx` -
                            // TODO confirm against its definition.
                            collect_and_call(&mut rx, |params| {
                                let mut verifier = verifier.clone();
                                async move {
                                    verifier
                                        .set_operator(futures::stream::iter(params))
                                        .await
                                        .wrap_err_with(|| AggregatorError::RequestFailed {
                                            request_name: format!("Verifier set_operator for {id}"),
                                        })
                                        .map_err(BridgeError::from)?;
                                    Ok::<_, Status>(())
                                }
                            })
                            .await?;
                            Ok::<_, Status>(())
                        }
                    }),
            )
            .await
            .wrap_err("Failed to set_operator for all verifiers")
            .map_to_status()?;
            Ok::<_, Status>(())
        });

        // Wait for both tasks together under a single timeout.
        let task_outputs = timed_request(
            SETUP_COMPLETION_TIMEOUT,
            "Aggregator setup pipeline",
            async move {
                Ok::<_, BridgeError>(
                    futures::future::join_all([
                        get_operator_params_chunked_handle,
                        set_operator_params_handle,
                    ])
                    .await,
                )
            },
        )
        .await?;

        // Names are listed in the same order as the handles passed to `join_all` above.
        let task_names = ["Get operator params", "Set operator params"];
        debug_assert_eq!(task_names.len(), task_outputs.len());

        flatten_join_named_results(task_names.into_iter().zip(task_outputs.into_iter()))?;

        Ok(Response::new(VerifierPublicKeys::from(verifier_pks)))
    }
1418
    /// Handles a new deposit: drives the signing flow across verifiers and operators and
    /// returns the fully signed move transaction.
    ///
    /// High-level flow, all wrapped in `OVERALL_DEPOSIT_TIMEOUT`:
    /// 1. Parse the request and build `DepositData` from the current verifier and
    ///    operator keys.
    /// 2. Collect and distribute keys (`KEY_DISTRIBUTION_TIMEOUT`).
    /// 3. Open nonce, `deposit_sign` and `deposit_finalize` streams with the
    ///    participating verifiers.
    /// 4. Run the pipeline of spawned tasks (nonce aggregation -> nonce distribution ->
    ///    signature aggregation -> signature distribution) while operator signatures are
    ///    collected concurrently.
    /// 5. Send every operator signature to every verifier, then collect the verifiers'
    ///    partial signatures for the move tx and the emergency stop tx.
    /// 6. Verify and save the emergency stop signatures, build the signed move tx and
    ///    return it serialized.
    ///
    /// # Errors
    ///
    /// Returns an error status when the compatibility check fails, when the request cannot
    /// be parsed, or when any of the steps above returns an error (including the errors
    /// produced by the `timed_request` / `timed_try_join_all` wrappers).
    async fn new_deposit(
        &self,
        request: Request<Deposit>,
    ) -> Result<Response<clementine::RawSignedTx>, Status> {
        tracing::info!("New deposit rpc called");
        self.check_compatibility_with_actors(CompatibilityCheckScope::Both)
            .await?;

        timed_request(OVERALL_DEPOSIT_TIMEOUT, "Overall new deposit", async {
            let deposit_info: DepositInfo = request.into_inner().try_into()?;
            tracing::info!(
                "Parsed new deposit rpc params, deposit info: {:?}",
                deposit_info
            );

            // Actors are the verifiers and operators known right now; watchtowers are left
            // empty and the N-of-N key is not set at this point.
            let deposit_data = DepositData {
                deposit: deposit_info.clone(),
                nofn_xonly_pk: None,
                actors: Actors {
                    verifiers: self.fetch_verifier_keys().await?,
                    watchtowers: vec![],
                    operators: self.fetch_operator_keys().await?,
                },
                security_council: self.config.security_council.clone(),
            };
            tracing::info!(
                "Created deposit data in new_deposit for deposit info: {:?}, deposit data: {:?}",
                deposit_info,
                deposit_data
            );

            let deposit_params = deposit_data.clone().into();

            // Key collection and distribution is timed separately for logging.
            let start = std::time::Instant::now();
            timed_request(
                KEY_DISTRIBUTION_TIMEOUT,
                "Key collection and distribution",
                self.collect_and_distribute_keys(&deposit_params),
            )
            .await?;
            tracing::info!("Collected and distributed keys in {:?}", start.elapsed());

            let verifiers = self.get_participating_verifiers(&deposit_data).await?;
            let verifiers_ids = verifiers.ids();

            let num_required_sigs = self.config.get_num_required_nofn_sigs(&deposit_data);
            // Two nonces more than the number of N-of-N signatures; presumably for the
            // move tx and the emergency stop tx, whose aggregated nonces are read from
            // `nonce_agg_handle` near the end of this function - TODO confirm.
            let num_required_nonces = num_required_sigs as u32 + 2;
            let (first_responses, nonce_streams) =
                create_nonce_streams(
                    verifiers.clone(),
                    num_required_nonces,
                    #[cfg(test)]
                    &self.config,
                )
                .await?;

            // The session bundles the deposit parameters with each verifier's first nonce
            // generation response; it is converted below into the first message of both
            // the deposit_sign and the deposit_finalize streams.
            let deposit_sign_session = DepositSignSession {
                deposit_params: Some(deposit_params.clone()),
                nonce_gen_first_responses: first_responses,
            };

            let deposit_sign_param: VerifierDepositSignParams =
                deposit_sign_session.clone().into();

            // `_idx` is only read by the test-only timeout hook, hence the lint allowance.
            #[allow(clippy::unused_enumerate_index)]
            let partial_sig_streams = timed_try_join_all(
                PARTIAL_SIG_STREAM_CREATION_TIMEOUT,
                "Partial signature stream creation",
                Some(verifiers.ids()),
                verifiers.clients().into_iter().enumerate().map(|(_idx, verifier_client)| {
                    let mut verifier_client = verifier_client.clone();
                    #[cfg(test)]
                    let config = self.config.clone();

                    let deposit_sign_param =
                        deposit_sign_param.clone();

                    async move {
                        #[cfg(test)]
                        config
                            .test_params
                            .timeout_params
                            .hook_timeout_partial_sig_stream_creation_verifier(_idx)
                            .await;

                        // Open the bidirectional deposit_sign stream, then send the
                        // session parameters as its first message.
                        let (tx, rx) = tokio::sync::mpsc::channel(num_required_nonces as usize + 1);
                        let stream = verifier_client
                            .deposit_sign(tokio_stream::wrappers::ReceiverStream::new(rx))
                            .await?
                            .into_inner();

                        tx.send(deposit_sign_param).await.map_err(|e| {
                            BridgeError::from(eyre::eyre!("Failed to send deposit sign session: {e:?}"))})?;

                        Ok::<_, BridgeError>((stream, tx))
                    }
                })
            )
            .await?;

            // Each deposit_finalize call is spawned as its own task; the sender half is
            // kept so messages can be fed into the request stream later.
            #[allow(clippy::unused_enumerate_index)]
            let deposit_finalize_streams = verifiers.clients().into_iter().enumerate().map(
                |(_idx, mut verifier)| {
                    let (tx, rx) = tokio::sync::mpsc::channel(num_required_nonces as usize + 1);
                    let receiver_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
                    #[cfg(test)]
                    let config = self.config.clone();
                    let deposit_finalize_future = tokio::spawn(async move {
                        #[cfg(test)]
                        config
                            .test_params
                            .timeout_params
                            .hook_timeout_deposit_finalize_verifier(_idx)
                            .await;

                        verifier.deposit_finalize(receiver_stream).await
                    });

                    Ok::<_, BridgeError>((deposit_finalize_future, tx))
                },
            ).collect::<Result<Vec<_>, BridgeError>>()?;

            tracing::info!("Sending deposit finalize streams to verifiers for deposit {:?}", deposit_info);

            let (deposit_finalize_futures, deposit_finalize_sender): (Vec<_>, Vec<_>) =
                deposit_finalize_streams.into_iter().unzip();

            let deposit_finalize_first_param: VerifierDepositFinalizeParams =
                deposit_sign_session.clone().into();

            // Send the first deposit_finalize message to every verifier.
            timed_try_join_all(
                DEPOSIT_FINALIZE_STREAM_CREATION_TIMEOUT,
                "Deposit finalization initial param send",
                Some(verifiers.ids()),
                deposit_finalize_sender.iter().cloned().map(|tx| {
                    let param = deposit_finalize_first_param.clone();
                    async move {
                        tx.send(param).await
                        .map_err(|e| {
                            BridgeError::from(eyre::eyre!(
                                "Failed to send deposit finalize first param: {e:?}"))
                        })
                    }
                })
            ).await?;


            let deposit_blockhash = self
                .rpc
                .get_blockhash_of_tx(&deposit_data.get_deposit_outpoint().txid)
                .await
                .map_to_status()?;

            let verifiers_public_keys = deposit_data.get_verifiers();

            // NOTE(review): computed with the same call and argument as
            // `num_required_sigs` above.
            let needed_nofn_sigs = self.config.get_num_required_nofn_sigs(&deposit_data);

            let sighash_stream = Box::pin(create_nofn_sighash_stream(
                self.db.clone(),
                self.config.clone(),
                deposit_data.clone(),
                deposit_blockhash,
                false,
            ));

            // Channels connecting the pipeline stages spawned below.
            let (agg_nonce_sender, agg_nonce_receiver) = channel(num_required_nonces as usize);
            let (partial_sig_sender, partial_sig_receiver) = channel(num_required_nonces as usize);
            let (final_sig_sender, final_sig_receiver) = channel(num_required_nonces as usize);

            // Stage 1: consumes the nonce streams and the sighash stream, feeds
            // `agg_nonce_sender`.
            let nonce_agg_handle = tokio::spawn(nonce_aggregator(
                nonce_streams,
                sighash_stream,
                agg_nonce_sender,
                needed_nofn_sigs,
                verifiers_ids.clone(),
            ));

            // Stage 2: reads aggregated nonces, uses the partial signature streams, feeds
            // `partial_sig_sender`.
            let nonce_dist_handle = tokio::spawn(nonce_distributor(
                agg_nonce_receiver,
                partial_sig_streams,
                partial_sig_sender,
                needed_nofn_sigs,
                verifiers_ids.clone(),
            ));

            // Stage 3: reads partial signatures, feeds `final_sig_sender`.
            let sig_agg_handle = tokio::spawn(signature_aggregator(
                partial_sig_receiver,
                verifiers_public_keys,
                final_sig_sender,
                needed_nofn_sigs,
            ));

            tracing::debug!("Getting signatures from operators");
            let operators = self.get_participating_operators(&deposit_data).await?;

            // Operator signatures are collected in a separate task so they proceed while
            // the pipeline above is running.
            let config_clone = self.config.clone();
            let operator_sigs_fut = tokio::spawn(async move {
                timed_request(
                    OPERATOR_SIGS_TIMEOUT,
                    "Operator signature collection",
                    async {
                        Aggregator::collect_operator_sigs(
                            operators,
                            config_clone,
                            deposit_sign_session,
                        )
                        .await
                    },
                )
                .await
            });

            // Turn the join handle into a shared future: a clone is handed to the
            // signature distributor and the original is awaited again further below.
            let nonce_agg_handle = nonce_agg_handle
                .map_err(|_| Status::internal("panic when aggregating nonces"))
                .map(
                    |res| -> Result<((AggregatedNonce, Vec<PublicNonce>), (AggregatedNonce, Vec<PublicNonce>)), Status> {
                        res.and_then(|r| r.map_err(Into::into))
                    },
                )
                .shared();

            // Stage 4: reads final signatures and pushes them into the deposit_finalize
            // senders.
            let sig_dist_handle = tokio::spawn(signature_distributor(
                final_sig_receiver,
                deposit_finalize_sender.clone(),
                nonce_agg_handle.clone(),
                needed_nofn_sigs,
                verifiers_ids.clone(),
            ));

            // The first `?` handles the task join result, the second the collection result.
            let all_op_sigs = operator_sigs_fut
                .await
                .map_err(|_| BridgeError::from(eyre::eyre!("panic when collecting operator signatures")))??;

            tracing::info!("Got all operator signatures for deposit {:?}", deposit_info);

            // Wait for the remaining pipeline stages under one timeout.
            let task_outputs = timed_request(
                PIPELINE_COMPLETION_TIMEOUT,
                "MuSig2 signing pipeline",
                async move {
                    Ok::<_, BridgeError>(futures::future::join_all([nonce_dist_handle, sig_agg_handle, sig_dist_handle]).await)
                },
            )
            .await?;

            // Names are listed in the same order as the handles passed to `join_all`.
            let task_names = ["Nonce distribution", "Signature aggregation", "Signature distribution"];

            debug_assert_eq!(task_names.len(), task_outputs.len());

            flatten_join_named_results(
                task_names.into_iter().zip(task_outputs.into_iter()),
            )?;
            tracing::info!("All deposit_sign related tasks completed for deposit {:?}, now sending operator signatures to verifiers for verification", deposit_info);

            tracing::debug!("Pipeline tasks completed");
            let verifiers_ids = verifiers.ids();

            // Send every operator signature to every verifier through its deposit_finalize
            // sender. Each future hands back the verifier's finalize task handle on success.
            let deposit_finalize_futures = timed_request(
                SEND_OPERATOR_SIGS_TIMEOUT,
                "Sending operator signatures to verifiers",
                async {
                    let send_operator_sigs: Vec<_> = deposit_finalize_sender
                        .iter()
                        .zip(verifiers_ids.iter())
                        .zip(deposit_finalize_futures.into_iter())
                        .map(|((tx, id), dep_fin_fut)| async {
                            for one_op_sigs in all_op_sigs.iter() {
                                for sig in one_op_sigs.iter() {
                                    let deposit_finalize_param: VerifierDepositFinalizeParams =
                                        sig.into();

                                    let send = tx.send(deposit_finalize_param).await;
                                    match send {
                                        Ok(()) => (),
                                        Err(e) => {
                                            // On a failed send, await the finalize task first
                                            // so that its own error (task failure or rpc
                                            // error) is reported in preference to the
                                            // channel send error.
                                            dep_fin_fut.await.wrap_err(format!("{} deposit finalize tokio task on aggregator returned error", id.clone()))?.wrap_err(format!("{} deposit finalize rpc call returned error", id.clone()))?;
                                            return Err(BridgeError::from(eyre::eyre!(format!("{} deposit finalize stream sending returned error: {:?}", id.clone(), e))));
                                        }
                                    }
                                }
                            }

                            Ok::<_, BridgeError>(dep_fin_fut)
                        })
                        .collect();
                    try_join_all_combine_errors(send_operator_sigs).await
                },
            )
            .await.wrap_err("Failed to send operator signatures to verifiers")?;

            tracing::info!("All operator signatures sent to verifiers for verification, now waiting to collect movetx and emergency stop tx partial signatures from verifiers for deposit {:?}", deposit_info);

            // Each finalize task resolves to the verifier's partial signatures for the move
            // tx and the emergency stop tx.
            let partial_sigs: Vec<(Vec<u8>, Vec<u8>)> = timed_try_join_all(
                DEPOSIT_FINALIZATION_TIMEOUT,
                "Deposit finalization",
                Some(verifiers.ids()),
                deposit_finalize_futures.into_iter().map(|fut| async move {
                    let inner = fut.await
                        .map_err(|_| BridgeError::from(eyre::eyre!("panic finishing deposit_finalize")))??
                        .into_inner();
                    Ok((inner.move_to_vault_partial_sig, inner.emergency_stop_partial_sig))
                }),
            )
            .await?;


            let (move_to_vault_sigs, emergency_stop_sigs): (Vec<Vec<u8>>, Vec<Vec<u8>>) =
                partial_sigs.into_iter().unzip();

            tracing::info!("Received move tx and emergency stop tx partial signatures for deposit {:?}", deposit_info);

            // The shared nonce aggregation future yields the nonce data for both
            // transactions.
            let (movetx_agg_nonce, emergency_stop_agg_nonce) = nonce_agg_handle.await?;

            self.verify_and_save_emergency_stop_sigs(
                emergency_stop_sigs,
                emergency_stop_agg_nonce,
                deposit_params.clone(),
            )
            .await?;

            let signed_movetx_handler = self
                .create_movetx(move_to_vault_sigs, movetx_agg_nonce, deposit_params)
                .await?;

            let raw_signed_tx = RawSignedTx {
                raw_tx: bitcoin::consensus::serialize(&signed_movetx_handler.get_cached_tx()),
            };

            tracing::info!("Created final move transaction for deposit {:?}", deposit_info);

            Ok(Response::new(raw_signed_tx))
        })
        .await.map_err(Into::into)
    }
1800
1801 #[tracing::instrument(skip(self), err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE))]
1802 async fn withdraw(
1803 &self,
1804 request: Request<AggregatorWithdrawalInput>,
1805 ) -> Result<Response<AggregatorWithdrawResponse>, Status> {
1806 tracing::warn!("Withdraw rpc called");
1807 let request = request.into_inner();
1808 let (withdraw_params_with_sig, operator_xonly_pks) = (
1809 request.withdrawal.ok_or(Status::invalid_argument(
1810 "withdrawalParamsWithSig is missing",
1811 ))?,
1812 request.operator_xonly_pks,
1813 );
1814 self.check_compatibility_with_actors(CompatibilityCheckScope::OperatorsOnly)
1816 .await?;
1817
1818 let withdraw_params = withdraw_params_with_sig
1819 .clone()
1820 .withdrawal
1821 .ok_or(Status::invalid_argument("withdrawalParams is missing"))?;
1822
1823 let operator_xonly_pks_from_rpc: Vec<XOnlyPublicKey> = operator_xonly_pks
1825 .into_iter()
1826 .map(|xonly_pk| {
1827 xonly_pk.try_into().map_err(|e| {
1828 Status::invalid_argument(format!("Failed to convert xonly public key: {e}"))
1829 })
1830 })
1831 .collect::<Result<Vec<_>, Status>>()?;
1832
1833 tracing::warn!(
1834 "Parsed withdraw rpc params, withdrawal params: {:?}, operator xonly pks: {:?}",
1835 withdraw_params,
1836 operator_xonly_pks_from_rpc
1837 .iter()
1838 .map(|pk| pk.to_string())
1839 .collect::<Vec<_>>()
1840 );
1841
1842 let (withdrawal_id, _, _, _, _) =
1845 parser::operator::parse_withdrawal_sig_params(withdraw_params)?;
1846
1847 let current_operator_xonly_pks = self.fetch_operator_keys().await?;
1850 let invalid_operator_xonly_pks = operator_xonly_pks_from_rpc
1851 .iter()
1852 .filter(|xonly_pk| !current_operator_xonly_pks.contains(xonly_pk))
1853 .collect::<Vec<_>>();
1854 if !invalid_operator_xonly_pks.is_empty() {
1855 return Err(Status::invalid_argument(format!(
1856 "Given xonly public key doesn't belong to any current operator: invalid keys: {invalid_operator_xonly_pks:?}, current operators: {current_operator_xonly_pks:?}"
1857 )));
1858 }
1859
1860 let operators = self
1861 .get_operator_clients()
1862 .iter()
1863 .zip(current_operator_xonly_pks.into_iter());
1864 let withdraw_futures = operators
1865 .filter(|(_, xonly_pk)| {
1866 operator_xonly_pks_from_rpc.is_empty()
1868 || operator_xonly_pks_from_rpc.contains(xonly_pk)
1869 })
1870 .map(|(operator, operator_xonly_pk)| {
1871 let mut operator = operator.clone();
1872 let params = withdraw_params_with_sig.clone();
1873 let mut request = Request::new(params);
1874 request.set_timeout(WITHDRAWAL_TIMEOUT);
1875 async move { (operator.withdraw(request).await, operator_xonly_pk) }
1876 });
1877
1878 let responses = futures::future::join_all(withdraw_futures).await;
1880 tracing::warn!(
1881 "Withdraw rpc completed successfully for withdrawal id: {}, operator xonly pks: {:?}, responses: {:?}",
1882 withdrawal_id,
1883 operator_xonly_pks_from_rpc
1884 .iter()
1885 .map(|pk| pk.to_string())
1886 .collect::<Vec<_>>(),
1887 responses,
1888 );
1889 Ok(Response::new(AggregatorWithdrawResponse {
1890 withdraw_responses: responses
1891 .into_iter()
1892 .map(|(res, xonly_pk)| match res {
1893 Ok(withdraw_response) => OperatorWithrawalResponse {
1894 operator_xonly_pk: Some(xonly_pk.into()),
1895 response: Some(operator_withrawal_response::Response::RawTx(
1896 withdraw_response.into_inner(),
1897 )),
1898 },
1899 Err(e) => OperatorWithrawalResponse {
1900 operator_xonly_pk: Some(xonly_pk.into()),
1901 response: Some(operator_withrawal_response::Response::Error(e.to_string())),
1902 },
1903 })
1904 .collect(),
1905 }))
1906 }
1907
1908 async fn get_nofn_aggregated_xonly_pk(
1909 &self,
1910 _: tonic::Request<super::Empty>,
1911 ) -> std::result::Result<tonic::Response<super::NofnResponse>, tonic::Status> {
1912 tracing::info!("Get nofn aggregated xonly pk rpc called");
1913 let verifier_keys = self.fetch_verifier_keys().await?;
1914 let num_verifiers = verifier_keys.len();
1915 let nofn_xonly_pk = bitcoin::XOnlyPublicKey::from_musig2_pks(verifier_keys.clone(), None)
1916 .map_err(|e| {
1917 Status::internal(format!(
1918 "Failed to aggregate verifier public keys, err: {e}, pubkeys: {verifier_keys:?}"
1919 ))
1920 })?;
1921 Ok(Response::new(super::NofnResponse {
1922 nofn_xonly_pk: nofn_xonly_pk.serialize().to_vec(),
1923 num_verifiers: num_verifiers as u32,
1924 }))
1925 }
1926
1927 async fn internal_get_emergency_stop_tx(
1928 &self,
1929 request: Request<clementine::GetEmergencyStopTxRequest>,
1930 ) -> Result<Response<clementine::GetEmergencyStopTxResponse>, Status> {
1931 tracing::warn!("Get emergency stop tx rpc called");
1932 let inner_request = request.into_inner();
1933 let txids: Vec<Txid> = inner_request
1934 .txids
1935 .into_iter()
1936 .map(|txid| {
1937 Txid::from_slice(&txid.txid).map_err(|e| {
1938 tonic::Status::invalid_argument(format!("Failed to parse txid: {e}"))
1939 })
1940 })
1941 .collect::<Result<Vec<_>, _>>()?;
1942 tracing::warn!(
1943 "Parsed get emergency stop tx rpc params, move txids: {:?}",
1944 txids
1945 .iter()
1946 .map(|txid| txid.to_string())
1947 .collect::<Vec<_>>()
1948 );
1949
1950 let emergency_stop_txs = self.db.get_emergency_stop_txs(None, txids).await?;
1951
1952 let (txids, encrypted_emergency_stop_txs): (Vec<Txid>, Vec<Vec<u8>>) =
1953 emergency_stop_txs.into_iter().unzip();
1954
1955 Ok(Response::new(clementine::GetEmergencyStopTxResponse {
1956 txids: txids.into_iter().map(|txid| txid.into()).collect(),
1957 encrypted_emergency_stop_txs,
1958 }))
1959 }
1960
1961 async fn send_move_to_vault_tx(
1962 &self,
1963 request: Request<clementine::SendMoveTxRequest>,
1964 ) -> Result<Response<clementine::Txid>, Status> {
1965 tracing::info!("Send move to vault tx rpc called");
1966 #[cfg(not(feature = "automation"))]
1967 {
1968 let _ = request;
1969 return Err(Status::unimplemented(
1970 "Automation is disabled, cannot automatically send move to vault tx.",
1971 ));
1972 }
1973
1974 #[cfg(feature = "automation")]
1975 {
1976 use bitcoin::Amount;
1977 use std::sync::Arc;
1978
1979 use crate::builder::{
1980 address::create_taproot_address,
1981 script::{CheckSig, Multisig, SpendableScript},
1982 transaction::anchor_output,
1983 };
1984
1985 let request = request.into_inner();
1986 let movetx: bitcoin::Transaction = bitcoin::consensus::deserialize(
1987 &request
1988 .raw_tx
1989 .ok_or_eyre("raw_tx is required")
1990 .map_to_status()?
1991 .raw_tx,
1992 )
1993 .wrap_err("Failed to deserialize movetx")
1994 .map_to_status()?;
1995 let deposit_outpoint: bitcoin::OutPoint = request
1996 .deposit_outpoint
1997 .ok_or(Status::invalid_argument("deposit_outpoint is required"))?
1998 .try_into()?;
1999
2000 tracing::info!(
2001 "Parsed send move to vault tx rpc params, deposit outpoint: {:?}, movetx hex: {}",
2002 deposit_outpoint,
2003 bitcoin::consensus::encode::serialize_hex(&movetx)
2004 );
2005
2006 if movetx.input.len() != 1 || movetx.output.len() != 2 {
2008 return Err(Status::invalid_argument(
2009 "Transaction is not a movetx, input or output lengths are not correct",
2010 ));
2011 }
2012 if !(movetx.output[0].value == self.config.protocol_paramset().bridge_amount
2015 && movetx.output[1].value == Amount::from_sat(0))
2016 {
2017 return Err(Status::invalid_argument(format!(
2018 "Transaction is not a movetx, output sat values are not correct, should be ({}, 0), got ({}, {})",
2019 self.config.protocol_paramset().bridge_amount,
2020 movetx.output[0].value,
2021 movetx.output[1].value,
2022 )));
2023 }
2024 let verifier_keys = self.fetch_verifier_keys().await?;
2026 let nofn_xonly_pk =
2027 bitcoin::XOnlyPublicKey::from_musig2_pks(verifier_keys.clone(), None).map_err(
2028 |e| {
2029 Status::internal(format!(
2030 "Failed to aggregate verifier public keys, err: {e}, pubkeys: {verifier_keys:?}"
2031 ))
2032 },
2033 )?;
2034 let nofn_script = Arc::new(CheckSig::new(nofn_xonly_pk));
2035 let security_council_script = Arc::new(Multisig::from_security_council(
2036 self.config.security_council.clone(),
2037 ));
2038
2039 let (addr, _) = create_taproot_address(
2040 &[
2041 nofn_script.to_script_buf(),
2042 security_council_script.to_script_buf(),
2043 ],
2044 None,
2045 self.config.protocol_paramset().network,
2046 );
2047 let bridge_script_pubkey = addr.script_pubkey();
2048
2049 if !(movetx.output[1].script_pubkey
2050 == anchor_output(self.config.protocol_paramset().anchor_amount()).script_pubkey
2051 && movetx.output[0].script_pubkey == bridge_script_pubkey)
2052 {
2053 return Err(Status::invalid_argument(
2054 format!("Transaction is not a movetx, output scriptpubkeys are not correct, expected: (vault: {:?}, anchor: {:?}), got: (vault: {:?}, anchor: {:?})",
2055 bridge_script_pubkey,
2056 anchor_output(self.config.protocol_paramset().anchor_amount()).script_pubkey,
2057 movetx.output[0].script_pubkey,
2058 movetx.output[1].script_pubkey,
2059 )));
2060 }
2061
2062 let mut dbtx = self.db.begin_transaction().await?;
2063 self.tx_sender
2064 .insert_try_to_send(
2065 Some(&mut dbtx),
2066 Some(TxMetadata {
2067 deposit_outpoint: Some(deposit_outpoint),
2068 operator_xonly_pk: None,
2069 round_idx: None,
2070 kickoff_idx: None,
2071 tx_type: TransactionType::MoveToVault,
2072 }),
2073 &movetx,
2074 FeePayingType::CPFP,
2075 None,
2076 &[],
2077 &[],
2078 &[],
2079 &[],
2080 )
2081 .await
2082 .map_to_status()?;
2083 dbtx.commit()
2084 .await
2085 .map_err(|e| Status::internal(format!("Failed to commit db transaction: {e}")))?;
2086
2087 Ok(Response::new(movetx.compute_txid().into()))
2088 }
2089 }
2090}
2091
2092#[cfg(test)]
2093mod tests {
2094 use crate::actor::Actor;
2095 use crate::builder;
2096 use crate::config::BridgeConfig;
2097 use crate::deposit::{BaseDepositData, DepositInfo, DepositType};
2098 use crate::musig2::AggregateFromPublicKeys;
2099 use crate::rpc::clementine::clementine_aggregator_client::ClementineAggregatorClient;
2100 use crate::rpc::clementine::{self, GetEntityStatusesRequest, SendMoveTxRequest};
2101 use crate::rpc::get_clients;
2102 use crate::servers::create_aggregator_unix_server;
2103 use crate::test::common::citrea::MockCitreaClient;
2104 use crate::test::common::tx_utils::ensure_tx_onchain;
2105 use crate::test::common::*;
2106 use bitcoin::hashes::Hash;
2107 use bitcoincore_rpc::RpcApi;
2108 use clementine_primitives::EVMAddress;
2109 use eyre::Context;
2110 use std::time::Duration;
2111 use tokio::time::sleep;
2112 use tonic::{Request, Status};
2113
2114 #[cfg(feature = "automation")]
2115 async fn perform_deposit(mut config: BridgeConfig) -> Result<(), Status> {
2116 let regtest = create_regtest_rpc(&mut config).await;
2117 let rpc = regtest.rpc();
2118
2119 let actors = create_actors::<MockCitreaClient>(&config).await;
2120 let _unused =
2121 run_single_deposit::<MockCitreaClient>(&mut config, rpc.clone(), None, &actors, None)
2122 .await?;
2123
2124 Ok(())
2125 }
2126
2127 #[tokio::test(flavor = "multi_thread")]
2128 async fn aggregator_double_deposit() {
2129 let mut config = create_test_config_with_thread_name().await;
2130 let regtest = create_regtest_rpc(&mut config).await;
2131 let rpc = regtest.rpc();
2132 let actors = create_actors::<MockCitreaClient>(&config).await;
2133 let mut aggregator = actors.get_aggregator();
2134
2135 let evm_address = EVMAddress([1u8; 20]);
2136 let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2137
2138 let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2139 .setup(tonic::Request::new(clementine::Empty {}))
2140 .await
2141 .unwrap()
2142 .into_inner()
2143 .try_into()
2144 .unwrap();
2145 sleep(Duration::from_secs(3)).await;
2146
2147 let nofn_xonly_pk =
2148 bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2149
2150 let deposit_address = builder::address::generate_deposit_address(
2151 nofn_xonly_pk,
2152 signer.address.as_unchecked(),
2153 evm_address,
2154 config.protocol_paramset().network,
2155 config.protocol_paramset().user_takes_after,
2156 )
2157 .unwrap()
2158 .0;
2159
2160 let deposit_outpoint = rpc
2161 .send_to_address(&deposit_address, config.protocol_paramset().bridge_amount)
2162 .await
2163 .unwrap();
2164 rpc.mine_blocks(18).await.unwrap();
2165
2166 let deposit_info = DepositInfo {
2167 deposit_outpoint,
2168 deposit_type: DepositType::BaseDeposit(BaseDepositData {
2169 evm_address,
2170 recovery_taproot_address: signer.address.as_unchecked().clone(),
2171 }),
2172 };
2173
2174 let movetx_one = aggregator
2176 .new_deposit(clementine::Deposit::from(deposit_info.clone()))
2177 .await
2178 .unwrap()
2179 .into_inner();
2180 let movetx_one_txid: bitcoin::Txid = aggregator
2181 .send_move_to_vault_tx(SendMoveTxRequest {
2182 deposit_outpoint: Some(deposit_outpoint.into()),
2183 raw_tx: Some(movetx_one),
2184 })
2185 .await
2186 .unwrap()
2187 .into_inner()
2188 .try_into()
2189 .unwrap();
2190
2191 let movetx_two = aggregator
2192 .new_deposit(clementine::Deposit::from(deposit_info))
2193 .await
2194 .unwrap()
2195 .into_inner();
2196 let _movetx_two_txid: bitcoin::Txid = aggregator
2197 .send_move_to_vault_tx(SendMoveTxRequest {
2198 deposit_outpoint: Some(deposit_outpoint.into()),
2199 raw_tx: Some(movetx_two),
2200 })
2201 .await
2202 .unwrap()
2203 .into_inner()
2204 .try_into()
2205 .unwrap();
2206 rpc.mine_blocks(1).await.unwrap();
2207 sleep(Duration::from_secs(3)).await;
2208
2209 poll_until_condition(
2210 async || {
2211 rpc.mine_blocks(1).await.unwrap();
2212 Ok(rpc
2213 .is_tx_on_chain(&movetx_one_txid)
2214 .await
2215 .unwrap_or_default())
2216 },
2217 None,
2218 None,
2219 )
2220 .await
2221 .wrap_err_with(|| eyre::eyre!("MoveTx did not land onchain"))
2222 .unwrap();
2223 }
2224
2225 #[tokio::test(flavor = "multi_thread")]
2226 async fn aggregator_deposit_movetx_lands_onchain() {
2227 let mut config = create_test_config_with_thread_name().await;
2228 let regtest = create_regtest_rpc(&mut config).await;
2229 let rpc = regtest.rpc();
2230 let actors = create_actors::<MockCitreaClient>(&config).await;
2231 let mut aggregator = actors.get_aggregator();
2232
2233 let evm_address = EVMAddress([1u8; 20]);
2234 let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2235
2236 let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2237 .setup(tonic::Request::new(clementine::Empty {}))
2238 .await
2239 .unwrap()
2240 .into_inner()
2241 .try_into()
2242 .unwrap();
2243 sleep(Duration::from_secs(3)).await;
2244
2245 let nofn_xonly_pk =
2246 bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2247
2248 let deposit_address = builder::address::generate_deposit_address(
2249 nofn_xonly_pk,
2250 signer.address.as_unchecked(),
2251 evm_address,
2252 config.protocol_paramset().network,
2253 config.protocol_paramset().user_takes_after,
2254 )
2255 .unwrap()
2256 .0;
2257
2258 let deposit_outpoint = rpc
2259 .send_to_address(&deposit_address, config.protocol_paramset().bridge_amount)
2260 .await
2261 .unwrap();
2262 rpc.mine_blocks(18).await.unwrap();
2263
2264 let deposit_info = DepositInfo {
2265 deposit_outpoint,
2266 deposit_type: DepositType::BaseDeposit(BaseDepositData {
2267 evm_address,
2268 recovery_taproot_address: signer.address.as_unchecked().clone(),
2269 }),
2270 };
2271
2272 let start_time = std::time::Instant::now();
2274 let raw_move_tx = aggregator
2275 .new_deposit(clementine::Deposit::from(deposit_info))
2276 .await
2277 .unwrap()
2278 .into_inner();
2279 let end_time = std::time::Instant::now();
2280 tracing::info!("New deposit time: {:?}", end_time - start_time);
2281
2282 let movetx_txid = aggregator
2283 .send_move_to_vault_tx(SendMoveTxRequest {
2284 deposit_outpoint: Some(deposit_outpoint.into()),
2285 raw_tx: Some(raw_move_tx),
2286 })
2287 .await
2288 .unwrap()
2289 .into_inner()
2290 .try_into()
2291 .unwrap();
2292
2293 rpc.mine_blocks(1).await.unwrap();
2294 sleep(Duration::from_secs(3)).await;
2295
2296 poll_until_condition(
2297 async || {
2298 rpc.mine_blocks(1).await.unwrap();
2299 Ok(rpc.is_tx_on_chain(&movetx_txid).await.unwrap_or_default())
2300 },
2301 None,
2302 None,
2303 )
2304 .await
2305 .wrap_err_with(|| eyre::eyre!("MoveTx did not land onchain"))
2306 .unwrap();
2307 }
2308
2309 #[tokio::test]
2310 async fn aggregator_two_deposit_movetx_and_emergency_stop() {
2311 let mut config = create_test_config_with_thread_name().await;
2312 let regtest = create_regtest_rpc(&mut config).await;
2313 let rpc = regtest.rpc();
2314 let actors = create_actors::<MockCitreaClient>(&config).await;
2315 let mut aggregator = actors.get_aggregator();
2316
2317 let evm_address = EVMAddress([1u8; 20]);
2318 let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
2319
2320 let verifiers_public_keys: Vec<bitcoin::secp256k1::PublicKey> = aggregator
2321 .setup(tonic::Request::new(clementine::Empty {}))
2322 .await
2323 .unwrap()
2324 .into_inner()
2325 .try_into()
2326 .unwrap();
2327 sleep(Duration::from_secs(3)).await;
2328
2329 let nofn_xonly_pk =
2330 bitcoin::XOnlyPublicKey::from_musig2_pks(verifiers_public_keys.clone(), None).unwrap();
2331
2332 let deposit_address_0 = builder::address::generate_deposit_address(
2333 nofn_xonly_pk,
2334 signer.address.as_unchecked(),
2335 evm_address,
2336 config.protocol_paramset().network,
2337 config.protocol_paramset().user_takes_after,
2338 )
2339 .unwrap()
2340 .0;
2341
2342 let deposit_address_1 = builder::address::generate_deposit_address(
2343 nofn_xonly_pk,
2344 signer.address.as_unchecked(),
2345 evm_address,
2346 config.protocol_paramset().network,
2347 config.protocol_paramset().user_takes_after,
2348 )
2349 .unwrap()
2350 .0;
2351
2352 let deposit_outpoint_0 = rpc
2353 .send_to_address(&deposit_address_0, config.protocol_paramset().bridge_amount)
2354 .await
2355 .unwrap();
2356 rpc.mine_blocks(18).await.unwrap();
2357
2358 let deposit_outpoint_1 = rpc
2359 .send_to_address(&deposit_address_1, config.protocol_paramset().bridge_amount)
2360 .await
2361 .unwrap();
2362 rpc.mine_blocks(18).await.unwrap();
2363
2364 let deposit_info_0 = DepositInfo {
2365 deposit_outpoint: deposit_outpoint_0,
2366 deposit_type: DepositType::BaseDeposit(BaseDepositData {
2367 evm_address,
2368 recovery_taproot_address: signer.address.as_unchecked().clone(),
2369 }),
2370 };
2371
2372 let deposit_info_1 = DepositInfo {
2373 deposit_outpoint: deposit_outpoint_1,
2374 deposit_type: DepositType::BaseDeposit(BaseDepositData {
2375 evm_address,
2376 recovery_taproot_address: signer.address.as_unchecked().clone(),
2377 }),
2378 };
2379
2380 let raw_move_tx_0 = aggregator
2382 .new_deposit(clementine::Deposit::from(deposit_info_0))
2383 .await
2384 .unwrap()
2385 .into_inner();
2386 let move_txid_0: bitcoin::Txid = aggregator
2387 .send_move_to_vault_tx(SendMoveTxRequest {
2388 deposit_outpoint: Some(deposit_outpoint_0.into()),
2389 raw_tx: Some(raw_move_tx_0),
2390 })
2391 .await
2392 .unwrap()
2393 .into_inner()
2394 .try_into()
2395 .unwrap();
2396
2397 rpc.mine_blocks(1).await.unwrap();
2398 sleep(Duration::from_secs(3)).await;
2399 ensure_tx_onchain(rpc, move_txid_0)
2400 .await
2401 .expect("failed to get movetx_0 on chain");
2402
2403 let raw_move_tx_1 = aggregator
2405 .new_deposit(clementine::Deposit::from(deposit_info_1))
2406 .await
2407 .unwrap()
2408 .into_inner();
2409 let move_txid_1 = aggregator
2410 .send_move_to_vault_tx(SendMoveTxRequest {
2411 deposit_outpoint: Some(deposit_outpoint_1.into()),
2412 raw_tx: Some(raw_move_tx_1),
2413 })
2414 .await
2415 .unwrap()
2416 .into_inner()
2417 .try_into()
2418 .unwrap();
2419
2420 rpc.mine_blocks(1).await.unwrap();
2421 ensure_tx_onchain(rpc, move_txid_1)
2422 .await
2423 .expect("failed to get movetx_1 on chain");
2424 sleep(Duration::from_secs(3)).await;
2425
2426 let move_txids = vec![move_txid_0, move_txid_1];
2427
2428 tracing::debug!("Move txids: {:?}", move_txids);
2429
2430 let emergency_txid = aggregator
2431 .internal_get_emergency_stop_tx(tonic::Request::new(
2432 clementine::GetEmergencyStopTxRequest {
2433 txids: move_txids
2434 .iter()
2435 .map(|txid| clementine::Txid {
2436 txid: txid.to_byte_array().to_vec(),
2437 })
2438 .collect(),
2439 },
2440 ))
2441 .await
2442 .unwrap()
2443 .into_inner();
2444
2445 let decryption_priv_key =
2446 hex::decode("a80bc8cf095c2b37d4c6233114e0dd91f43d75de5602466232dbfcc1fc66c542")
2447 .expect("Failed to parse emergency stop encryption public key");
2448 let emergency_stop_tx: bitcoin::Transaction = bitcoin::consensus::deserialize(
2449 &crate::encryption::decrypt_bytes(
2450 &decryption_priv_key,
2451 &emergency_txid.encrypted_emergency_stop_txs[0],
2452 )
2453 .expect("Failed to decrypt emergency stop tx"),
2454 )
2455 .expect("Failed to deserialize");
2456
2457 rpc.send_raw_transaction(&emergency_stop_tx)
2458 .await
2459 .expect("Failed to send emergency stop tx");
2460
2461 let emergency_stop_txid = emergency_stop_tx.compute_txid();
2462 rpc.mine_blocks(1).await.unwrap();
2463
2464 poll_until_condition(
2465 async || {
2466 rpc.mine_blocks(1).await.unwrap();
2467 Ok(rpc
2468 .is_tx_on_chain(&emergency_stop_txid)
2469 .await
2470 .unwrap_or_default())
2471 },
2472 None,
2473 None,
2474 )
2475 .await
2476 .wrap_err_with(|| eyre::eyre!("Emergency stop tx did not land onchain"))
2477 .unwrap();
2478 }
2479
/// Runs a deposit with `deposit_finalize_verifier_idx` set to `Some(0)` and expects
/// an error whose text mentions deposit finalization from verifiers.
///
/// Currently ignored; see the `#[ignore]` reason.
#[cfg(feature = "automation")]
#[tokio::test]
#[ignore = "This test does not work"]
async fn aggregator_deposit_finalize_verifier_timeout() {
    let mut config = create_test_config_with_thread_name().await;
    // NOTE(review): presumably makes the verifier at index 0 time out while
    // finalizing the deposit — confirm against the test hooks.
    let timeout_params = &mut config.test_params.timeout_params;
    timeout_params.deposit_finalize_verifier_idx = Some(0);

    let res = perform_deposit(config).await;
    assert!(res.is_err());

    let expected_fragment = "Deposit finalization from verifiers";
    let err_string = res.unwrap_err().to_string();
    assert!(
        err_string.contains(expected_fragment),
        "Error string was: {err_string}"
    );
}
2497
/// Runs a deposit with `key_distribution_verifier_idx` set to `Some(0)` and expects
/// an error whose text mentions verifier key distribution.
#[cfg(feature = "automation")]
#[tokio::test]
async fn aggregator_deposit_key_distribution_verifier_timeout() {
    let mut config = create_test_config_with_thread_name().await;
    // NOTE(review): presumably makes the verifier at index 0 time out during key
    // distribution — confirm against the test hooks.
    let timeout_params = &mut config.test_params.timeout_params;
    timeout_params.key_distribution_verifier_idx = Some(0);

    let res = perform_deposit(config).await;
    assert!(res.is_err());

    let expected_fragment = "Verifier key distribution (id:";
    let err_string = res.unwrap_err().to_string();
    assert!(
        err_string.contains(expected_fragment),
        "Error string was: {err_string}"
    );
}
2516
/// Runs a deposit with `key_collection_operator_idx` set to `Some(0)` and expects an
/// error whose text mentions operator key collection.
#[cfg(feature = "automation")]
#[tokio::test]
async fn aggregator_deposit_key_distribution_operator_timeout() {
    let mut config = create_test_config_with_thread_name().await;
    // NOTE(review): presumably makes the operator at index 0 time out during key
    // collection — confirm against the test hooks.
    let timeout_params = &mut config.test_params.timeout_params;
    timeout_params.key_collection_operator_idx = Some(0);

    let res = perform_deposit(config).await;
    assert!(res.is_err());

    let expected_fragment = "Operator key collection (id:";
    let err_string = res.unwrap_err().to_string();
    assert!(
        err_string.contains(expected_fragment),
        "Error string was: {err_string}"
    );
}
2535
/// Runs a deposit with `nonce_stream_creation_verifier_idx` set to `Some(0)` and
/// expects an error whose text mentions nonce stream creation.
#[cfg(feature = "automation")]
#[tokio::test]
async fn aggregator_deposit_nonce_stream_creation_verifier_timeout() {
    let mut config = create_test_config_with_thread_name().await;
    // NOTE(review): presumably makes the verifier at index 0 time out while the
    // nonce stream is being created — confirm against the test hooks.
    let timeout_params = &mut config.test_params.timeout_params;
    timeout_params.nonce_stream_creation_verifier_idx = Some(0);

    let res = perform_deposit(config).await;
    assert!(res.is_err());

    let expected_fragment = "Nonce stream creation (id:";
    let err_string = res.unwrap_err().to_string();
    assert!(
        err_string.contains(expected_fragment),
        "Error string was: {err_string}"
    );
}
2554
/// Runs a deposit with `partial_sig_stream_creation_verifier_idx` set to `Some(0)`
/// and expects an error whose text mentions partial signature stream creation.
#[cfg(feature = "automation")]
#[tokio::test]
async fn aggregator_deposit_partial_sig_stream_creation_timeout() {
    let mut config = create_test_config_with_thread_name().await;
    // NOTE(review): presumably makes the verifier at index 0 time out while the
    // partial signature stream is being created — confirm against the test hooks.
    let timeout_params = &mut config.test_params.timeout_params;
    timeout_params.partial_sig_stream_creation_verifier_idx = Some(0);

    let res = perform_deposit(config).await;
    assert!(res.is_err());

    let expected_fragment = "Partial signature stream creation (id:";
    let err_string = res.unwrap_err().to_string();
    assert!(
        err_string.contains(expected_fragment),
        "Error string was: {err_string}"
    );
}
2573
/// Runs a deposit with `operator_sig_collection_operator_idx` set to `Some(0)` and
/// expects an error whose text mentions operator signature stream creation.
#[cfg(feature = "automation")]
#[tokio::test]
async fn aggregator_deposit_operator_sig_collection_operator_timeout() {
    let mut config = create_test_config_with_thread_name().await;
    // NOTE(review): presumably makes the operator at index 0 time out while its
    // signatures are being collected — confirm against the test hooks.
    let timeout_params = &mut config.test_params.timeout_params;
    timeout_params.operator_sig_collection_operator_idx = Some(0);

    let res = perform_deposit(config).await;
    assert!(res.is_err());

    let expected_fragment = "Operator signature stream creation (id:";
    let err_string = res.unwrap_err().to_string();
    assert!(
        err_string.contains(expected_fragment),
        "Error string was: {err_string}"
    );
}
2592
2593 #[tokio::test]
2594 async fn aggregator_get_entity_statuses() {
2595 let mut config = create_test_config_with_thread_name().await;
2596 let _regtest = create_regtest_rpc(&mut config).await;
2597
2598 let actors = create_actors::<MockCitreaClient>(&config).await;
2599 let mut aggregator = actors.get_aggregator();
2600 let status = aggregator
2601 .get_entity_statuses(Request::new(GetEntityStatusesRequest {
2602 restart_tasks: false,
2603 }))
2604 .await
2605 .unwrap()
2606 .into_inner();
2607
2608 tracing::info!("Status: {:?}", status);
2609
2610 assert_eq!(
2611 status.entity_statuses.len(),
2612 config.test_params.all_operators_secret_keys.len()
2613 + config.test_params.all_verifiers_secret_keys.len()
2614 );
2615 }
2616
2617 #[tokio::test]
2618 async fn aggregator_start_with_offline_verifier() {
2619 let mut config = create_test_config_with_thread_name().await;
2620 let _regtest = create_regtest_rpc(&mut config).await;
2622 config.verifier_endpoints = Some(vec!["https://142.143.144.145:17001".to_string()]);
2624 config.operator_endpoints = Some(vec!["https://142.143.144.145:17002".to_string()]);
2625 let socket_dir = tempfile::tempdir().unwrap();
2627 let socket_path = socket_dir.path().join("aggregator.sock");
2628
2629 tracing::info!("Creating unix aggregator server");
2630
2631 let (_, _shutdown_tx) = create_aggregator_unix_server(config.clone(), socket_path.clone())
2632 .await
2633 .unwrap();
2634
2635 tracing::info!("Created unix aggregator server");
2636
2637 let mut aggregator_client = get_clients(
2638 vec![format!("unix://{}", socket_path.display())],
2639 ClementineAggregatorClient::new,
2640 &config,
2641 false,
2642 )
2643 .await
2644 .unwrap()
2645 .pop()
2646 .unwrap();
2647
2648 tracing::info!("Got aggregator client");
2649
2650 assert!(aggregator_client
2652 .vergen(Request::new(clementine::Empty {}))
2653 .await
2654 .is_ok());
2655
2656 tracing::info!("After vergen");
2657
2658 assert!(aggregator_client
2660 .setup(Request::new(clementine::Empty {}))
2661 .await
2662 .is_err());
2663
2664 tracing::info!("After setup");
2665
2666 tracing::info!(
2669 "Entity statuses: {:?}",
2670 aggregator_client
2671 .get_entity_statuses(Request::new(GetEntityStatusesRequest {
2672 restart_tasks: false,
2673 }))
2674 .await
2675 .unwrap()
2676 );
2677 }
2678}