1use std::collections::HashSet;
2use std::ops::Deref;
3use std::sync::Arc;
4
5use crate::compatibility::{ActorWithConfig, CompatibilityParams};
6use crate::constants::{
7 ENTITY_COMP_DATA_POLL_TIMEOUT, ENTITY_STATUS_POLL_TIMEOUT, OPERATOR_GET_KEYS_TIMEOUT,
8 PUBLIC_KEY_COLLECTION_TIMEOUT, RESTART_BACKGROUND_TASKS_TIMEOUT, VERIFIER_SEND_KEYS_TIMEOUT,
9};
10use crate::deposit::DepositData;
11use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc;
12use crate::rpc::clementine::entity_data_with_id::DataResult;
13use crate::rpc::clementine::entity_status_with_id::StatusResult;
14use crate::rpc::clementine::{
15 self, CompatibilityParamsRpc, DepositParams, Empty, EntityStatusWithId, EntityType,
16 OperatorKeysWithDeposit,
17};
18use crate::rpc::clementine::{EntityDataWithId, EntityId as RPCEntityId};
19use crate::task::aggregator_metric_publisher::AGGREGATOR_METRIC_PUBLISHER_POLL_DELAY;
20use crate::task::TaskExt;
21#[cfg(feature = "automation")]
22use crate::tx_sender::TxSenderClient;
23use crate::utils::{flatten_join_named_results, timed_request, timed_try_join_all};
24use crate::{
25 config::BridgeConfig,
26 database::Database,
27 errors::BridgeError,
28 rpc::{
29 self,
30 clementine::{
31 clementine_operator_client::ClementineOperatorClient,
32 clementine_verifier_client::ClementineVerifierClient,
33 },
34 },
35};
36use bitcoin::secp256k1::PublicKey;
37use bitcoin::XOnlyPublicKey;
38use eyre::Context;
39use futures::future::join_all;
40use std::future::Future;
41use std::hash::Hash as StdHash;
42use tokio::sync::RwLock;
43use tonic::{Request, Status};
44use tracing::{debug_span, Instrument};
45
46#[derive(Debug, Clone)]
55pub struct Aggregator {
56 pub(crate) rpc: ExtendedBitcoinRpc,
57 pub(crate) db: Database,
58 pub(crate) config: BridgeConfig,
59 #[cfg(feature = "automation")]
60 pub(crate) tx_sender: TxSenderClient,
61 operator_clients: Vec<ClementineOperatorClient<tonic::transport::Channel>>,
62 verifier_clients: Vec<ClementineVerifierClient<tonic::transport::Channel>>,
63 verifier_keys: Arc<RwLock<Vec<Option<PublicKey>>>>,
64 operator_keys: Arc<RwLock<Vec<Option<XOnlyPublicKey>>>>,
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
68pub enum EntityId {
69 Verifier(VerifierId),
70 Operator(OperatorId),
71 Aggregator,
72}
73
74#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
76pub struct VerifierId(pub PublicKey);
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
80pub struct OperatorId(pub XOnlyPublicKey);
81
82impl std::fmt::Display for EntityId {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 match self {
85 EntityId::Aggregator => write!(f, "Aggregator"),
86 EntityId::Verifier(id) => write!(f, "{id}"),
87 EntityId::Operator(id) => write!(f, "{id}"),
88 }
89 }
90}
91
92impl std::fmt::Display for VerifierId {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 write!(f, "Verifier({})", &self.0.to_string()[..10])
95 }
96}
97
98impl std::fmt::Display for OperatorId {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 write!(f, "Operator({})", &self.0.to_string()[..10])
101 }
102}
103
104#[derive(Debug, Clone)]
106pub struct ParticipatingVerifiers(
107 pub Vec<(
108 ClementineVerifierClient<tonic::transport::Channel>,
109 VerifierId,
110 )>,
111);
112
113impl ParticipatingVerifiers {
114 pub fn new(
115 verifiers: Vec<(
116 ClementineVerifierClient<tonic::transport::Channel>,
117 VerifierId,
118 )>,
119 ) -> Self {
120 Self(verifiers)
121 }
122
123 pub fn clients(&self) -> Vec<ClementineVerifierClient<tonic::transport::Channel>> {
124 self.0.iter().map(|(client, _)| client.clone()).collect()
125 }
126
127 pub fn ids(&self) -> Vec<VerifierId> {
128 self.0.iter().map(|(_, id)| *id).collect()
129 }
130}
131
132#[derive(Debug, Clone)]
134pub struct ParticipatingOperators(
135 pub Vec<(
136 ClementineOperatorClient<tonic::transport::Channel>,
137 OperatorId,
138 )>,
139);
140
141impl ParticipatingOperators {
142 pub fn new(
143 operators: Vec<(
144 ClementineOperatorClient<tonic::transport::Channel>,
145 OperatorId,
146 )>,
147 ) -> Self {
148 Self(operators)
149 }
150
151 pub fn clients(&self) -> Vec<ClementineOperatorClient<tonic::transport::Channel>> {
152 self.0.iter().map(|(client, _)| client.clone()).collect()
153 }
154
155 pub fn ids(&self) -> Vec<OperatorId> {
156 self.0.iter().map(|(_, id)| *id).collect()
157 }
158}
159
160impl Aggregator {
161 pub async fn new(config: BridgeConfig) -> Result<Self, BridgeError> {
162 let db = Database::new(&config).await?;
163
164 let rpc = ExtendedBitcoinRpc::connect(
165 config.bitcoin_rpc_url.clone(),
166 config.bitcoin_rpc_user.clone(),
167 config.bitcoin_rpc_password.clone(),
168 None,
169 )
170 .await?;
171
172 let verifier_endpoints =
173 config
174 .verifier_endpoints
175 .clone()
176 .ok_or(BridgeError::ConfigError(
177 "No verifier endpoints provided in config".into(),
178 ))?;
179
180 let operator_endpoints =
181 config
182 .operator_endpoints
183 .clone()
184 .ok_or(BridgeError::ConfigError(
185 "No operator endpoints provided in config".into(),
186 ))?;
187
188 let verifier_clients = rpc::get_clients(
190 verifier_endpoints,
191 crate::rpc::verifier_client_builder(&config),
192 &config,
193 true,
194 )
195 .await?;
196
197 let operator_clients = rpc::get_clients(
199 operator_endpoints,
200 crate::rpc::operator_client_builder(&config),
201 &config,
202 true,
203 )
204 .await?;
205
206 #[cfg(feature = "automation")]
207 let tx_sender = TxSenderClient::new(db.clone(), "aggregator".to_string());
208
209 tracing::info!(
210 "Aggregator created with {} verifiers and {} operators",
211 verifier_clients.len(),
212 operator_clients.len(),
213 );
214
215 let operator_keys = Arc::new(RwLock::new(vec![None; operator_clients.len()]));
216 let verifier_keys = Arc::new(RwLock::new(vec![None; verifier_clients.len()]));
217
218 Ok(Aggregator {
219 rpc,
220 db,
221 config,
222 #[cfg(feature = "automation")]
223 tx_sender,
224 verifier_clients,
225 operator_clients,
226 verifier_keys,
227 operator_keys,
228 })
229 }
230
231 pub fn get_verifier_clients(&self) -> &[ClementineVerifierClient<tonic::transport::Channel>] {
232 &self.verifier_clients
233 }
234
235 async fn fetch_pubkeys_from_entities<T, C, F, Fut>(
237 &self,
238 clients: &[C],
239 keys_storage: &RwLock<Vec<Option<T>>>,
240 pubkey_fetcher: F,
241 key_type_name: &str,
242 ) -> Result<Vec<T>, BridgeError>
243 where
244 T: Clone + Send + Sync + Eq + StdHash + std::fmt::Debug,
245 C: Clone + Send + Sync,
246 F: Fn(C) -> Fut + Send + Sync,
247 Fut: Future<Output = Result<T, BridgeError>> + Send,
248 {
249 let all_collected = {
251 let keys = keys_storage.read().await;
252 keys.iter().all(|key| key.is_some())
253 };
254
255 if !all_collected {
256 let mut keys = keys_storage.write().await;
258
259 if keys.len() != clients.len() {
261 return Err(eyre::eyre!(
262 "Keys storage length does not match clients length, should not happen, keys length: {}, clients length: {}",
263 keys.len(),
264 clients.len()
265 )
266 .into());
267 }
268
269 let key_collection_futures = clients
270 .iter()
271 .zip(keys.iter().enumerate())
272 .filter_map(|(client, (idx, key))| {
273 if key.is_none() {
274 Some((idx, pubkey_fetcher(client.clone())))
275 } else {
276 None
277 }
278 })
279 .map(|(idx, fut)| async move { (idx, fut.await) });
280
281 let collected_keys = join_all(key_collection_futures).await;
282 let mut missing_keys = Vec::new();
283
284 for (idx, new_key) in collected_keys {
286 match new_key {
287 Ok(new_key) => keys[idx] = Some(new_key),
288 Err(e) => {
289 tracing::debug!(
290 "Failed to collect {} {} (order in config) key: {}",
291 key_type_name,
292 idx,
293 e
294 );
295 missing_keys.push(idx);
296 }
297 }
298 }
299
300 let non_none_keys: Vec<_> = keys.iter().filter_map(|key| key.as_ref()).collect();
302 let unique_keys: HashSet<_> = non_none_keys.iter().cloned().collect();
303
304 if unique_keys.len() != non_none_keys.len() {
305 let reason = format!("{key_type_name} keys are not unique: {keys:?}");
306 for key in keys.iter_mut() {
308 *key = None;
309 }
310 return Err(eyre::eyre!(reason).into());
311 }
312
313 if keys.iter().any(|key| key.is_none()) {
315 return Err(eyre::eyre!(
316 "Not all {} keys were able to be collected, missing keys at indices: {:?}",
317 key_type_name,
318 missing_keys
319 )
320 .into());
321 }
322 }
323
324 Ok(keys_storage
326 .read()
327 .await
328 .iter()
329 .map(|key| key.clone().expect("should all be collected"))
330 .collect())
331 }
332
333 pub async fn fetch_verifier_keys(&self) -> Result<Vec<PublicKey>, BridgeError> {
336 self.fetch_pubkeys_from_entities(
337 &self.verifier_clients,
338 &self.verifier_keys,
339 |mut client| async move {
340 let mut request = Request::new(Empty {});
341 request.set_timeout(PUBLIC_KEY_COLLECTION_TIMEOUT);
342 let verifier_params = client.get_params(request).await?.into_inner();
343 let public_key = PublicKey::from_slice(&verifier_params.public_key)
344 .map_err(|e| eyre::eyre!("Failed to parse verifier public key: {}", e))?;
345 Ok::<_, BridgeError>(public_key)
346 },
347 "verifier",
348 )
349 .await
350 }
351
352 pub async fn fetch_operator_keys(&self) -> Result<Vec<XOnlyPublicKey>, BridgeError> {
355 self.fetch_pubkeys_from_entities(
356 &self.operator_clients,
357 &self.operator_keys,
358 |mut client| async move {
359 let mut request = Request::new(Empty {});
360 request.set_timeout(PUBLIC_KEY_COLLECTION_TIMEOUT);
361 let operator_xonly_pk: XOnlyPublicKey = client
362 .get_x_only_public_key(request)
363 .await?
364 .into_inner()
365 .try_into()?;
366 Ok::<_, BridgeError>(operator_xonly_pk)
367 },
368 "operator",
369 )
370 .await
371 }
372
373 pub fn get_operator_clients(&self) -> &[ClementineOperatorClient<tonic::transport::Channel>] {
374 &self.operator_clients
375 }
376
377 pub async fn collect_and_distribute_keys(
381 &self,
382 deposit_params: &DepositParams,
383 ) -> Result<(), BridgeError> {
384 tracing::info!("Starting collect_and_distribute_keys");
385
386 let start_time = std::time::Instant::now();
387
388 let deposit_data: DepositData = deposit_params.clone().try_into()?;
389
390 let (operator_keys_tx, operator_keys_rx) =
392 tokio::sync::broadcast::channel::<clementine::OperatorKeysWithDeposit>(
393 deposit_data.get_num_operators() * deposit_data.get_num_verifiers(),
394 );
395 let operator_rx_handles = (0..deposit_data.get_num_verifiers())
396 .map(|_| operator_keys_rx.resubscribe())
397 .collect::<Vec<_>>();
398
399 let operators = self.get_participating_operators(&deposit_data).await?;
400 let operator_clients = operators.clients();
401
402 let operator_xonly_pks = deposit_data.get_operators();
403 let deposit = deposit_params.clone();
404
405 tracing::info!("Starting operator key collection");
406 #[cfg(test)]
407 let timeout_params = self.config.test_params.timeout_params;
408 #[allow(clippy::unused_enumerate_index)]
409 let get_operators_keys_handle = tokio::spawn(timed_try_join_all(
410 OPERATOR_GET_KEYS_TIMEOUT,
411 "Operator key collection",
412 Some(operators.ids()),
413 operator_clients
414 .into_iter()
415 .zip(operator_xonly_pks.into_iter())
416 .enumerate()
417 .map(move |(_idx, (mut operator_client, operator_xonly_pk))| {
418 let deposit_params = deposit.clone();
419 let tx = operator_keys_tx.clone();
420 async move {
421 #[cfg(test)]
422 timeout_params
423 .hook_timeout_key_collection_operator(_idx)
424 .await;
425
426 let operator_keys = operator_client
427 .get_deposit_keys(deposit_params.clone())
428 .instrument(
429 debug_span!("get_deposit_keys", id=%OperatorId(operator_xonly_pk)),
430 )
431 .await
432 .wrap_err(Status::internal(format!(
433 "Operator {operator_xonly_pk} key retrieval failed"
434 )))?
435 .into_inner();
436
437 let _ = tx.send(OperatorKeysWithDeposit {
445 deposit_params: Some(deposit_params),
446 operator_keys: Some(operator_keys),
447 operator_xonly_pk: operator_xonly_pk.serialize().to_vec(),
448 });
449
450 Ok(())
451 }
452 }),
453 ));
454
455 tracing::info!("Starting operator key distribution to verifiers");
456 let verifiers = self.get_participating_verifiers(&deposit_data).await?;
457
458 let verifier_clients = verifiers.clients();
459 let num_operators = deposit_data.get_num_operators();
460
461 let verifier_ids = verifiers.ids();
462
463 #[cfg(test)]
464 let timeout_params = self.config.test_params.timeout_params;
465 #[allow(clippy::unused_enumerate_index)]
466 let distribute_operators_keys_handle = tokio::spawn(timed_try_join_all(
467 VERIFIER_SEND_KEYS_TIMEOUT,
468 "Verifier key distribution",
469 Some(verifier_ids.clone()),
470 verifier_clients
471 .into_iter()
472 .zip(operator_rx_handles)
473 .zip(verifier_ids)
474 .enumerate()
475 .map(
476 move |(_idx, ((mut verifier, mut rx), verifier_id))| async move {
477 #[cfg(test)]
478 timeout_params
479 .hook_timeout_key_distribution_verifier(_idx)
480 .await;
481
482 let mut received_keys = std::collections::HashSet::new();
484 while received_keys.len() < num_operators {
485 tracing::debug!(
486 "Waiting for operator key (received {}/{})",
487 received_keys.len(),
488 num_operators
489 );
490
491 let operator_keys = rx
493 .recv()
494 .instrument(debug_span!("operator_keys_recv"))
495 .await
496 .wrap_err(Status::internal(
497 "Operator broadcast channels closed before all keys were received",
498 ))?;
499
500 let operator_xonly_pk = operator_keys.operator_xonly_pk.clone();
501
502 if !received_keys.insert(operator_xonly_pk.clone()) {
503 continue;
504 }
505
506 timed_request(
507 VERIFIER_SEND_KEYS_TIMEOUT,
508 &format!("Setting operator keys for {verifier_id}"),
509 async {
510 Ok(verifier
511 .set_operator_keys(operator_keys)
512 .await
513 .wrap_err_with(|| {
514 Status::internal(format!(
515 "Failed to set operator keys for {verifier_id}",
516 ))
517 }))
518 },
519 )
520 .await??;
521 }
522 Ok::<_, BridgeError>(())
523 },
524 ),
525 ));
526
527 let (get_operators_keys_result, distribute_operators_keys_result) =
529 tokio::join!(get_operators_keys_handle, distribute_operators_keys_handle);
530
531 flatten_join_named_results([
532 ("get_operators_keys", get_operators_keys_result),
533 (
534 "distribute_operators_keys",
535 distribute_operators_keys_result,
536 ),
537 ])?;
538
539 tracing::info!(
540 "collect_and_distribute_keys completed in {:?}",
541 start_time.elapsed()
542 );
543
544 Ok(())
545 }
546
547 pub async fn get_participating_verifiers(
549 &self,
550 deposit_data: &DepositData,
551 ) -> Result<ParticipatingVerifiers, BridgeError> {
552 let verifier_keys = self.fetch_verifier_keys().await?;
553 let mut participating_verifiers = Vec::new();
554
555 let verifiers = deposit_data.get_verifiers();
556
557 for verifier_pk in verifiers {
558 if let Some(pos) = verifier_keys.iter().position(|key| key == &verifier_pk) {
559 participating_verifiers
560 .push((self.verifier_clients[pos].clone(), VerifierId(verifier_pk)));
561 } else {
562 tracing::error!(
563 "Verifier public key not found. Deposit data verifier keys: {:?}, self verifier keys: {:?}",
564 deposit_data.get_verifiers(),
565 self.verifier_keys
566 );
567 return Err(BridgeError::VerifierNotFound(verifier_pk));
568 }
569 }
570
571 Ok(ParticipatingVerifiers::new(participating_verifiers))
572 }
573
574 pub async fn get_participating_operators(
576 &self,
577 deposit_data: &DepositData,
578 ) -> Result<ParticipatingOperators, BridgeError> {
579 let operator_keys = self.fetch_operator_keys().await?;
580 let mut participating_operators = Vec::new();
581
582 let operators = deposit_data.get_operators();
583
584 for operator_pk in operators {
585 if let Some(pos) = operator_keys.iter().position(|key| key == &operator_pk) {
586 participating_operators
587 .push((self.operator_clients[pos].clone(), OperatorId(operator_pk)));
588 } else {
589 return Err(BridgeError::OperatorNotFound(operator_pk));
590 }
591 }
592
593 Ok(ParticipatingOperators::new(participating_operators))
594 }
595
596 async fn fetch_all_entity_keys(&self) -> (Vec<Option<XOnlyPublicKey>>, Vec<Option<PublicKey>>) {
599 let _ = self.fetch_operator_keys().await;
601 let _ = self.fetch_verifier_keys().await;
602
603 let operator_keys = self.operator_keys.read().await.clone();
604 let verifier_keys = self.verifier_keys.read().await.clone();
605
606 (operator_keys, verifier_keys)
607 }
608
609 fn add_unreachable_entity_errors<T, F>(
611 results: &mut Vec<T>,
612 operator_keys: &[Option<XOnlyPublicKey>],
613 verifier_keys: &[Option<PublicKey>],
614 error_constructor: F,
615 ) where
616 F: Fn(EntityType, String, String) -> T,
617 {
618 for (index, key) in operator_keys.iter().enumerate() {
619 if key.is_none() {
620 results.push(error_constructor(
621 EntityType::Operator,
622 format!("Index {index} in config (0-based)"),
623 "Operator key was not able to be collected".to_string(),
624 ));
625 }
626 }
627 for (index, key) in verifier_keys.iter().enumerate() {
628 if key.is_none() {
629 results.push(error_constructor(
630 EntityType::Verifier,
631 format!("Index {index} in config (0-based)"),
632 "Verifier key was not able to be collected".to_string(),
633 ));
634 }
635 }
636 }
637
638 pub async fn get_entity_statuses(
641 &self,
642 restart_tasks: bool,
643 ) -> Result<Vec<EntityStatusWithId>, BridgeError> {
644 tracing::debug!("Getting entities status");
645
646 let operator_clients = self.get_operator_clients();
647 let verifier_clients = self.get_verifier_clients();
648 tracing::debug!("Operator clients: {:?}", operator_clients.len());
649
650 let (operator_keys, verifier_keys) = self.fetch_all_entity_keys().await;
651
652 let operator_status = join_all(
654 operator_clients
655 .iter()
656 .zip(operator_keys.iter())
657 .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
658 .map(|(client, key)| {
659 let mut client = client.clone();
660 let key = *key;
661 async move {
662 tracing::debug!("Getting operator status for {:?}", key);
663 let mut request = Request::new(Empty {});
664 request.set_timeout(ENTITY_STATUS_POLL_TIMEOUT);
665 let response = client.get_current_status(request).await;
666
667 EntityStatusWithId {
668 entity_id: Some(RPCEntityId {
669 kind: EntityType::Operator as i32,
670 id: key.to_string(),
671 }),
672 status_result: match response {
673 Ok(response) => Some(StatusResult::Status(response.into_inner())),
674 Err(e) => Some(StatusResult::Err(clementine::EntityError {
675 error: e.to_string(),
676 })),
677 },
678 }
679 }
680 }),
681 )
682 .await;
683
684 let verifier_status = join_all(
686 verifier_clients
687 .iter()
688 .zip(verifier_keys.iter())
689 .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
690 .map(|(client, key)| {
691 let mut client = client.clone();
692 let key = *key;
693 async move {
694 tracing::debug!("Getting verifier status for {:?}", key);
695 let mut request = Request::new(Empty {});
696 request.set_timeout(ENTITY_STATUS_POLL_TIMEOUT);
697 let response = client.get_current_status(request).await;
698
699 EntityStatusWithId {
700 entity_id: Some(RPCEntityId {
701 kind: EntityType::Verifier as i32,
702 id: key.to_string(),
703 }),
704 status_result: match response {
705 Ok(response) => Some(StatusResult::Status(response.into_inner())),
706 Err(e) => Some(StatusResult::Err(clementine::EntityError {
707 error: e.to_string(),
708 })),
709 },
710 }
711 }
712 }),
713 )
714 .await;
715
716 let mut entity_statuses = operator_status;
718 entity_statuses.extend(verifier_status);
719
720 if restart_tasks {
722 let operator_tasks = operator_clients
723 .iter()
724 .zip(operator_keys.iter())
725 .filter_map(|(client, key)| key.map(|key| (client, key)))
726 .map(|(client, key)| {
727 let mut client = client.clone();
728 async move {
729 let mut request = Request::new(Empty {});
730 request.set_timeout(RESTART_BACKGROUND_TASKS_TIMEOUT);
731 client
732 .restart_background_tasks(Request::new(Empty {}))
733 .await
734 .wrap_err_with(|| {
735 Status::internal(format!(
736 "Failed to restart background tasks for operator {key}"
737 ))
738 })
739 }
740 });
741
742 let verifier_tasks = verifier_clients
743 .iter()
744 .zip(verifier_keys.iter())
745 .filter_map(|(client, key)| key.map(|key| (client, key)))
746 .map(|(client, key)| {
747 let mut client = client.clone();
748 async move {
749 let mut request = Request::new(Empty {});
750 request.set_timeout(RESTART_BACKGROUND_TASKS_TIMEOUT);
751 client
752 .restart_background_tasks(Request::new(Empty {}))
753 .await
754 .wrap_err_with(|| {
755 Status::internal(format!(
756 "Failed to restart background tasks for verifier {key}"
757 ))
758 })
759 }
760 });
761
762 let (operator_results, verifier_results) = futures::join!(
763 futures::future::join_all(operator_tasks),
764 futures::future::join_all(verifier_tasks)
765 );
766 for result in operator_results
768 .into_iter()
769 .chain(verifier_results.into_iter())
770 {
771 if let Err(e) = result {
772 tracing::error!("{e:?}");
773 }
774 }
775 }
776
777 Self::add_unreachable_entity_errors(
779 &mut entity_statuses,
780 &operator_keys,
781 &verifier_keys,
782 |entity_type, id, error_msg| EntityStatusWithId {
783 entity_id: Some(RPCEntityId {
784 kind: entity_type as i32,
785 id,
786 }),
787 status_result: Some(StatusResult::Err(clementine::EntityError {
788 error: error_msg,
789 })),
790 },
791 );
792
793 Ok(entity_statuses)
794 }
795
796 pub async fn get_compatibility_data_from_entities(
797 &self,
798 ) -> Result<Vec<EntityDataWithId>, BridgeError> {
799 let operator_clients = self.get_operator_clients();
800 let verifier_clients = self.get_verifier_clients();
801
802 let (operator_keys, verifier_keys) = self.fetch_all_entity_keys().await;
803
804 let operator_comp_data = join_all(
806 operator_clients
807 .iter()
808 .zip(operator_keys.iter())
809 .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
810 .map(|(client, key)| {
811 let mut client = client.clone();
812 let key = *key;
813 async move {
814 tracing::debug!("Getting operator compatibility data for {:?}", key);
815 let mut request = Request::new(Empty {});
816 request.set_timeout(ENTITY_COMP_DATA_POLL_TIMEOUT);
817 let response = client.get_compatibility_params(request).await;
818
819 EntityDataWithId {
820 entity_id: Some(RPCEntityId {
821 kind: EntityType::Operator as i32,
822 id: key.to_string(),
823 }),
824 data_result: match response {
825 Ok(response) => Some(DataResult::Data(response.into_inner())),
826 Err(e) => Some(DataResult::Error(e.to_string())),
827 },
828 }
829 }
830 }),
831 )
832 .await;
833
834 let verifier_comp_data = join_all(
836 verifier_clients
837 .iter()
838 .zip(verifier_keys.iter())
839 .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
840 .map(|(client, key)| {
841 let mut client = client.clone();
842 let key = *key;
843 async move {
844 tracing::debug!("Getting verifier compatibility data for {:?}", key);
845 let mut request = Request::new(Empty {});
846 request.set_timeout(ENTITY_COMP_DATA_POLL_TIMEOUT);
847 let response = client.get_compatibility_params(request).await;
848
849 EntityDataWithId {
850 entity_id: Some(RPCEntityId {
851 kind: EntityType::Verifier as i32,
852 id: key.to_string(),
853 }),
854 data_result: match response {
855 Ok(response) => Some(DataResult::Data(response.into_inner())),
856 Err(e) => Some(DataResult::Error(e.to_string())),
857 },
858 }
859 }
860 }),
861 )
862 .await;
863
864 let mut entities_comp_data = operator_comp_data;
866 entities_comp_data.extend(verifier_comp_data);
867
868 let aggregator_comp_data = EntityDataWithId {
870 entity_id: Some(RPCEntityId {
871 kind: EntityType::Aggregator as i32,
872 id: "Aggregator".to_string(),
873 }),
874 data_result: {
875 let compatibility_params: Result<CompatibilityParamsRpc, eyre::Report> =
876 self.get_compatibility_params()?.try_into();
877 match compatibility_params {
878 Ok(compatibility_params) => Some(DataResult::Data(compatibility_params)),
879 Err(e) => Some(DataResult::Error(e.to_string())),
880 }
881 },
882 };
883
884 entities_comp_data.push(aggregator_comp_data);
885
886 Self::add_unreachable_entity_errors(
888 &mut entities_comp_data,
889 &operator_keys,
890 &verifier_keys,
891 |entity_type, id, error_msg| EntityDataWithId {
892 entity_id: Some(RPCEntityId {
893 kind: entity_type as i32,
894 id,
895 }),
896 data_result: Some(DataResult::Error(error_msg)),
897 },
898 );
899
900 Ok(entities_comp_data)
901 }
902
903 pub async fn check_compatibility_with_actors(
906 &self,
907 verifiers_included: bool,
908 operators_included: bool,
909 ) -> Result<(), BridgeError> {
910 let mut other_errors = Vec::new();
911 let mut actors_compat_params = Vec::new();
912
913 if operators_included {
914 let operator_keys = self.fetch_operator_keys().await?;
915 for (operator_id, operator_client) in operator_keys
916 .into_iter()
917 .map(OperatorId)
918 .zip(self.operator_clients.iter())
919 {
920 let mut operator_client = operator_client.clone();
921
922 let res = {
923 let compatibility_params: CompatibilityParams = operator_client
924 .get_compatibility_params(Empty {})
925 .await?
926 .into_inner()
927 .try_into()?;
928 actors_compat_params.push((operator_id.to_string(), compatibility_params));
929 Ok::<_, BridgeError>(())
930 };
931 if let Err(e) = res {
932 other_errors.push(format!(
933 "{operator_id} error while retrieving compatibility params: {e}",
934 ));
935 }
936 }
937 }
938
939 if verifiers_included {
940 let verifier_keys = self.fetch_verifier_keys().await?;
941 for (verifier_id, verifier_client) in verifier_keys
942 .into_iter()
943 .map(VerifierId)
944 .zip(self.verifier_clients.iter())
945 {
946 let mut verifier_client = verifier_client.clone();
947
948 let res = {
949 let compatibility_params: CompatibilityParams = verifier_client
950 .get_compatibility_params(Empty {})
951 .await?
952 .into_inner()
953 .try_into()?;
954 actors_compat_params.push((verifier_id.to_string(), compatibility_params));
955 Ok::<_, BridgeError>(())
956 };
957 if let Err(e) = res {
958 other_errors.push(format!(
959 "{verifier_id} error while retrieving compatibility params: {e}",
960 ));
961 }
962 }
963 }
964
965 if let Err(e) = self.is_compatible(actors_compat_params) {
967 other_errors.push(format!("Clementine not compatible with some actors: {e}"));
968 }
969 if !other_errors.is_empty() {
970 return Err(eyre::eyre!(other_errors.join("; ")).into());
971 }
972
973 Ok(())
974 }
975}
976
977#[derive(Debug)]
979pub struct AggregatorServer {
980 pub aggregator: Aggregator,
981 background_tasks: crate::task::manager::BackgroundTaskManager,
982}
983
984impl AggregatorServer {
985 pub async fn new(config: BridgeConfig) -> Result<Self, BridgeError> {
986 let aggregator = Aggregator::new(config.clone()).await?;
987 let background_tasks = crate::task::manager::BackgroundTaskManager::default();
988
989 Ok(Self {
990 aggregator,
991 background_tasks,
992 })
993 }
994
995 pub async fn start_background_tasks(&self) -> Result<(), BridgeError> {
998 self.background_tasks
1000 .ensure_task_looping(
1001 crate::task::aggregator_metric_publisher::AggregatorMetricPublisher::new(
1002 self.aggregator.clone(),
1003 )
1004 .await?
1005 .with_delay(AGGREGATOR_METRIC_PUBLISHER_POLL_DELAY),
1006 )
1007 .await;
1008
1009 tracing::info!("Aggregator metric publisher task started");
1010
1011 Ok(())
1012 }
1013
1014 pub async fn shutdown(&mut self) {
1015 self.background_tasks.graceful_shutdown().await;
1016 }
1017}
1018
1019impl Deref for AggregatorServer {
1020 type Target = Aggregator;
1021
1022 fn deref(&self) -> &Self::Target {
1023 &self.aggregator
1024 }
1025}