1use std::collections::HashSet;
2use std::ops::Deref;
3use std::sync::Arc;
4
5use crate::compatibility::{ActorWithConfig, CompatibilityParams};
6use crate::constants::{
7 ENTITY_COMP_DATA_POLL_TIMEOUT, ENTITY_STATUS_POLL_TIMEOUT, OPERATOR_GET_KEYS_TIMEOUT,
8 PUBLIC_KEY_COLLECTION_TIMEOUT, RESTART_BACKGROUND_TASKS_TIMEOUT, VERIFIER_SEND_KEYS_TIMEOUT,
9};
10use crate::deposit::DepositData;
11use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc;
12use crate::rpc::clementine::entity_data_with_id::DataResult;
13use crate::rpc::clementine::entity_status_with_id::StatusResult;
14use crate::rpc::clementine::{
15 self, CompatibilityParamsRpc, DepositParams, Empty, EntityStatusWithId, EntityType,
16 OperatorKeysWithDeposit,
17};
18use crate::rpc::clementine::{EntityDataWithId, EntityId as RPCEntityId};
19use crate::task::aggregator_metric_publisher::AGGREGATOR_METRIC_PUBLISHER_POLL_DELAY;
20use crate::task::TaskExt;
21#[cfg(feature = "automation")]
22use crate::tx_sender::TxSenderClient;
23use crate::utils::{
24 flatten_join_named_results, join_all_partition_results, timed_request, timed_try_join_all,
25};
26use crate::{
27 config::BridgeConfig,
28 database::Database,
29 rpc::{
30 self,
31 clementine::{
32 clementine_operator_client::ClementineOperatorClient,
33 clementine_verifier_client::ClementineVerifierClient,
34 },
35 },
36};
37use bitcoin::secp256k1::PublicKey;
38use bitcoin::XOnlyPublicKey;
39use clementine_errors::BridgeError;
40use eyre::Context;
41use futures::future::join_all;
42use std::future::Future;
43use std::hash::Hash as StdHash;
44use tokio::sync::RwLock;
45use tonic::{Request, Status};
46use tracing::{debug_span, Instrument};
47
48#[derive(Debug, Clone)]
57pub struct Aggregator {
58 pub(crate) rpc: ExtendedBitcoinRpc,
59 pub(crate) db: Database,
60 pub(crate) config: BridgeConfig,
61 #[cfg(feature = "automation")]
62 pub(crate) tx_sender: TxSenderClient<Database>,
63 operator_clients: Vec<ClementineOperatorClient<tonic::transport::Channel>>,
64 verifier_clients: Vec<ClementineVerifierClient<tonic::transport::Channel>>,
65 verifier_keys: Arc<RwLock<Vec<Option<PublicKey>>>>,
66 operator_keys: Arc<RwLock<Vec<Option<XOnlyPublicKey>>>>,
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
70pub enum EntityId {
71 Verifier(VerifierId),
72 Operator(OperatorId),
73 Aggregator,
74}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq)]
78pub enum CompatibilityCheckScope {
79 VerifiersOnly,
81 OperatorsOnly,
83 Both,
85}
86
87#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
89pub struct VerifierId(pub PublicKey);
90
91#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
93pub struct OperatorId(pub XOnlyPublicKey);
94
95impl std::fmt::Display for EntityId {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 match self {
98 EntityId::Aggregator => write!(f, "Aggregator"),
99 EntityId::Verifier(id) => write!(f, "{id}"),
100 EntityId::Operator(id) => write!(f, "{id}"),
101 }
102 }
103}
104
105impl std::fmt::Display for VerifierId {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 write!(f, "Verifier({})", &self.0.to_string()[..10])
108 }
109}
110
111impl std::fmt::Display for OperatorId {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 write!(f, "Operator({})", &self.0.to_string()[..10])
114 }
115}
116
117#[derive(Debug, Clone)]
119pub struct ParticipatingVerifiers(
120 pub Vec<(
121 ClementineVerifierClient<tonic::transport::Channel>,
122 VerifierId,
123 )>,
124);
125
126impl ParticipatingVerifiers {
127 pub fn new(
128 verifiers: Vec<(
129 ClementineVerifierClient<tonic::transport::Channel>,
130 VerifierId,
131 )>,
132 ) -> Self {
133 Self(verifiers)
134 }
135
136 pub fn clients(&self) -> Vec<ClementineVerifierClient<tonic::transport::Channel>> {
137 self.0.iter().map(|(client, _)| client.clone()).collect()
138 }
139
140 pub fn ids(&self) -> Vec<VerifierId> {
141 self.0.iter().map(|(_, id)| *id).collect()
142 }
143}
144
145#[derive(Debug, Clone)]
147pub struct ParticipatingOperators(
148 pub Vec<(
149 ClementineOperatorClient<tonic::transport::Channel>,
150 OperatorId,
151 )>,
152);
153
154impl ParticipatingOperators {
155 pub fn new(
156 operators: Vec<(
157 ClementineOperatorClient<tonic::transport::Channel>,
158 OperatorId,
159 )>,
160 ) -> Self {
161 Self(operators)
162 }
163
164 pub fn clients(&self) -> Vec<ClementineOperatorClient<tonic::transport::Channel>> {
165 self.0.iter().map(|(client, _)| client.clone()).collect()
166 }
167
168 pub fn ids(&self) -> Vec<OperatorId> {
169 self.0.iter().map(|(_, id)| *id).collect()
170 }
171}
172
173impl Aggregator {
174 pub async fn new(config: BridgeConfig) -> Result<Self, BridgeError> {
175 let db = Database::new(&config).await?;
176
177 let rpc = ExtendedBitcoinRpc::connect(
178 config.bitcoin_rpc_url.clone(),
179 config.bitcoin_rpc_user.clone(),
180 config.bitcoin_rpc_password.clone(),
181 None,
182 )
183 .await?;
184
185 let verifier_endpoints =
186 config
187 .verifier_endpoints
188 .clone()
189 .ok_or(BridgeError::ConfigError(
190 "No verifier endpoints provided in config".into(),
191 ))?;
192
193 let operator_endpoints =
194 config
195 .operator_endpoints
196 .clone()
197 .ok_or(BridgeError::ConfigError(
198 "No operator endpoints provided in config".into(),
199 ))?;
200
201 let verifier_clients = rpc::get_clients(
203 verifier_endpoints,
204 crate::rpc::verifier_client_builder(&config),
205 &config,
206 true,
207 )
208 .await?;
209
210 let operator_clients = rpc::get_clients(
212 operator_endpoints,
213 crate::rpc::operator_client_builder(&config),
214 &config,
215 true,
216 )
217 .await?;
218
219 #[cfg(feature = "automation")]
220 let tx_sender = TxSenderClient::new(db.clone(), "aggregator".to_string());
221
222 tracing::info!(
223 "Aggregator created with {} verifiers and {} operators",
224 verifier_clients.len(),
225 operator_clients.len(),
226 );
227
228 let operator_keys = Arc::new(RwLock::new(vec![None; operator_clients.len()]));
229 let verifier_keys = Arc::new(RwLock::new(vec![None; verifier_clients.len()]));
230
231 Ok(Aggregator {
232 rpc,
233 db,
234 config,
235 #[cfg(feature = "automation")]
236 tx_sender,
237 verifier_clients,
238 operator_clients,
239 verifier_keys,
240 operator_keys,
241 })
242 }
243
244 pub fn get_verifier_clients(&self) -> &[ClementineVerifierClient<tonic::transport::Channel>] {
245 &self.verifier_clients
246 }
247
248 async fn fetch_pubkeys_from_entities<T, C, F, Fut>(
250 &self,
251 clients: &[C],
252 keys_storage: &RwLock<Vec<Option<T>>>,
253 pubkey_fetcher: F,
254 key_type_name: &str,
255 ) -> Result<Vec<T>, BridgeError>
256 where
257 T: Clone + Send + Sync + Eq + StdHash + std::fmt::Debug,
258 C: Clone + Send + Sync,
259 F: Fn(C) -> Fut + Send + Sync,
260 Fut: Future<Output = Result<T, BridgeError>> + Send,
261 {
262 let all_collected = {
264 let keys = keys_storage.read().await;
265 keys.iter().all(|key| key.is_some())
266 };
267
268 if !all_collected {
269 let mut keys = keys_storage.write().await;
271
272 if keys.len() != clients.len() {
274 return Err(eyre::eyre!(
275 "Keys storage length does not match clients length, should not happen, keys length: {}, clients length: {}",
276 keys.len(),
277 clients.len()
278 )
279 .into());
280 }
281
282 let key_collection_futures = clients
283 .iter()
284 .zip(keys.iter().enumerate())
285 .filter_map(|(client, (idx, key))| {
286 if key.is_none() {
287 Some((idx, pubkey_fetcher(client.clone())))
288 } else {
289 None
290 }
291 })
292 .map(|(idx, fut)| async move { (idx, fut.await) });
293
294 let collected_keys = join_all(key_collection_futures).await;
295 let mut missing_keys = Vec::new();
296
297 for (idx, new_key) in collected_keys {
299 match new_key {
300 Ok(new_key) => keys[idx] = Some(new_key),
301 Err(e) => {
302 tracing::debug!(
303 "Failed to collect {} {} (order in config) key: {}",
304 key_type_name,
305 idx,
306 e
307 );
308 missing_keys.push(idx);
309 }
310 }
311 }
312
313 let non_none_keys: Vec<_> = keys.iter().filter_map(|key| key.as_ref()).collect();
315 let unique_keys: HashSet<_> = non_none_keys.iter().cloned().collect();
316
317 if unique_keys.len() != non_none_keys.len() {
318 let reason = format!("{key_type_name} keys are not unique: {keys:?}");
319 for key in keys.iter_mut() {
321 *key = None;
322 }
323 return Err(eyre::eyre!(reason).into());
324 }
325
326 if keys.iter().any(|key| key.is_none()) {
328 return Err(eyre::eyre!(
329 "Not all {} keys were able to be collected, missing keys at indices: {:?}",
330 key_type_name,
331 missing_keys
332 )
333 .into());
334 }
335 }
336
337 Ok(keys_storage
339 .read()
340 .await
341 .iter()
342 .map(|key| key.clone().expect("should all be collected"))
343 .collect())
344 }
345
346 pub async fn fetch_verifier_keys(&self) -> Result<Vec<PublicKey>, BridgeError> {
349 self.fetch_pubkeys_from_entities(
350 &self.verifier_clients,
351 &self.verifier_keys,
352 |mut client| async move {
353 let mut request = Request::new(Empty {});
354 request.set_timeout(PUBLIC_KEY_COLLECTION_TIMEOUT);
355 let verifier_params = client.get_params(request).await?.into_inner();
356 let public_key = PublicKey::from_slice(&verifier_params.public_key)
357 .map_err(|e| eyre::eyre!("Failed to parse verifier public key: {}", e))?;
358 Ok::<_, BridgeError>(public_key)
359 },
360 "verifier",
361 )
362 .await
363 }
364
365 pub async fn fetch_operator_keys(&self) -> Result<Vec<XOnlyPublicKey>, BridgeError> {
368 self.fetch_pubkeys_from_entities(
369 &self.operator_clients,
370 &self.operator_keys,
371 |mut client| async move {
372 let mut request = Request::new(Empty {});
373 request.set_timeout(PUBLIC_KEY_COLLECTION_TIMEOUT);
374 let operator_xonly_pk: XOnlyPublicKey = client
375 .get_x_only_public_key(request)
376 .await?
377 .into_inner()
378 .try_into()?;
379 Ok::<_, BridgeError>(operator_xonly_pk)
380 },
381 "operator",
382 )
383 .await
384 }
385
386 pub fn get_operator_clients(&self) -> &[ClementineOperatorClient<tonic::transport::Channel>] {
387 &self.operator_clients
388 }
389
390 pub async fn collect_and_distribute_keys(
394 &self,
395 deposit_params: &DepositParams,
396 ) -> Result<(), BridgeError> {
397 tracing::info!("Starting collect_and_distribute_keys");
398
399 let start_time = std::time::Instant::now();
400
401 let deposit_data: DepositData = deposit_params.clone().try_into()?;
402
403 let (operator_keys_tx, operator_keys_rx) =
405 tokio::sync::broadcast::channel::<clementine::OperatorKeysWithDeposit>(
406 deposit_data.get_num_operators() * deposit_data.get_num_verifiers(),
407 );
408 let operator_rx_handles = (0..deposit_data.get_num_verifiers())
409 .map(|_| operator_keys_rx.resubscribe())
410 .collect::<Vec<_>>();
411
412 let operators = self.get_participating_operators(&deposit_data).await?;
413 let operator_clients = operators.clients();
414
415 let operator_xonly_pks = deposit_data.get_operators();
416 let deposit = deposit_params.clone();
417
418 tracing::info!("Starting operator key collection");
419 #[cfg(test)]
420 let timeout_params = self.config.test_params.timeout_params;
421 #[allow(clippy::unused_enumerate_index)]
422 let get_operators_keys_handle = tokio::spawn(timed_try_join_all(
423 OPERATOR_GET_KEYS_TIMEOUT,
424 "Operator key collection",
425 Some(operators.ids()),
426 operator_clients
427 .into_iter()
428 .zip(operator_xonly_pks.into_iter())
429 .enumerate()
430 .map(move |(_idx, (mut operator_client, operator_xonly_pk))| {
431 let deposit_params = deposit.clone();
432 let tx = operator_keys_tx.clone();
433 async move {
434 #[cfg(test)]
435 timeout_params
436 .hook_timeout_key_collection_operator(_idx)
437 .await;
438
439 let operator_keys = operator_client
440 .get_deposit_keys(deposit_params.clone())
441 .instrument(
442 debug_span!("get_deposit_keys", id=%OperatorId(operator_xonly_pk)),
443 )
444 .await
445 .wrap_err(Status::internal(format!(
446 "Operator {operator_xonly_pk} key retrieval failed"
447 )))?
448 .into_inner();
449
450 let _ = tx.send(OperatorKeysWithDeposit {
458 deposit_params: Some(deposit_params),
459 operator_keys: Some(operator_keys),
460 operator_xonly_pk: operator_xonly_pk.serialize().to_vec(),
461 });
462
463 Ok(())
464 }
465 }),
466 ));
467
468 tracing::info!("Starting operator key distribution to verifiers");
469 let verifiers = self.get_participating_verifiers(&deposit_data).await?;
470
471 let verifier_clients = verifiers.clients();
472 let num_operators = deposit_data.get_num_operators();
473
474 let verifier_ids = verifiers.ids();
475
476 #[cfg(test)]
477 let timeout_params = self.config.test_params.timeout_params;
478 #[allow(clippy::unused_enumerate_index)]
479 let distribute_operators_keys_handle = tokio::spawn(timed_try_join_all(
480 VERIFIER_SEND_KEYS_TIMEOUT,
481 "Verifier key distribution",
482 Some(verifier_ids.clone()),
483 verifier_clients
484 .into_iter()
485 .zip(operator_rx_handles)
486 .zip(verifier_ids)
487 .enumerate()
488 .map(
489 move |(_idx, ((mut verifier, mut rx), verifier_id))| async move {
490 #[cfg(test)]
491 timeout_params
492 .hook_timeout_key_distribution_verifier(_idx)
493 .await;
494
495 let mut received_keys = std::collections::HashSet::new();
497 while received_keys.len() < num_operators {
498 tracing::debug!(
499 "Waiting for operator key (received {}/{})",
500 received_keys.len(),
501 num_operators
502 );
503
504 let operator_keys = rx
506 .recv()
507 .instrument(debug_span!("operator_keys_recv"))
508 .await
509 .wrap_err(Status::internal(
510 "Operator broadcast channels closed before all keys were received",
511 ))?;
512
513 let operator_xonly_pk = operator_keys.operator_xonly_pk.clone();
514
515 if !received_keys.insert(operator_xonly_pk.clone()) {
516 continue;
517 }
518
519 timed_request(
520 VERIFIER_SEND_KEYS_TIMEOUT,
521 &format!("Setting operator keys for {verifier_id}"),
522 async {
523 Ok(verifier
524 .set_operator_keys(operator_keys)
525 .await
526 .wrap_err_with(|| {
527 Status::internal(format!(
528 "Failed to set operator keys for {verifier_id}",
529 ))
530 }))
531 },
532 )
533 .await??;
534 }
535 Ok::<_, BridgeError>(())
536 },
537 ),
538 ));
539
540 let (get_operators_keys_result, distribute_operators_keys_result) =
542 tokio::join!(get_operators_keys_handle, distribute_operators_keys_handle);
543
544 flatten_join_named_results([
545 ("get_operators_keys", get_operators_keys_result),
546 (
547 "distribute_operators_keys",
548 distribute_operators_keys_result,
549 ),
550 ])?;
551
552 tracing::info!(
553 "collect_and_distribute_keys completed in {:?}",
554 start_time.elapsed()
555 );
556
557 Ok(())
558 }
559
560 pub async fn get_participating_verifiers(
562 &self,
563 deposit_data: &DepositData,
564 ) -> Result<ParticipatingVerifiers, BridgeError> {
565 let verifier_keys = self.fetch_verifier_keys().await?;
566 let mut participating_verifiers = Vec::new();
567
568 let verifiers = deposit_data.get_verifiers();
569
570 for verifier_pk in verifiers {
571 if let Some(pos) = verifier_keys.iter().position(|key| key == &verifier_pk) {
572 participating_verifiers
573 .push((self.verifier_clients[pos].clone(), VerifierId(verifier_pk)));
574 } else {
575 tracing::error!(
576 "Verifier public key not found. Deposit data verifier keys: {:?}, self verifier keys: {:?}",
577 deposit_data.get_verifiers(),
578 self.verifier_keys
579 );
580 return Err(BridgeError::VerifierNotFound(verifier_pk));
581 }
582 }
583
584 Ok(ParticipatingVerifiers::new(participating_verifiers))
585 }
586
587 pub async fn get_participating_operators(
589 &self,
590 deposit_data: &DepositData,
591 ) -> Result<ParticipatingOperators, BridgeError> {
592 let operator_keys = self.fetch_operator_keys().await?;
593 let mut participating_operators = Vec::new();
594
595 let operators = deposit_data.get_operators();
596
597 for operator_pk in operators {
598 if let Some(pos) = operator_keys.iter().position(|key| key == &operator_pk) {
599 participating_operators
600 .push((self.operator_clients[pos].clone(), OperatorId(operator_pk)));
601 } else {
602 return Err(BridgeError::OperatorNotFound(operator_pk));
603 }
604 }
605
606 Ok(ParticipatingOperators::new(participating_operators))
607 }
608
609 async fn fetch_all_entity_keys(&self) -> (Vec<Option<XOnlyPublicKey>>, Vec<Option<PublicKey>>) {
612 let _ = self.fetch_operator_keys().await;
614 let _ = self.fetch_verifier_keys().await;
615
616 let operator_keys = self.operator_keys.read().await.clone();
617 let verifier_keys = self.verifier_keys.read().await.clone();
618
619 (operator_keys, verifier_keys)
620 }
621
622 fn add_unreachable_entity_errors<T, F>(
624 results: &mut Vec<T>,
625 operator_keys: &[Option<XOnlyPublicKey>],
626 verifier_keys: &[Option<PublicKey>],
627 error_constructor: F,
628 ) where
629 F: Fn(EntityType, String, String) -> T,
630 {
631 for (index, key) in operator_keys.iter().enumerate() {
632 if key.is_none() {
633 results.push(error_constructor(
634 EntityType::Operator,
635 format!("Index {index} in config (0-based)"),
636 "Operator key was not able to be collected".to_string(),
637 ));
638 }
639 }
640 for (index, key) in verifier_keys.iter().enumerate() {
641 if key.is_none() {
642 results.push(error_constructor(
643 EntityType::Verifier,
644 format!("Index {index} in config (0-based)"),
645 "Verifier key was not able to be collected".to_string(),
646 ));
647 }
648 }
649 }
650
651 pub async fn get_entity_statuses(
654 &self,
655 restart_tasks: bool,
656 ) -> Result<Vec<EntityStatusWithId>, BridgeError> {
657 tracing::debug!("Getting entities status");
658
659 let operator_clients = self.get_operator_clients();
660 let verifier_clients = self.get_verifier_clients();
661 tracing::debug!("Operator clients: {:?}", operator_clients.len());
662
663 let (operator_keys, verifier_keys) = self.fetch_all_entity_keys().await;
664
665 let operator_status = join_all(
667 operator_clients
668 .iter()
669 .zip(operator_keys.iter())
670 .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
671 .map(|(client, key)| {
672 let mut client = client.clone();
673 let key = *key;
674 async move {
675 tracing::debug!("Getting operator status for {:?}", key);
676 let mut request = Request::new(Empty {});
677 request.set_timeout(ENTITY_STATUS_POLL_TIMEOUT);
678 let response = client.get_current_status(request).await;
679
680 EntityStatusWithId {
681 entity_id: Some(RPCEntityId {
682 kind: EntityType::Operator as i32,
683 id: key.to_string(),
684 }),
685 status_result: match response {
686 Ok(response) => Some(StatusResult::Status(response.into_inner())),
687 Err(e) => Some(StatusResult::Err(clementine::EntityError {
688 error: e.to_string(),
689 })),
690 },
691 }
692 }
693 }),
694 )
695 .await;
696
697 let verifier_status = join_all(
699 verifier_clients
700 .iter()
701 .zip(verifier_keys.iter())
702 .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
703 .map(|(client, key)| {
704 let mut client = client.clone();
705 let key = *key;
706 async move {
707 tracing::debug!("Getting verifier status for {:?}", key);
708 let mut request = Request::new(Empty {});
709 request.set_timeout(ENTITY_STATUS_POLL_TIMEOUT);
710 let response = client.get_current_status(request).await;
711
712 EntityStatusWithId {
713 entity_id: Some(RPCEntityId {
714 kind: EntityType::Verifier as i32,
715 id: key.to_string(),
716 }),
717 status_result: match response {
718 Ok(response) => Some(StatusResult::Status(response.into_inner())),
719 Err(e) => Some(StatusResult::Err(clementine::EntityError {
720 error: e.to_string(),
721 })),
722 },
723 }
724 }
725 }),
726 )
727 .await;
728
729 let mut entity_statuses = operator_status;
731 entity_statuses.extend(verifier_status);
732
733 if restart_tasks {
735 let operator_tasks = operator_clients
736 .iter()
737 .zip(operator_keys.iter())
738 .filter_map(|(client, key)| key.map(|key| (client, key)))
739 .map(|(client, key)| {
740 let mut client = client.clone();
741 async move {
742 let mut request = Request::new(Empty {});
743 request.set_timeout(RESTART_BACKGROUND_TASKS_TIMEOUT);
744 client
745 .restart_background_tasks(Request::new(Empty {}))
746 .await
747 .wrap_err_with(|| {
748 Status::internal(format!(
749 "Failed to restart background tasks for operator {key}"
750 ))
751 })
752 }
753 });
754
755 let verifier_tasks = verifier_clients
756 .iter()
757 .zip(verifier_keys.iter())
758 .filter_map(|(client, key)| key.map(|key| (client, key)))
759 .map(|(client, key)| {
760 let mut client = client.clone();
761 async move {
762 let mut request = Request::new(Empty {});
763 request.set_timeout(RESTART_BACKGROUND_TASKS_TIMEOUT);
764 client
765 .restart_background_tasks(Request::new(Empty {}))
766 .await
767 .wrap_err_with(|| {
768 Status::internal(format!(
769 "Failed to restart background tasks for verifier {key}"
770 ))
771 })
772 }
773 });
774
775 let (operator_results, verifier_results) = futures::join!(
776 futures::future::join_all(operator_tasks),
777 futures::future::join_all(verifier_tasks)
778 );
779 for result in operator_results
781 .into_iter()
782 .chain(verifier_results.into_iter())
783 {
784 if let Err(e) = result {
785 tracing::error!("{e:?}");
786 }
787 }
788 }
789
790 Self::add_unreachable_entity_errors(
792 &mut entity_statuses,
793 &operator_keys,
794 &verifier_keys,
795 |entity_type, id, error_msg| EntityStatusWithId {
796 entity_id: Some(RPCEntityId {
797 kind: entity_type as i32,
798 id,
799 }),
800 status_result: Some(StatusResult::Err(clementine::EntityError {
801 error: error_msg,
802 })),
803 },
804 );
805
806 Ok(entity_statuses)
807 }
808
809 pub async fn get_compatibility_data_from_entities(
810 &self,
811 ) -> Result<Vec<EntityDataWithId>, BridgeError> {
812 let operator_clients = self.get_operator_clients();
813 let verifier_clients = self.get_verifier_clients();
814
815 let (operator_keys, verifier_keys) = self.fetch_all_entity_keys().await;
816
817 let operator_comp_data = join_all(
819 operator_clients
820 .iter()
821 .zip(operator_keys.iter())
822 .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
823 .map(|(client, key)| {
824 let mut client = client.clone();
825 let key = *key;
826 async move {
827 tracing::debug!("Getting operator compatibility data for {:?}", key);
828 let mut request = Request::new(Empty {});
829 request.set_timeout(ENTITY_COMP_DATA_POLL_TIMEOUT);
830 let response = client.get_compatibility_params(request).await;
831
832 EntityDataWithId {
833 entity_id: Some(RPCEntityId {
834 kind: EntityType::Operator as i32,
835 id: key.to_string(),
836 }),
837 data_result: match response {
838 Ok(response) => Some(DataResult::Data(response.into_inner())),
839 Err(e) => Some(DataResult::Error(e.to_string())),
840 },
841 }
842 }
843 }),
844 )
845 .await;
846
847 let verifier_comp_data = join_all(
849 verifier_clients
850 .iter()
851 .zip(verifier_keys.iter())
852 .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
853 .map(|(client, key)| {
854 let mut client = client.clone();
855 let key = *key;
856 async move {
857 tracing::debug!("Getting verifier compatibility data for {:?}", key);
858 let mut request = Request::new(Empty {});
859 request.set_timeout(ENTITY_COMP_DATA_POLL_TIMEOUT);
860 let response = client.get_compatibility_params(request).await;
861
862 EntityDataWithId {
863 entity_id: Some(RPCEntityId {
864 kind: EntityType::Verifier as i32,
865 id: key.to_string(),
866 }),
867 data_result: match response {
868 Ok(response) => Some(DataResult::Data(response.into_inner())),
869 Err(e) => Some(DataResult::Error(e.to_string())),
870 },
871 }
872 }
873 }),
874 )
875 .await;
876
877 let mut entities_comp_data = operator_comp_data;
879 entities_comp_data.extend(verifier_comp_data);
880
881 let aggregator_comp_data = EntityDataWithId {
883 entity_id: Some(RPCEntityId {
884 kind: EntityType::Aggregator as i32,
885 id: "Aggregator".to_string(),
886 }),
887 data_result: {
888 let compatibility_params: Result<CompatibilityParamsRpc, eyre::Report> =
889 self.get_compatibility_params()?.try_into();
890 match compatibility_params {
891 Ok(compatibility_params) => Some(DataResult::Data(compatibility_params)),
892 Err(e) => Some(DataResult::Error(e.to_string())),
893 }
894 },
895 };
896
897 entities_comp_data.push(aggregator_comp_data);
898
899 Self::add_unreachable_entity_errors(
901 &mut entities_comp_data,
902 &operator_keys,
903 &verifier_keys,
904 |entity_type, id, error_msg| EntityDataWithId {
905 entity_id: Some(RPCEntityId {
906 kind: entity_type as i32,
907 id,
908 }),
909 data_result: Some(DataResult::Error(error_msg)),
910 },
911 );
912
913 Ok(entities_comp_data)
914 }
915
916 pub async fn check_compatibility_with_actors(
919 &self,
920 scope: CompatibilityCheckScope,
921 ) -> Result<(), BridgeError> {
922 let mut errors = Vec::new();
923 let mut operator_futures = Vec::new();
924 let mut verifier_futures = Vec::new();
925
926 let operators_included = matches!(
927 scope,
928 CompatibilityCheckScope::OperatorsOnly | CompatibilityCheckScope::Both
929 );
930 let verifiers_included = matches!(
931 scope,
932 CompatibilityCheckScope::VerifiersOnly | CompatibilityCheckScope::Both
933 );
934
935 if operators_included {
936 let operator_keys = self.fetch_operator_keys().await?;
937 for (operator_id, operator_client) in operator_keys
938 .into_iter()
939 .map(OperatorId)
940 .zip(self.operator_clients.iter())
941 {
942 let mut operator_client = operator_client.clone();
943 let operator_id_str = operator_id.to_string();
944
945 operator_futures.push(async move {
946 let compatibility_params: CompatibilityParams = operator_client
947 .get_compatibility_params(Empty {})
948 .await
949 .wrap_err(format!(
950 "{operator_id_str} compatibility params retrieval failed"
951 ))?
952 .into_inner()
953 .try_into()
954 .wrap_err(format!(
955 "{operator_id_str} compatibility params conversion failed"
956 ))?;
957 Ok::<_, BridgeError>((operator_id_str, compatibility_params))
958 });
959 }
960 }
961
962 if verifiers_included {
963 let verifier_keys = self.fetch_verifier_keys().await?;
964 for (verifier_id, verifier_client) in verifier_keys
965 .into_iter()
966 .map(VerifierId)
967 .zip(self.verifier_clients.iter())
968 {
969 let mut verifier_client = verifier_client.clone();
970 let verifier_id_str = verifier_id.to_string();
971
972 verifier_futures.push(async move {
973 let compatibility_params: CompatibilityParams = verifier_client
974 .get_compatibility_params(Empty {})
975 .await
976 .wrap_err(format!(
977 "{verifier_id_str} compatibility params retrieval failed"
978 ))?
979 .into_inner()
980 .try_into()
981 .wrap_err(format!(
982 "{verifier_id_str} compatibility params conversion failed"
983 ))?;
984 Ok::<_, BridgeError>((verifier_id_str, compatibility_params))
985 });
986 }
987 }
988
989 let (operator_results, operator_err) = join_all_partition_results(operator_futures).await;
991 let (verifier_results, verifier_err) = join_all_partition_results(verifier_futures).await;
992
993 let mut actors_compat_params = Vec::new();
994 actors_compat_params.extend(operator_results);
995 actors_compat_params.extend(verifier_results);
996
997 if let Some(operator_err) = operator_err {
998 errors.push(format!(
999 "Error while retrieving operator compatibility params: {operator_err}"
1000 ));
1001 }
1002 if let Some(verifier_err) = verifier_err {
1003 errors.push(format!(
1004 "Error while retrieving verifier compatibility params: {verifier_err}"
1005 ));
1006 }
1007
1008 if let Err(e) = self.is_compatible(actors_compat_params) {
1010 errors.push(format!("Clementine not compatible with some actors: {e}"));
1011 }
1012 if !errors.is_empty() {
1013 return Err(eyre::eyre!(errors.join("; ")).into());
1014 }
1015
1016 Ok(())
1017 }
1018}
1019
1020#[derive(Debug)]
1022pub struct AggregatorServer {
1023 pub aggregator: Aggregator,
1024 background_tasks: crate::task::manager::BackgroundTaskManager,
1025}
1026
1027impl AggregatorServer {
1028 pub async fn new(config: BridgeConfig) -> Result<Self, BridgeError> {
1029 let aggregator = Aggregator::new(config.clone()).await?;
1030 let background_tasks = crate::task::manager::BackgroundTaskManager::default();
1031
1032 Ok(Self {
1033 aggregator,
1034 background_tasks,
1035 })
1036 }
1037
1038 pub async fn start_background_tasks(&self) -> Result<(), BridgeError> {
1041 self.background_tasks
1043 .ensure_task_looping(
1044 crate::task::aggregator_metric_publisher::AggregatorMetricPublisher::new(
1045 self.aggregator.clone(),
1046 )
1047 .await?
1048 .with_delay(AGGREGATOR_METRIC_PUBLISHER_POLL_DELAY),
1049 )
1050 .await;
1051
1052 tracing::info!("Aggregator metric publisher task started");
1053
1054 Ok(())
1055 }
1056}
1057
1058impl Deref for AggregatorServer {
1059 type Target = Aggregator;
1060
1061 fn deref(&self) -> &Self::Target {
1062 &self.aggregator
1063 }
1064}