1use std::collections::HashSet;
2use std::ops::Deref;
3use std::sync::Arc;
4
5use crate::compatibility::{ActorWithConfig, CompatibilityParams};
6use crate::constants::{
7 ENTITY_COMP_DATA_POLL_TIMEOUT, ENTITY_STATUS_POLL_TIMEOUT, OPERATOR_GET_KEYS_TIMEOUT,
8 PUBLIC_KEY_COLLECTION_TIMEOUT, RESTART_BACKGROUND_TASKS_TIMEOUT, VERIFIER_SEND_KEYS_TIMEOUT,
9};
10use crate::deposit::DepositData;
11use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc;
12use crate::rpc::clementine::entity_data_with_id::DataResult;
13use crate::rpc::clementine::entity_status_with_id::StatusResult;
14use crate::rpc::clementine::{
15 self, CompatibilityParamsRpc, DepositParams, Empty, EntityStatusWithId, EntityType,
16 OperatorKeysWithDeposit,
17};
18use crate::rpc::clementine::{EntityDataWithId, EntityId as RPCEntityId};
19use crate::task::aggregator_metric_publisher::AGGREGATOR_METRIC_PUBLISHER_POLL_DELAY;
20use crate::task::TaskExt;
21#[cfg(feature = "automation")]
22use crate::tx_sender::TxSenderClient;
23use crate::utils::{
24 flatten_join_named_results, join_all_partition_results, timed_request, timed_try_join_all,
25};
26use crate::{
27 config::BridgeConfig,
28 database::Database,
29 rpc::{
30 self,
31 clementine::{
32 clementine_operator_client::ClementineOperatorClient,
33 clementine_verifier_client::ClementineVerifierClient,
34 },
35 },
36};
37use bitcoin::secp256k1::PublicKey;
38use bitcoin::XOnlyPublicKey;
39use clementine_errors::BridgeError;
40use eyre::Context;
41use futures::future::join_all;
42use std::future::Future;
43use std::hash::Hash as StdHash;
44use tokio::sync::RwLock;
45use tonic::{Request, Status};
46use tracing::{debug_span, Instrument};
47
48#[derive(Debug, Clone)]
57pub struct Aggregator {
58 pub(crate) rpc: ExtendedBitcoinRpc,
59 pub(crate) db: Database,
60 pub(crate) config: BridgeConfig,
61 #[cfg(feature = "automation")]
62 pub(crate) tx_sender: TxSenderClient,
63 operator_clients: Vec<ClementineOperatorClient<tonic::transport::Channel>>,
64 verifier_clients: Vec<ClementineVerifierClient<tonic::transport::Channel>>,
65 verifier_keys: Arc<RwLock<Vec<Option<PublicKey>>>>,
66 operator_keys: Arc<RwLock<Vec<Option<XOnlyPublicKey>>>>,
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
70pub enum EntityId {
71 Verifier(VerifierId),
72 Operator(OperatorId),
73 Aggregator,
74}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq)]
78pub enum CompatibilityCheckScope {
79 VerifiersOnly,
81 OperatorsOnly,
83 Both,
85}
86
87#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
89pub struct VerifierId(pub PublicKey);
90
91#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
93pub struct OperatorId(pub XOnlyPublicKey);
94
95impl std::fmt::Display for EntityId {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 match self {
98 EntityId::Aggregator => write!(f, "Aggregator"),
99 EntityId::Verifier(id) => write!(f, "{id}"),
100 EntityId::Operator(id) => write!(f, "{id}"),
101 }
102 }
103}
104
105impl std::fmt::Display for VerifierId {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 write!(f, "Verifier({})", &self.0.to_string()[..10])
108 }
109}
110
111impl std::fmt::Display for OperatorId {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 write!(f, "Operator({})", &self.0.to_string()[..10])
114 }
115}
116
117#[derive(Debug, Clone)]
119pub struct ParticipatingVerifiers(
120 pub Vec<(
121 ClementineVerifierClient<tonic::transport::Channel>,
122 VerifierId,
123 )>,
124);
125
126impl ParticipatingVerifiers {
127 pub fn new(
128 verifiers: Vec<(
129 ClementineVerifierClient<tonic::transport::Channel>,
130 VerifierId,
131 )>,
132 ) -> Self {
133 Self(verifiers)
134 }
135
136 pub fn clients(&self) -> Vec<ClementineVerifierClient<tonic::transport::Channel>> {
137 self.0.iter().map(|(client, _)| client.clone()).collect()
138 }
139
140 pub fn ids(&self) -> Vec<VerifierId> {
141 self.0.iter().map(|(_, id)| *id).collect()
142 }
143}
144
145#[derive(Debug, Clone)]
147pub struct ParticipatingOperators(
148 pub Vec<(
149 ClementineOperatorClient<tonic::transport::Channel>,
150 OperatorId,
151 )>,
152);
153
154impl ParticipatingOperators {
155 pub fn new(
156 operators: Vec<(
157 ClementineOperatorClient<tonic::transport::Channel>,
158 OperatorId,
159 )>,
160 ) -> Self {
161 Self(operators)
162 }
163
164 pub fn clients(&self) -> Vec<ClementineOperatorClient<tonic::transport::Channel>> {
165 self.0.iter().map(|(client, _)| client.clone()).collect()
166 }
167
168 pub fn ids(&self) -> Vec<OperatorId> {
169 self.0.iter().map(|(_, id)| *id).collect()
170 }
171}
172
173impl Aggregator {
174 pub async fn new(config: BridgeConfig) -> Result<Self, BridgeError> {
175 let db = Database::new(&config).await?;
176
177 let rpc = ExtendedBitcoinRpc::connect(
178 config.bitcoin_rpc_url.clone(),
179 config.bitcoin_rpc_user.clone(),
180 config.bitcoin_rpc_password.clone(),
181 None,
182 )
183 .await?;
184
185 let verifier_endpoints =
186 config
187 .verifier_endpoints
188 .clone()
189 .ok_or(BridgeError::ConfigError(
190 "No verifier endpoints provided in config".into(),
191 ))?;
192
193 let operator_endpoints =
194 config
195 .operator_endpoints
196 .clone()
197 .ok_or(BridgeError::ConfigError(
198 "No operator endpoints provided in config".into(),
199 ))?;
200
201 let verifier_clients = rpc::get_clients(
203 verifier_endpoints,
204 crate::rpc::verifier_client_builder(&config),
205 &config,
206 true,
207 )
208 .await?;
209
210 let operator_clients = rpc::get_clients(
212 operator_endpoints,
213 crate::rpc::operator_client_builder(&config),
214 &config,
215 true,
216 )
217 .await?;
218
219 #[cfg(feature = "automation")]
220 let tx_sender =
221 TxSenderClient::new(clementine_tx_sender::TxSenderDb::from_pool(db.get_pool()));
222
223 tracing::info!(
224 "Aggregator created with {} verifiers and {} operators",
225 verifier_clients.len(),
226 operator_clients.len(),
227 );
228
229 let operator_keys = Arc::new(RwLock::new(vec![None; operator_clients.len()]));
230 let verifier_keys = Arc::new(RwLock::new(vec![None; verifier_clients.len()]));
231
232 Ok(Aggregator {
233 rpc,
234 db,
235 config,
236 #[cfg(feature = "automation")]
237 tx_sender,
238 verifier_clients,
239 operator_clients,
240 verifier_keys,
241 operator_keys,
242 })
243 }
244
245 pub fn get_verifier_clients(&self) -> &[ClementineVerifierClient<tonic::transport::Channel>] {
246 &self.verifier_clients
247 }
248
249 async fn fetch_pubkeys_from_entities<T, C, F, Fut>(
251 &self,
252 clients: &[C],
253 keys_storage: &RwLock<Vec<Option<T>>>,
254 pubkey_fetcher: F,
255 key_type_name: &str,
256 ) -> Result<Vec<T>, BridgeError>
257 where
258 T: Clone + Send + Sync + Eq + StdHash + std::fmt::Debug,
259 C: Clone + Send + Sync,
260 F: Fn(C) -> Fut + Send + Sync,
261 Fut: Future<Output = Result<T, BridgeError>> + Send,
262 {
263 let all_collected = {
265 let keys = keys_storage.read().await;
266 keys.iter().all(|key| key.is_some())
267 };
268
269 if !all_collected {
270 let mut keys = keys_storage.write().await;
272
273 if keys.len() != clients.len() {
275 return Err(eyre::eyre!(
276 "Keys storage length does not match clients length, should not happen, keys length: {}, clients length: {}",
277 keys.len(),
278 clients.len()
279 )
280 .into());
281 }
282
283 let key_collection_futures = clients
284 .iter()
285 .zip(keys.iter().enumerate())
286 .filter_map(|(client, (idx, key))| {
287 if key.is_none() {
288 Some((idx, pubkey_fetcher(client.clone())))
289 } else {
290 None
291 }
292 })
293 .map(|(idx, fut)| async move { (idx, fut.await) });
294
295 let collected_keys = join_all(key_collection_futures).await;
296 let mut missing_keys = Vec::new();
297
298 for (idx, new_key) in collected_keys {
300 match new_key {
301 Ok(new_key) => keys[idx] = Some(new_key),
302 Err(e) => {
303 tracing::debug!(
304 "Failed to collect {} {} (order in config) key: {}",
305 key_type_name,
306 idx,
307 e
308 );
309 missing_keys.push(idx);
310 }
311 }
312 }
313
314 let non_none_keys: Vec<_> = keys.iter().filter_map(|key| key.as_ref()).collect();
316 let unique_keys: HashSet<_> = non_none_keys.iter().cloned().collect();
317
318 if unique_keys.len() != non_none_keys.len() {
319 let reason = format!("{key_type_name} keys are not unique: {keys:?}");
320 for key in keys.iter_mut() {
322 *key = None;
323 }
324 return Err(eyre::eyre!(reason).into());
325 }
326
327 if keys.iter().any(|key| key.is_none()) {
329 return Err(eyre::eyre!(
330 "Not all {} keys were able to be collected, missing keys at indices: {:?}",
331 key_type_name,
332 missing_keys
333 )
334 .into());
335 }
336 }
337
338 Ok(keys_storage
340 .read()
341 .await
342 .iter()
343 .map(|key| key.clone().expect("should all be collected"))
344 .collect())
345 }
346
347 pub async fn fetch_verifier_keys(&self) -> Result<Vec<PublicKey>, BridgeError> {
350 self.fetch_pubkeys_from_entities(
351 &self.verifier_clients,
352 &self.verifier_keys,
353 |mut client| async move {
354 let mut request = Request::new(Empty {});
355 request.set_timeout(PUBLIC_KEY_COLLECTION_TIMEOUT);
356 let verifier_params = client.get_params(request).await?.into_inner();
357 let public_key = PublicKey::from_slice(&verifier_params.public_key)
358 .map_err(|e| eyre::eyre!("Failed to parse verifier public key: {}", e))?;
359 Ok::<_, BridgeError>(public_key)
360 },
361 "verifier",
362 )
363 .await
364 }
365
366 pub async fn fetch_operator_keys(&self) -> Result<Vec<XOnlyPublicKey>, BridgeError> {
369 self.fetch_pubkeys_from_entities(
370 &self.operator_clients,
371 &self.operator_keys,
372 |mut client| async move {
373 let mut request = Request::new(Empty {});
374 request.set_timeout(PUBLIC_KEY_COLLECTION_TIMEOUT);
375 let operator_xonly_pk: XOnlyPublicKey = client
376 .get_x_only_public_key(request)
377 .await?
378 .into_inner()
379 .try_into()?;
380 Ok::<_, BridgeError>(operator_xonly_pk)
381 },
382 "operator",
383 )
384 .await
385 }
386
387 pub fn get_operator_clients(&self) -> &[ClementineOperatorClient<tonic::transport::Channel>] {
388 &self.operator_clients
389 }
390
391 pub async fn collect_and_distribute_keys(
395 &self,
396 deposit_params: &DepositParams,
397 ) -> Result<(), BridgeError> {
398 tracing::info!("Starting collect_and_distribute_keys");
399
400 let start_time = std::time::Instant::now();
401
402 let deposit_data: DepositData = deposit_params.clone().try_into()?;
403
404 let (operator_keys_tx, operator_keys_rx) =
406 tokio::sync::broadcast::channel::<clementine::OperatorKeysWithDeposit>(
407 deposit_data.get_num_operators() * deposit_data.get_num_verifiers(),
408 );
409 let operator_rx_handles = (0..deposit_data.get_num_verifiers())
410 .map(|_| operator_keys_rx.resubscribe())
411 .collect::<Vec<_>>();
412
413 let operators = self.get_participating_operators(&deposit_data).await?;
414 let operator_clients = operators.clients();
415
416 let operator_xonly_pks = deposit_data.get_operators();
417 let deposit = deposit_params.clone();
418
419 tracing::info!("Starting operator key collection");
420 #[cfg(test)]
421 let timeout_params = self.config.test_params.timeout_params;
422 #[allow(clippy::unused_enumerate_index)]
423 let get_operators_keys_handle = tokio::spawn(timed_try_join_all(
424 OPERATOR_GET_KEYS_TIMEOUT,
425 "Operator key collection",
426 Some(operators.ids()),
427 operator_clients
428 .into_iter()
429 .zip(operator_xonly_pks.into_iter())
430 .enumerate()
431 .map(move |(_idx, (mut operator_client, operator_xonly_pk))| {
432 let deposit_params = deposit.clone();
433 let tx = operator_keys_tx.clone();
434 async move {
435 #[cfg(test)]
436 timeout_params
437 .hook_timeout_key_collection_operator(_idx)
438 .await;
439
440 let operator_keys = operator_client
441 .get_deposit_keys(deposit_params.clone())
442 .instrument(
443 debug_span!("get_deposit_keys", id=%OperatorId(operator_xonly_pk)),
444 )
445 .await
446 .wrap_err(Status::internal(format!(
447 "Operator {operator_xonly_pk} key retrieval failed"
448 )))?
449 .into_inner();
450
451 let _ = tx.send(OperatorKeysWithDeposit {
459 deposit_params: Some(deposit_params),
460 operator_keys: Some(operator_keys),
461 operator_xonly_pk: operator_xonly_pk.serialize().to_vec(),
462 });
463
464 Ok(())
465 }
466 }),
467 ));
468
469 tracing::info!("Starting operator key distribution to verifiers");
470 let verifiers = self.get_participating_verifiers(&deposit_data).await?;
471
472 let verifier_clients = verifiers.clients();
473 let num_operators = deposit_data.get_num_operators();
474
475 let verifier_ids = verifiers.ids();
476
477 #[cfg(test)]
478 let timeout_params = self.config.test_params.timeout_params;
479 #[allow(clippy::unused_enumerate_index)]
480 let distribute_operators_keys_handle = tokio::spawn(timed_try_join_all(
481 VERIFIER_SEND_KEYS_TIMEOUT,
482 "Verifier key distribution",
483 Some(verifier_ids.clone()),
484 verifier_clients
485 .into_iter()
486 .zip(operator_rx_handles)
487 .zip(verifier_ids)
488 .enumerate()
489 .map(
490 move |(_idx, ((mut verifier, mut rx), verifier_id))| async move {
491 #[cfg(test)]
492 timeout_params
493 .hook_timeout_key_distribution_verifier(_idx)
494 .await;
495
496 let mut received_keys = std::collections::HashSet::new();
498 while received_keys.len() < num_operators {
499 tracing::debug!(
500 "Waiting for operator key (received {}/{})",
501 received_keys.len(),
502 num_operators
503 );
504
505 let operator_keys = rx
507 .recv()
508 .instrument(debug_span!("operator_keys_recv"))
509 .await
510 .wrap_err(Status::internal(
511 "Operator broadcast channels closed before all keys were received",
512 ))?;
513
514 let operator_xonly_pk = operator_keys.operator_xonly_pk.clone();
515
516 if !received_keys.insert(operator_xonly_pk.clone()) {
517 continue;
518 }
519
520 timed_request(
521 VERIFIER_SEND_KEYS_TIMEOUT,
522 &format!("Setting operator keys for {verifier_id}"),
523 async {
524 Ok(verifier
525 .set_operator_keys(operator_keys)
526 .await
527 .wrap_err_with(|| {
528 Status::internal(format!(
529 "Failed to set operator keys for {verifier_id}",
530 ))
531 }))
532 },
533 )
534 .await??;
535 }
536 Ok::<_, BridgeError>(())
537 },
538 ),
539 ));
540
541 let (get_operators_keys_result, distribute_operators_keys_result) =
543 tokio::join!(get_operators_keys_handle, distribute_operators_keys_handle);
544
545 flatten_join_named_results([
546 ("get_operators_keys", get_operators_keys_result),
547 (
548 "distribute_operators_keys",
549 distribute_operators_keys_result,
550 ),
551 ])?;
552
553 tracing::info!(
554 "collect_and_distribute_keys completed in {:?}",
555 start_time.elapsed()
556 );
557
558 Ok(())
559 }
560
561 pub async fn get_participating_verifiers(
563 &self,
564 deposit_data: &DepositData,
565 ) -> Result<ParticipatingVerifiers, BridgeError> {
566 let verifier_keys = self.fetch_verifier_keys().await?;
567 let mut participating_verifiers = Vec::new();
568
569 let verifiers = deposit_data.get_verifiers();
570
571 for verifier_pk in verifiers {
572 if let Some(pos) = verifier_keys.iter().position(|key| key == &verifier_pk) {
573 participating_verifiers
574 .push((self.verifier_clients[pos].clone(), VerifierId(verifier_pk)));
575 } else {
576 tracing::error!(
577 "Verifier public key not found. Deposit data verifier keys: {:?}, self verifier keys: {:?}",
578 deposit_data.get_verifiers(),
579 self.verifier_keys
580 );
581 return Err(BridgeError::VerifierNotFound(verifier_pk));
582 }
583 }
584
585 Ok(ParticipatingVerifiers::new(participating_verifiers))
586 }
587
588 pub async fn get_participating_operators(
590 &self,
591 deposit_data: &DepositData,
592 ) -> Result<ParticipatingOperators, BridgeError> {
593 let operator_keys = self.fetch_operator_keys().await?;
594 let mut participating_operators = Vec::new();
595
596 let operators = deposit_data.get_operators();
597
598 for operator_pk in operators {
599 if let Some(pos) = operator_keys.iter().position(|key| key == &operator_pk) {
600 participating_operators
601 .push((self.operator_clients[pos].clone(), OperatorId(operator_pk)));
602 } else {
603 return Err(BridgeError::OperatorNotFound(operator_pk));
604 }
605 }
606
607 Ok(ParticipatingOperators::new(participating_operators))
608 }
609
610 async fn fetch_all_entity_keys(&self) -> (Vec<Option<XOnlyPublicKey>>, Vec<Option<PublicKey>>) {
613 let _ = self.fetch_operator_keys().await;
615 let _ = self.fetch_verifier_keys().await;
616
617 let operator_keys = self.operator_keys.read().await.clone();
618 let verifier_keys = self.verifier_keys.read().await.clone();
619
620 (operator_keys, verifier_keys)
621 }
622
623 fn add_unreachable_entity_errors<T, F>(
625 results: &mut Vec<T>,
626 operator_keys: &[Option<XOnlyPublicKey>],
627 verifier_keys: &[Option<PublicKey>],
628 error_constructor: F,
629 ) where
630 F: Fn(EntityType, String, String) -> T,
631 {
632 for (index, key) in operator_keys.iter().enumerate() {
633 if key.is_none() {
634 results.push(error_constructor(
635 EntityType::Operator,
636 format!("Index {index} in config (0-based)"),
637 "Operator key was not able to be collected".to_string(),
638 ));
639 }
640 }
641 for (index, key) in verifier_keys.iter().enumerate() {
642 if key.is_none() {
643 results.push(error_constructor(
644 EntityType::Verifier,
645 format!("Index {index} in config (0-based)"),
646 "Verifier key was not able to be collected".to_string(),
647 ));
648 }
649 }
650 }
651
652 pub async fn get_entity_statuses(
655 &self,
656 restart_tasks: bool,
657 ) -> Result<Vec<EntityStatusWithId>, BridgeError> {
658 tracing::debug!("Getting entities status");
659
660 let operator_clients = self.get_operator_clients();
661 let verifier_clients = self.get_verifier_clients();
662 tracing::debug!("Operator clients: {:?}", operator_clients.len());
663
664 let (operator_keys, verifier_keys) = self.fetch_all_entity_keys().await;
665
666 let operator_status = join_all(
668 operator_clients
669 .iter()
670 .zip(operator_keys.iter())
671 .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
672 .map(|(client, key)| {
673 let mut client = client.clone();
674 let key = *key;
675 async move {
676 tracing::debug!("Getting operator status for {:?}", key);
677 let mut request = Request::new(Empty {});
678 request.set_timeout(ENTITY_STATUS_POLL_TIMEOUT);
679 let response = client.get_current_status(request).await;
680
681 EntityStatusWithId {
682 entity_id: Some(RPCEntityId {
683 kind: EntityType::Operator as i32,
684 id: key.to_string(),
685 }),
686 status_result: match response {
687 Ok(response) => Some(StatusResult::Status(response.into_inner())),
688 Err(e) => Some(StatusResult::Err(clementine::EntityError {
689 error: e.to_string(),
690 })),
691 },
692 }
693 }
694 }),
695 )
696 .await;
697
698 let verifier_status = join_all(
700 verifier_clients
701 .iter()
702 .zip(verifier_keys.iter())
703 .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
704 .map(|(client, key)| {
705 let mut client = client.clone();
706 let key = *key;
707 async move {
708 tracing::debug!("Getting verifier status for {:?}", key);
709 let mut request = Request::new(Empty {});
710 request.set_timeout(ENTITY_STATUS_POLL_TIMEOUT);
711 let response = client.get_current_status(request).await;
712
713 EntityStatusWithId {
714 entity_id: Some(RPCEntityId {
715 kind: EntityType::Verifier as i32,
716 id: key.to_string(),
717 }),
718 status_result: match response {
719 Ok(response) => Some(StatusResult::Status(response.into_inner())),
720 Err(e) => Some(StatusResult::Err(clementine::EntityError {
721 error: e.to_string(),
722 })),
723 },
724 }
725 }
726 }),
727 )
728 .await;
729
730 let mut entity_statuses = operator_status;
732 entity_statuses.extend(verifier_status);
733
734 if restart_tasks {
736 let operator_tasks = operator_clients
737 .iter()
738 .zip(operator_keys.iter())
739 .filter_map(|(client, key)| key.map(|key| (client, key)))
740 .map(|(client, key)| {
741 let mut client = client.clone();
742 async move {
743 let mut request = Request::new(Empty {});
744 request.set_timeout(RESTART_BACKGROUND_TASKS_TIMEOUT);
745 client
746 .restart_background_tasks(Request::new(Empty {}))
747 .await
748 .wrap_err_with(|| {
749 Status::internal(format!(
750 "Failed to restart background tasks for operator {key}"
751 ))
752 })
753 }
754 });
755
756 let verifier_tasks = verifier_clients
757 .iter()
758 .zip(verifier_keys.iter())
759 .filter_map(|(client, key)| key.map(|key| (client, key)))
760 .map(|(client, key)| {
761 let mut client = client.clone();
762 async move {
763 let mut request = Request::new(Empty {});
764 request.set_timeout(RESTART_BACKGROUND_TASKS_TIMEOUT);
765 client
766 .restart_background_tasks(Request::new(Empty {}))
767 .await
768 .wrap_err_with(|| {
769 Status::internal(format!(
770 "Failed to restart background tasks for verifier {key}"
771 ))
772 })
773 }
774 });
775
776 let (operator_results, verifier_results) = futures::join!(
777 futures::future::join_all(operator_tasks),
778 futures::future::join_all(verifier_tasks)
779 );
780 for result in operator_results
782 .into_iter()
783 .chain(verifier_results.into_iter())
784 {
785 if let Err(e) = result {
786 tracing::error!("{e:?}");
787 }
788 }
789 }
790
791 Self::add_unreachable_entity_errors(
793 &mut entity_statuses,
794 &operator_keys,
795 &verifier_keys,
796 |entity_type, id, error_msg| EntityStatusWithId {
797 entity_id: Some(RPCEntityId {
798 kind: entity_type as i32,
799 id,
800 }),
801 status_result: Some(StatusResult::Err(clementine::EntityError {
802 error: error_msg,
803 })),
804 },
805 );
806
807 Ok(entity_statuses)
808 }
809
810 pub async fn get_compatibility_data_from_entities(
811 &self,
812 ) -> Result<Vec<EntityDataWithId>, BridgeError> {
813 let operator_clients = self.get_operator_clients();
814 let verifier_clients = self.get_verifier_clients();
815
816 let (operator_keys, verifier_keys) = self.fetch_all_entity_keys().await;
817
818 let operator_comp_data = join_all(
820 operator_clients
821 .iter()
822 .zip(operator_keys.iter())
823 .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
824 .map(|(client, key)| {
825 let mut client = client.clone();
826 let key = *key;
827 async move {
828 tracing::debug!("Getting operator compatibility data for {:?}", key);
829 let mut request = Request::new(Empty {});
830 request.set_timeout(ENTITY_COMP_DATA_POLL_TIMEOUT);
831 let response = client.get_compatibility_params(request).await;
832
833 EntityDataWithId {
834 entity_id: Some(RPCEntityId {
835 kind: EntityType::Operator as i32,
836 id: key.to_string(),
837 }),
838 data_result: match response {
839 Ok(response) => Some(DataResult::Data(response.into_inner())),
840 Err(e) => Some(DataResult::Error(e.to_string())),
841 },
842 }
843 }
844 }),
845 )
846 .await;
847
848 let verifier_comp_data = join_all(
850 verifier_clients
851 .iter()
852 .zip(verifier_keys.iter())
853 .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
854 .map(|(client, key)| {
855 let mut client = client.clone();
856 let key = *key;
857 async move {
858 tracing::debug!("Getting verifier compatibility data for {:?}", key);
859 let mut request = Request::new(Empty {});
860 request.set_timeout(ENTITY_COMP_DATA_POLL_TIMEOUT);
861 let response = client.get_compatibility_params(request).await;
862
863 EntityDataWithId {
864 entity_id: Some(RPCEntityId {
865 kind: EntityType::Verifier as i32,
866 id: key.to_string(),
867 }),
868 data_result: match response {
869 Ok(response) => Some(DataResult::Data(response.into_inner())),
870 Err(e) => Some(DataResult::Error(e.to_string())),
871 },
872 }
873 }
874 }),
875 )
876 .await;
877
878 let mut entities_comp_data = operator_comp_data;
880 entities_comp_data.extend(verifier_comp_data);
881
882 let aggregator_comp_data = EntityDataWithId {
884 entity_id: Some(RPCEntityId {
885 kind: EntityType::Aggregator as i32,
886 id: "Aggregator".to_string(),
887 }),
888 data_result: {
889 let compatibility_params: Result<CompatibilityParamsRpc, eyre::Report> =
890 self.get_compatibility_params()?.try_into();
891 match compatibility_params {
892 Ok(compatibility_params) => Some(DataResult::Data(compatibility_params)),
893 Err(e) => Some(DataResult::Error(e.to_string())),
894 }
895 },
896 };
897
898 entities_comp_data.push(aggregator_comp_data);
899
900 Self::add_unreachable_entity_errors(
902 &mut entities_comp_data,
903 &operator_keys,
904 &verifier_keys,
905 |entity_type, id, error_msg| EntityDataWithId {
906 entity_id: Some(RPCEntityId {
907 kind: entity_type as i32,
908 id,
909 }),
910 data_result: Some(DataResult::Error(error_msg)),
911 },
912 );
913
914 Ok(entities_comp_data)
915 }
916
917 pub async fn check_compatibility_with_actors(
920 &self,
921 scope: CompatibilityCheckScope,
922 ) -> Result<(), BridgeError> {
923 let mut errors = Vec::new();
924 let mut operator_futures = Vec::new();
925 let mut verifier_futures = Vec::new();
926
927 let operators_included = matches!(
928 scope,
929 CompatibilityCheckScope::OperatorsOnly | CompatibilityCheckScope::Both
930 );
931 let verifiers_included = matches!(
932 scope,
933 CompatibilityCheckScope::VerifiersOnly | CompatibilityCheckScope::Both
934 );
935
936 if operators_included {
937 let operator_keys = self.fetch_operator_keys().await?;
938 for (operator_id, operator_client) in operator_keys
939 .into_iter()
940 .map(OperatorId)
941 .zip(self.operator_clients.iter())
942 {
943 let mut operator_client = operator_client.clone();
944 let operator_id_str = operator_id.to_string();
945
946 operator_futures.push(async move {
947 let compatibility_params: CompatibilityParams = operator_client
948 .get_compatibility_params(Empty {})
949 .await
950 .wrap_err(format!(
951 "{operator_id_str} compatibility params retrieval failed"
952 ))?
953 .into_inner()
954 .try_into()
955 .wrap_err(format!(
956 "{operator_id_str} compatibility params conversion failed"
957 ))?;
958 Ok::<_, BridgeError>((operator_id_str, compatibility_params))
959 });
960 }
961 }
962
963 if verifiers_included {
964 let verifier_keys = self.fetch_verifier_keys().await?;
965 for (verifier_id, verifier_client) in verifier_keys
966 .into_iter()
967 .map(VerifierId)
968 .zip(self.verifier_clients.iter())
969 {
970 let mut verifier_client = verifier_client.clone();
971 let verifier_id_str = verifier_id.to_string();
972
973 verifier_futures.push(async move {
974 let compatibility_params: CompatibilityParams = verifier_client
975 .get_compatibility_params(Empty {})
976 .await
977 .wrap_err(format!(
978 "{verifier_id_str} compatibility params retrieval failed"
979 ))?
980 .into_inner()
981 .try_into()
982 .wrap_err(format!(
983 "{verifier_id_str} compatibility params conversion failed"
984 ))?;
985 Ok::<_, BridgeError>((verifier_id_str, compatibility_params))
986 });
987 }
988 }
989
990 let (operator_results, operator_err) = join_all_partition_results(operator_futures).await;
992 let (verifier_results, verifier_err) = join_all_partition_results(verifier_futures).await;
993
994 let mut actors_compat_params = Vec::new();
995 actors_compat_params.extend(operator_results);
996 actors_compat_params.extend(verifier_results);
997
998 if let Some(operator_err) = operator_err {
999 errors.push(format!(
1000 "Error while retrieving operator compatibility params: {operator_err}"
1001 ));
1002 }
1003 if let Some(verifier_err) = verifier_err {
1004 errors.push(format!(
1005 "Error while retrieving verifier compatibility params: {verifier_err}"
1006 ));
1007 }
1008
1009 if let Err(e) = self.is_compatible(actors_compat_params) {
1011 errors.push(format!("Clementine not compatible with some actors: {e}"));
1012 }
1013 if !errors.is_empty() {
1014 return Err(eyre::eyre!(errors.join("; ")).into());
1015 }
1016
1017 Ok(())
1018 }
1019}
1020
1021#[derive(Debug)]
1023pub struct AggregatorServer {
1024 pub aggregator: Aggregator,
1025 background_tasks: crate::task::manager::BackgroundTaskManager,
1026}
1027
1028impl AggregatorServer {
1029 pub async fn new(config: BridgeConfig) -> Result<Self, BridgeError> {
1030 let aggregator = Aggregator::new(config.clone()).await?;
1031 let background_tasks = crate::task::manager::BackgroundTaskManager::default();
1032
1033 Ok(Self {
1034 aggregator,
1035 background_tasks,
1036 })
1037 }
1038
1039 pub async fn start_background_tasks(&self) -> Result<(), BridgeError> {
1042 self.background_tasks
1044 .ensure_task_looping(
1045 crate::task::aggregator_metric_publisher::AggregatorMetricPublisher::new(
1046 self.aggregator.clone(),
1047 )
1048 .await?
1049 .with_delay(AGGREGATOR_METRIC_PUBLISHER_POLL_DELAY),
1050 )
1051 .await;
1052
1053 tracing::info!("Aggregator metric publisher task started");
1054
1055 Ok(())
1056 }
1057}
1058
1059impl Deref for AggregatorServer {
1060 type Target = Aggregator;
1061
1062 fn deref(&self) -> &Self::Target {
1063 &self.aggregator
1064 }
1065}