clementine_core/
aggregator.rs

1use std::collections::HashSet;
2use std::ops::Deref;
3use std::sync::Arc;
4
5use crate::compatibility::{ActorWithConfig, CompatibilityParams};
6use crate::constants::{
7    ENTITY_COMP_DATA_POLL_TIMEOUT, ENTITY_STATUS_POLL_TIMEOUT, OPERATOR_GET_KEYS_TIMEOUT,
8    PUBLIC_KEY_COLLECTION_TIMEOUT, RESTART_BACKGROUND_TASKS_TIMEOUT, VERIFIER_SEND_KEYS_TIMEOUT,
9};
10use crate::deposit::DepositData;
11use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc;
12use crate::rpc::clementine::entity_data_with_id::DataResult;
13use crate::rpc::clementine::entity_status_with_id::StatusResult;
14use crate::rpc::clementine::{
15    self, CompatibilityParamsRpc, DepositParams, Empty, EntityStatusWithId, EntityType,
16    OperatorKeysWithDeposit,
17};
18use crate::rpc::clementine::{EntityDataWithId, EntityId as RPCEntityId};
19use crate::task::aggregator_metric_publisher::AGGREGATOR_METRIC_PUBLISHER_POLL_DELAY;
20use crate::task::TaskExt;
21#[cfg(feature = "automation")]
22use crate::tx_sender::TxSenderClient;
23use crate::utils::{flatten_join_named_results, timed_request, timed_try_join_all};
24use crate::{
25    config::BridgeConfig,
26    database::Database,
27    errors::BridgeError,
28    rpc::{
29        self,
30        clementine::{
31            clementine_operator_client::ClementineOperatorClient,
32            clementine_verifier_client::ClementineVerifierClient,
33        },
34    },
35};
36use bitcoin::secp256k1::PublicKey;
37use bitcoin::XOnlyPublicKey;
38use eyre::Context;
39use futures::future::join_all;
40use std::future::Future;
41use std::hash::Hash as StdHash;
42use tokio::sync::RwLock;
43use tonic::{Request, Status};
44use tracing::{debug_span, Instrument};
45
46/// Aggregator struct.
47/// This struct is responsible for aggregating partial signatures from the verifiers.
48/// It will have in total 3 * num_operator + 1 aggregated nonces.
49/// \[0\] -> Aggregated nonce for the move transaction.
50/// [1..num_operator + 1] -> Aggregated nonces for the operator_takes transactions.
51/// [num_operator + 1..2 * num_operator + 1] -> Aggregated nonces for the slash_or_take transactions.
52/// [2 * num_operator + 1..3 * num_operator + 1] -> Aggregated nonces for the burn transactions.
53/// For now, we do not have the last bit.
54#[derive(Debug, Clone)]
55pub struct Aggregator {
56    pub(crate) rpc: ExtendedBitcoinRpc,
57    pub(crate) db: Database,
58    pub(crate) config: BridgeConfig,
59    #[cfg(feature = "automation")]
60    pub(crate) tx_sender: TxSenderClient,
61    operator_clients: Vec<ClementineOperatorClient<tonic::transport::Channel>>,
62    verifier_clients: Vec<ClementineVerifierClient<tonic::transport::Channel>>,
63    verifier_keys: Arc<RwLock<Vec<Option<PublicKey>>>>,
64    operator_keys: Arc<RwLock<Vec<Option<XOnlyPublicKey>>>>,
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
68pub enum EntityId {
69    Verifier(VerifierId),
70    Operator(OperatorId),
71    Aggregator,
72}
73
74/// Wrapper struct that renders the verifier id in the logs.
75#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
76pub struct VerifierId(pub PublicKey);
77
78/// Wrapper struct that renders the operator id in the logs.
79#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
80pub struct OperatorId(pub XOnlyPublicKey);
81
82impl std::fmt::Display for EntityId {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        match self {
85            EntityId::Aggregator => write!(f, "Aggregator"),
86            EntityId::Verifier(id) => write!(f, "{id}"),
87            EntityId::Operator(id) => write!(f, "{id}"),
88        }
89    }
90}
91
92impl std::fmt::Display for VerifierId {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        write!(f, "Verifier({})", &self.0.to_string()[..10])
95    }
96}
97
98impl std::fmt::Display for OperatorId {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        write!(f, "Operator({})", &self.0.to_string()[..10])
101    }
102}
103
104/// Wrapper struct that matches verifier clients with their ids.
105#[derive(Debug, Clone)]
106pub struct ParticipatingVerifiers(
107    pub  Vec<(
108        ClementineVerifierClient<tonic::transport::Channel>,
109        VerifierId,
110    )>,
111);
112
113impl ParticipatingVerifiers {
114    pub fn new(
115        verifiers: Vec<(
116            ClementineVerifierClient<tonic::transport::Channel>,
117            VerifierId,
118        )>,
119    ) -> Self {
120        Self(verifiers)
121    }
122
123    pub fn clients(&self) -> Vec<ClementineVerifierClient<tonic::transport::Channel>> {
124        self.0.iter().map(|(client, _)| client.clone()).collect()
125    }
126
127    pub fn ids(&self) -> Vec<VerifierId> {
128        self.0.iter().map(|(_, id)| *id).collect()
129    }
130}
131
132/// Wrapper struct that matches operator clients with their ids.
133#[derive(Debug, Clone)]
134pub struct ParticipatingOperators(
135    pub  Vec<(
136        ClementineOperatorClient<tonic::transport::Channel>,
137        OperatorId,
138    )>,
139);
140
141impl ParticipatingOperators {
142    pub fn new(
143        operators: Vec<(
144            ClementineOperatorClient<tonic::transport::Channel>,
145            OperatorId,
146        )>,
147    ) -> Self {
148        Self(operators)
149    }
150
151    pub fn clients(&self) -> Vec<ClementineOperatorClient<tonic::transport::Channel>> {
152        self.0.iter().map(|(client, _)| client.clone()).collect()
153    }
154
155    pub fn ids(&self) -> Vec<OperatorId> {
156        self.0.iter().map(|(_, id)| *id).collect()
157    }
158}
159
160impl Aggregator {
161    pub async fn new(config: BridgeConfig) -> Result<Self, BridgeError> {
162        let db = Database::new(&config).await?;
163
164        let rpc = ExtendedBitcoinRpc::connect(
165            config.bitcoin_rpc_url.clone(),
166            config.bitcoin_rpc_user.clone(),
167            config.bitcoin_rpc_password.clone(),
168            None,
169        )
170        .await?;
171
172        let verifier_endpoints =
173            config
174                .verifier_endpoints
175                .clone()
176                .ok_or(BridgeError::ConfigError(
177                    "No verifier endpoints provided in config".into(),
178                ))?;
179
180        let operator_endpoints =
181            config
182                .operator_endpoints
183                .clone()
184                .ok_or(BridgeError::ConfigError(
185                    "No operator endpoints provided in config".into(),
186                ))?;
187
188        // Create clients to connect to all verifiers
189        let verifier_clients = rpc::get_clients(
190            verifier_endpoints,
191            crate::rpc::verifier_client_builder(&config),
192            &config,
193            true,
194        )
195        .await?;
196
197        // Create clients to connect to all operators
198        let operator_clients = rpc::get_clients(
199            operator_endpoints,
200            crate::rpc::operator_client_builder(&config),
201            &config,
202            true,
203        )
204        .await?;
205
206        #[cfg(feature = "automation")]
207        let tx_sender = TxSenderClient::new(db.clone(), "aggregator".to_string());
208
209        tracing::info!(
210            "Aggregator created with {} verifiers and {} operators",
211            verifier_clients.len(),
212            operator_clients.len(),
213        );
214
215        let operator_keys = Arc::new(RwLock::new(vec![None; operator_clients.len()]));
216        let verifier_keys = Arc::new(RwLock::new(vec![None; verifier_clients.len()]));
217
218        Ok(Aggregator {
219            rpc,
220            db,
221            config,
222            #[cfg(feature = "automation")]
223            tx_sender,
224            verifier_clients,
225            operator_clients,
226            verifier_keys,
227            operator_keys,
228        })
229    }
230
231    pub fn get_verifier_clients(&self) -> &[ClementineVerifierClient<tonic::transport::Channel>] {
232        &self.verifier_clients
233    }
234
235    /// Generic helper function to fetch keys from clients
236    async fn fetch_pubkeys_from_entities<T, C, F, Fut>(
237        &self,
238        clients: &[C],
239        keys_storage: &RwLock<Vec<Option<T>>>,
240        pubkey_fetcher: F,
241        key_type_name: &str,
242    ) -> Result<Vec<T>, BridgeError>
243    where
244        T: Clone + Send + Sync + Eq + StdHash + std::fmt::Debug,
245        C: Clone + Send + Sync,
246        F: Fn(C) -> Fut + Send + Sync,
247        Fut: Future<Output = Result<T, BridgeError>> + Send,
248    {
249        // Check if all keys are collected
250        let all_collected = {
251            let keys = keys_storage.read().await;
252            keys.iter().all(|key| key.is_some())
253        };
254
255        if !all_collected {
256            // get a write lock early, so that only one thread can try to collect keys
257            let mut keys = keys_storage.write().await;
258
259            // sanity check because we directly use indexes below
260            if keys.len() != clients.len() {
261                return Err(eyre::eyre!(
262                    "Keys storage length does not match clients length, should not happen, keys length: {}, clients length: {}",
263                    keys.len(),
264                    clients.len()
265                )
266                .into());
267            }
268
269            let key_collection_futures = clients
270                .iter()
271                .zip(keys.iter().enumerate())
272                .filter_map(|(client, (idx, key))| {
273                    if key.is_none() {
274                        Some((idx, pubkey_fetcher(client.clone())))
275                    } else {
276                        None
277                    }
278                })
279                .map(|(idx, fut)| async move { (idx, fut.await) });
280
281            let collected_keys = join_all(key_collection_futures).await;
282            let mut missing_keys = Vec::new();
283
284            // Fill in keys with the results of the futures
285            for (idx, new_key) in collected_keys {
286                match new_key {
287                    Ok(new_key) => keys[idx] = Some(new_key),
288                    Err(e) => {
289                        tracing::debug!(
290                            "Failed to collect {} {} (order in config) key: {}",
291                            key_type_name,
292                            idx,
293                            e
294                        );
295                        missing_keys.push(idx);
296                    }
297                }
298            }
299
300            // if keys are not unique, return an error if so
301            let non_none_keys: Vec<_> = keys.iter().filter_map(|key| key.as_ref()).collect();
302            let unique_keys: HashSet<_> = non_none_keys.iter().cloned().collect();
303
304            if unique_keys.len() != non_none_keys.len() {
305                let reason = format!("{key_type_name} keys are not unique: {keys:?}");
306                // reset all keys to None so that faulty keys are not used
307                for key in keys.iter_mut() {
308                    *key = None;
309                }
310                return Err(eyre::eyre!(reason).into());
311            }
312
313            // if not all keys were collected, return an error
314            if keys.iter().any(|key| key.is_none()) {
315                return Err(eyre::eyre!(
316                    "Not all {} keys were able to be collected, missing keys at indices: {:?}",
317                    key_type_name,
318                    missing_keys
319                )
320                .into());
321            }
322        }
323
324        // return all keys if they were all collected
325        Ok(keys_storage
326            .read()
327            .await
328            .iter()
329            .map(|key| key.clone().expect("should all be collected"))
330            .collect())
331    }
332
333    /// If all verifier keys are already collected, returns them.
334    /// Otherwise, it tries to collect them from the verifiers, saves them and returns them.
335    pub async fn fetch_verifier_keys(&self) -> Result<Vec<PublicKey>, BridgeError> {
336        self.fetch_pubkeys_from_entities(
337            &self.verifier_clients,
338            &self.verifier_keys,
339            |mut client| async move {
340                let mut request = Request::new(Empty {});
341                request.set_timeout(PUBLIC_KEY_COLLECTION_TIMEOUT);
342                let verifier_params = client.get_params(request).await?.into_inner();
343                let public_key = PublicKey::from_slice(&verifier_params.public_key)
344                    .map_err(|e| eyre::eyre!("Failed to parse verifier public key: {}", e))?;
345                Ok::<_, BridgeError>(public_key)
346            },
347            "verifier",
348        )
349        .await
350    }
351
352    /// If all operator keys are already collected, returns them.
353    /// Otherwise, it tries to collect them from the operators, saves them and returns them.
354    pub async fn fetch_operator_keys(&self) -> Result<Vec<XOnlyPublicKey>, BridgeError> {
355        self.fetch_pubkeys_from_entities(
356            &self.operator_clients,
357            &self.operator_keys,
358            |mut client| async move {
359                let mut request = Request::new(Empty {});
360                request.set_timeout(PUBLIC_KEY_COLLECTION_TIMEOUT);
361                let operator_xonly_pk: XOnlyPublicKey = client
362                    .get_x_only_public_key(request)
363                    .await?
364                    .into_inner()
365                    .try_into()?;
366                Ok::<_, BridgeError>(operator_xonly_pk)
367            },
368            "operator",
369        )
370        .await
371    }
372
373    pub fn get_operator_clients(&self) -> &[ClementineOperatorClient<tonic::transport::Channel>] {
374        &self.operator_clients
375    }
376
377    /// Collects and distributes keys to verifiers from operators and watchtowers for the new deposit
378    /// for operators: get bitvm assert winternitz public keys and watchtower challenge ack hashes
379    /// for watchtowers: get winternitz public keys for watchtower challenges
380    pub async fn collect_and_distribute_keys(
381        &self,
382        deposit_params: &DepositParams,
383    ) -> Result<(), BridgeError> {
384        tracing::info!("Starting collect_and_distribute_keys");
385
386        let start_time = std::time::Instant::now();
387
388        let deposit_data: DepositData = deposit_params.clone().try_into()?;
389
390        // Create channels with larger capacity to prevent blocking
391        let (operator_keys_tx, operator_keys_rx) =
392            tokio::sync::broadcast::channel::<clementine::OperatorKeysWithDeposit>(
393                deposit_data.get_num_operators() * deposit_data.get_num_verifiers(),
394            );
395        let operator_rx_handles = (0..deposit_data.get_num_verifiers())
396            .map(|_| operator_keys_rx.resubscribe())
397            .collect::<Vec<_>>();
398
399        let operators = self.get_participating_operators(&deposit_data).await?;
400        let operator_clients = operators.clients();
401
402        let operator_xonly_pks = deposit_data.get_operators();
403        let deposit = deposit_params.clone();
404
405        tracing::info!("Starting operator key collection");
406        #[cfg(test)]
407        let timeout_params = self.config.test_params.timeout_params;
408        #[allow(clippy::unused_enumerate_index)]
409        let get_operators_keys_handle = tokio::spawn(timed_try_join_all(
410            OPERATOR_GET_KEYS_TIMEOUT,
411            "Operator key collection",
412            Some(operators.ids()),
413            operator_clients
414                .into_iter()
415                .zip(operator_xonly_pks.into_iter())
416                .enumerate()
417                .map(move |(_idx, (mut operator_client, operator_xonly_pk))| {
418                    let deposit_params = deposit.clone();
419                    let tx = operator_keys_tx.clone();
420                    async move {
421                        #[cfg(test)]
422                        timeout_params
423                            .hook_timeout_key_collection_operator(_idx)
424                            .await;
425
426                        let operator_keys = operator_client
427                            .get_deposit_keys(deposit_params.clone())
428                            .instrument(
429                                debug_span!("get_deposit_keys", id=%OperatorId(operator_xonly_pk)),
430                            )
431                            .await
432                            .wrap_err(Status::internal(format!(
433                                "Operator {operator_xonly_pk} key retrieval failed"
434                            )))?
435                            .into_inner();
436
437                        // A send error means that all receivers are closed,
438                        // receivers only close if they have an error (while
439                        // loop condition)
440                        // We don't care about the result of the send, we
441                        // only care about the error on the other side.
442                        // Ignore this error, and let the other side's error
443                        // propagate.
444                        let _ = tx.send(OperatorKeysWithDeposit {
445                            deposit_params: Some(deposit_params),
446                            operator_keys: Some(operator_keys),
447                            operator_xonly_pk: operator_xonly_pk.serialize().to_vec(),
448                        });
449
450                        Ok(())
451                    }
452                }),
453        ));
454
455        tracing::info!("Starting operator key distribution to verifiers");
456        let verifiers = self.get_participating_verifiers(&deposit_data).await?;
457
458        let verifier_clients = verifiers.clients();
459        let num_operators = deposit_data.get_num_operators();
460
461        let verifier_ids = verifiers.ids();
462
463        #[cfg(test)]
464        let timeout_params = self.config.test_params.timeout_params;
465        #[allow(clippy::unused_enumerate_index)]
466        let distribute_operators_keys_handle = tokio::spawn(timed_try_join_all(
467            VERIFIER_SEND_KEYS_TIMEOUT,
468            "Verifier key distribution",
469            Some(verifier_ids.clone()),
470            verifier_clients
471                .into_iter()
472                .zip(operator_rx_handles)
473                .zip(verifier_ids)
474                .enumerate()
475                .map(
476                    move |(_idx, ((mut verifier, mut rx), verifier_id))| async move {
477                        #[cfg(test)]
478                        timeout_params
479                            .hook_timeout_key_distribution_verifier(_idx)
480                            .await;
481
482                        // Only wait for expected number of messages
483                        let mut received_keys = std::collections::HashSet::new();
484                        while received_keys.len() < num_operators {
485                            tracing::debug!(
486                                "Waiting for operator key (received {}/{})",
487                                received_keys.len(),
488                                num_operators
489                            );
490
491                            // This will not block forever because of the timeout on the join all.
492                            let operator_keys = rx
493                            .recv()
494                            .instrument(debug_span!("operator_keys_recv"))
495                            .await
496                            .wrap_err(Status::internal(
497                                "Operator broadcast channels closed before all keys were received",
498                            ))?;
499
500                            let operator_xonly_pk = operator_keys.operator_xonly_pk.clone();
501
502                            if !received_keys.insert(operator_xonly_pk.clone()) {
503                                continue;
504                            }
505
506                            timed_request(
507                                VERIFIER_SEND_KEYS_TIMEOUT,
508                                &format!("Setting operator keys for {verifier_id}"),
509                                async {
510                                    Ok(verifier
511                                        .set_operator_keys(operator_keys)
512                                        .await
513                                        .wrap_err_with(|| {
514                                            Status::internal(format!(
515                                                "Failed to set operator keys for {verifier_id}",
516                                            ))
517                                        }))
518                                },
519                            )
520                            .await??;
521                        }
522                        Ok::<_, BridgeError>(())
523                    },
524                ),
525        ));
526
527        // Wait for all tasks to complete
528        let (get_operators_keys_result, distribute_operators_keys_result) =
529            tokio::join!(get_operators_keys_handle, distribute_operators_keys_handle);
530
531        flatten_join_named_results([
532            ("get_operators_keys", get_operators_keys_result),
533            (
534                "distribute_operators_keys",
535                distribute_operators_keys_result,
536            ),
537        ])?;
538
539        tracing::info!(
540            "collect_and_distribute_keys completed in {:?}",
541            start_time.elapsed()
542        );
543
544        Ok(())
545    }
546
547    /// Returns a list of verifier clients that are participating in the deposit.
548    pub async fn get_participating_verifiers(
549        &self,
550        deposit_data: &DepositData,
551    ) -> Result<ParticipatingVerifiers, BridgeError> {
552        let verifier_keys = self.fetch_verifier_keys().await?;
553        let mut participating_verifiers = Vec::new();
554
555        let verifiers = deposit_data.get_verifiers();
556
557        for verifier_pk in verifiers {
558            if let Some(pos) = verifier_keys.iter().position(|key| key == &verifier_pk) {
559                participating_verifiers
560                    .push((self.verifier_clients[pos].clone(), VerifierId(verifier_pk)));
561            } else {
562                tracing::error!(
563                    "Verifier public key not found. Deposit data verifier keys: {:?}, self verifier keys: {:?}",
564                    deposit_data.get_verifiers(),
565                    self.verifier_keys
566                );
567                return Err(BridgeError::VerifierNotFound(verifier_pk));
568            }
569        }
570
571        Ok(ParticipatingVerifiers::new(participating_verifiers))
572    }
573
574    /// Returns a list of operator clients that are participating in the deposit.
575    pub async fn get_participating_operators(
576        &self,
577        deposit_data: &DepositData,
578    ) -> Result<ParticipatingOperators, BridgeError> {
579        let operator_keys = self.fetch_operator_keys().await?;
580        let mut participating_operators = Vec::new();
581
582        let operators = deposit_data.get_operators();
583
584        for operator_pk in operators {
585            if let Some(pos) = operator_keys.iter().position(|key| key == &operator_pk) {
586                participating_operators
587                    .push((self.operator_clients[pos].clone(), OperatorId(operator_pk)));
588            } else {
589                return Err(BridgeError::OperatorNotFound(operator_pk));
590            }
591        }
592
593        Ok(ParticipatingOperators::new(participating_operators))
594    }
595
596    /// Helper function to fetch keys for both operators and verifiers.
597    /// Returns (operator_keys, verifier_keys) without failing if some entities are unreachable.
598    async fn fetch_all_entity_keys(&self) -> (Vec<Option<XOnlyPublicKey>>, Vec<Option<PublicKey>>) {
599        // Try to reach all operators and verifiers to collect keys, but do not return err if some can't be reached
600        let _ = self.fetch_operator_keys().await;
601        let _ = self.fetch_verifier_keys().await;
602
603        let operator_keys = self.operator_keys.read().await.clone();
604        let verifier_keys = self.verifier_keys.read().await.clone();
605
606        (operator_keys, verifier_keys)
607    }
608
609    /// Helper function to add error entries for entities where keys couldn't be collected.
610    fn add_unreachable_entity_errors<T, F>(
611        results: &mut Vec<T>,
612        operator_keys: &[Option<XOnlyPublicKey>],
613        verifier_keys: &[Option<PublicKey>],
614        error_constructor: F,
615    ) where
616        F: Fn(EntityType, String, String) -> T,
617    {
618        for (index, key) in operator_keys.iter().enumerate() {
619            if key.is_none() {
620                results.push(error_constructor(
621                    EntityType::Operator,
622                    format!("Index {index} in config (0-based)"),
623                    "Operator key was not able to be collected".to_string(),
624                ));
625            }
626        }
627        for (index, key) in verifier_keys.iter().enumerate() {
628            if key.is_none() {
629                results.push(error_constructor(
630                    EntityType::Verifier,
631                    format!("Index {index} in config (0-based)"),
632                    "Verifier key was not able to be collected".to_string(),
633                ));
634            }
635        }
636    }
637
638    /// Retrieves the status of all entities (operators and verifiers) and restarts background tasks if needed.
639    /// Returns a vector of EntityStatusWithId. Only returns an error if restarting tasks fails when requested.
640    pub async fn get_entity_statuses(
641        &self,
642        restart_tasks: bool,
643    ) -> Result<Vec<EntityStatusWithId>, BridgeError> {
644        tracing::debug!("Getting entities status");
645
646        let operator_clients = self.get_operator_clients();
647        let verifier_clients = self.get_verifier_clients();
648        tracing::debug!("Operator clients: {:?}", operator_clients.len());
649
650        let (operator_keys, verifier_keys) = self.fetch_all_entity_keys().await;
651
652        // Query operators for status
653        let operator_status = join_all(
654            operator_clients
655                .iter()
656                .zip(operator_keys.iter())
657                .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
658                .map(|(client, key)| {
659                    let mut client = client.clone();
660                    let key = *key;
661                    async move {
662                        tracing::debug!("Getting operator status for {:?}", key);
663                        let mut request = Request::new(Empty {});
664                        request.set_timeout(ENTITY_STATUS_POLL_TIMEOUT);
665                        let response = client.get_current_status(request).await;
666
667                        EntityStatusWithId {
668                            entity_id: Some(RPCEntityId {
669                                kind: EntityType::Operator as i32,
670                                id: key.to_string(),
671                            }),
672                            status_result: match response {
673                                Ok(response) => Some(StatusResult::Status(response.into_inner())),
674                                Err(e) => Some(StatusResult::Err(clementine::EntityError {
675                                    error: e.to_string(),
676                                })),
677                            },
678                        }
679                    }
680                }),
681        )
682        .await;
683
684        // Query verifiers for status
685        let verifier_status = join_all(
686            verifier_clients
687                .iter()
688                .zip(verifier_keys.iter())
689                .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
690                .map(|(client, key)| {
691                    let mut client = client.clone();
692                    let key = *key;
693                    async move {
694                        tracing::debug!("Getting verifier status for {:?}", key);
695                        let mut request = Request::new(Empty {});
696                        request.set_timeout(ENTITY_STATUS_POLL_TIMEOUT);
697                        let response = client.get_current_status(request).await;
698
699                        EntityStatusWithId {
700                            entity_id: Some(RPCEntityId {
701                                kind: EntityType::Verifier as i32,
702                                id: key.to_string(),
703                            }),
704                            status_result: match response {
705                                Ok(response) => Some(StatusResult::Status(response.into_inner())),
706                                Err(e) => Some(StatusResult::Err(clementine::EntityError {
707                                    error: e.to_string(),
708                                })),
709                            },
710                        }
711                    }
712                }),
713        )
714        .await;
715
716        // Combine results
717        let mut entity_statuses = operator_status;
718        entity_statuses.extend(verifier_status);
719
720        // try to restart background tasks if requested, with a timeout
721        if restart_tasks {
722            let operator_tasks = operator_clients
723                .iter()
724                .zip(operator_keys.iter())
725                .filter_map(|(client, key)| key.map(|key| (client, key)))
726                .map(|(client, key)| {
727                    let mut client = client.clone();
728                    async move {
729                        let mut request = Request::new(Empty {});
730                        request.set_timeout(RESTART_BACKGROUND_TASKS_TIMEOUT);
731                        client
732                            .restart_background_tasks(Request::new(Empty {}))
733                            .await
734                            .wrap_err_with(|| {
735                                Status::internal(format!(
736                                    "Failed to restart background tasks for operator {key}"
737                                ))
738                            })
739                    }
740                });
741
742            let verifier_tasks = verifier_clients
743                .iter()
744                .zip(verifier_keys.iter())
745                .filter_map(|(client, key)| key.map(|key| (client, key)))
746                .map(|(client, key)| {
747                    let mut client = client.clone();
748                    async move {
749                        let mut request = Request::new(Empty {});
750                        request.set_timeout(RESTART_BACKGROUND_TASKS_TIMEOUT);
751                        client
752                            .restart_background_tasks(Request::new(Empty {}))
753                            .await
754                            .wrap_err_with(|| {
755                                Status::internal(format!(
756                                    "Failed to restart background tasks for verifier {key}"
757                                ))
758                            })
759                    }
760                });
761
762            let (operator_results, verifier_results) = futures::join!(
763                futures::future::join_all(operator_tasks),
764                futures::future::join_all(verifier_tasks)
765            );
766            // Log any errors from restart_background_tasks rpc call
767            for result in operator_results
768                .into_iter()
769                .chain(verifier_results.into_iter())
770            {
771                if let Err(e) = result {
772                    tracing::error!("{e:?}");
773                }
774            }
775        }
776
777        // Add error entries for unreachable entities
778        Self::add_unreachable_entity_errors(
779            &mut entity_statuses,
780            &operator_keys,
781            &verifier_keys,
782            |entity_type, id, error_msg| EntityStatusWithId {
783                entity_id: Some(RPCEntityId {
784                    kind: entity_type as i32,
785                    id,
786                }),
787                status_result: Some(StatusResult::Err(clementine::EntityError {
788                    error: error_msg,
789                })),
790            },
791        );
792
793        Ok(entity_statuses)
794    }
795
796    pub async fn get_compatibility_data_from_entities(
797        &self,
798    ) -> Result<Vec<EntityDataWithId>, BridgeError> {
799        let operator_clients = self.get_operator_clients();
800        let verifier_clients = self.get_verifier_clients();
801
802        let (operator_keys, verifier_keys) = self.fetch_all_entity_keys().await;
803
804        // Query operators for compatibility data
805        let operator_comp_data = join_all(
806            operator_clients
807                .iter()
808                .zip(operator_keys.iter())
809                .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
810                .map(|(client, key)| {
811                    let mut client = client.clone();
812                    let key = *key;
813                    async move {
814                        tracing::debug!("Getting operator compatibility data for {:?}", key);
815                        let mut request = Request::new(Empty {});
816                        request.set_timeout(ENTITY_COMP_DATA_POLL_TIMEOUT);
817                        let response = client.get_compatibility_params(request).await;
818
819                        EntityDataWithId {
820                            entity_id: Some(RPCEntityId {
821                                kind: EntityType::Operator as i32,
822                                id: key.to_string(),
823                            }),
824                            data_result: match response {
825                                Ok(response) => Some(DataResult::Data(response.into_inner())),
826                                Err(e) => Some(DataResult::Error(e.to_string())),
827                            },
828                        }
829                    }
830                }),
831        )
832        .await;
833
834        // Query verifiers for compatibility data
835        let verifier_comp_data = join_all(
836            verifier_clients
837                .iter()
838                .zip(verifier_keys.iter())
839                .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
840                .map(|(client, key)| {
841                    let mut client = client.clone();
842                    let key = *key;
843                    async move {
844                        tracing::debug!("Getting verifier compatibility data for {:?}", key);
845                        let mut request = Request::new(Empty {});
846                        request.set_timeout(ENTITY_COMP_DATA_POLL_TIMEOUT);
847                        let response = client.get_compatibility_params(request).await;
848
849                        EntityDataWithId {
850                            entity_id: Some(RPCEntityId {
851                                kind: EntityType::Verifier as i32,
852                                id: key.to_string(),
853                            }),
854                            data_result: match response {
855                                Ok(response) => Some(DataResult::Data(response.into_inner())),
856                                Err(e) => Some(DataResult::Error(e.to_string())),
857                            },
858                        }
859                    }
860                }),
861        )
862        .await;
863
864        // Combine results
865        let mut entities_comp_data = operator_comp_data;
866        entities_comp_data.extend(verifier_comp_data);
867
868        // add aggregators own data
869        let aggregator_comp_data = EntityDataWithId {
870            entity_id: Some(RPCEntityId {
871                kind: EntityType::Aggregator as i32,
872                id: "Aggregator".to_string(),
873            }),
874            data_result: {
875                let compatibility_params: Result<CompatibilityParamsRpc, eyre::Report> =
876                    self.get_compatibility_params()?.try_into();
877                match compatibility_params {
878                    Ok(compatibility_params) => Some(DataResult::Data(compatibility_params)),
879                    Err(e) => Some(DataResult::Error(e.to_string())),
880                }
881            },
882        };
883
884        entities_comp_data.push(aggregator_comp_data);
885
886        // Add error entries for unreachable entities
887        Self::add_unreachable_entity_errors(
888            &mut entities_comp_data,
889            &operator_keys,
890            &verifier_keys,
891            |entity_type, id, error_msg| EntityDataWithId {
892                entity_id: Some(RPCEntityId {
893                    kind: entity_type as i32,
894                    id,
895                }),
896                data_result: Some(DataResult::Error(error_msg)),
897            },
898        );
899
900        Ok(entities_comp_data)
901    }
902
903    /// Checks compatibility with other actors.
904    /// Returns an error if aggregator is not compatible with any of the other actors, or any other actor returns an error.
905    pub async fn check_compatibility_with_actors(
906        &self,
907        verifiers_included: bool,
908        operators_included: bool,
909    ) -> Result<(), BridgeError> {
910        let mut other_errors = Vec::new();
911        let mut actors_compat_params = Vec::new();
912
913        if operators_included {
914            let operator_keys = self.fetch_operator_keys().await?;
915            for (operator_id, operator_client) in operator_keys
916                .into_iter()
917                .map(OperatorId)
918                .zip(self.operator_clients.iter())
919            {
920                let mut operator_client = operator_client.clone();
921
922                let res = {
923                    let compatibility_params: CompatibilityParams = operator_client
924                        .get_compatibility_params(Empty {})
925                        .await?
926                        .into_inner()
927                        .try_into()?;
928                    actors_compat_params.push((operator_id.to_string(), compatibility_params));
929                    Ok::<_, BridgeError>(())
930                };
931                if let Err(e) = res {
932                    other_errors.push(format!(
933                        "{operator_id} error while retrieving compatibility params: {e}",
934                    ));
935                }
936            }
937        }
938
939        if verifiers_included {
940            let verifier_keys = self.fetch_verifier_keys().await?;
941            for (verifier_id, verifier_client) in verifier_keys
942                .into_iter()
943                .map(VerifierId)
944                .zip(self.verifier_clients.iter())
945            {
946                let mut verifier_client = verifier_client.clone();
947
948                let res = {
949                    let compatibility_params: CompatibilityParams = verifier_client
950                        .get_compatibility_params(Empty {})
951                        .await?
952                        .into_inner()
953                        .try_into()?;
954                    actors_compat_params.push((verifier_id.to_string(), compatibility_params));
955                    Ok::<_, BridgeError>(())
956                };
957                if let Err(e) = res {
958                    other_errors.push(format!(
959                        "{verifier_id} error while retrieving compatibility params: {e}",
960                    ));
961                }
962            }
963        }
964
965        // Combine compatibility error and other errors (ex: connection) into a single message
966        if let Err(e) = self.is_compatible(actors_compat_params) {
967            other_errors.push(format!("Clementine not compatible with some actors: {e}"));
968        }
969        if !other_errors.is_empty() {
970            return Err(eyre::eyre!(other_errors.join("; ")).into());
971        }
972
973        Ok(())
974    }
975}
976
/// Aggregator server wrapper that manages background tasks.
///
/// Implements `Deref<Target = Aggregator>`, so aggregator methods can be called directly on
/// the server value.
#[derive(Debug)]
pub struct AggregatorServer {
    /// The wrapped aggregator instance.
    pub aggregator: Aggregator,
    /// Manager owning the server's background tasks, e.g. the aggregator metric publisher
    /// started by `start_background_tasks`.
    background_tasks: crate::task::manager::BackgroundTaskManager,
}
983
984impl AggregatorServer {
985    pub async fn new(config: BridgeConfig) -> Result<Self, BridgeError> {
986        let aggregator = Aggregator::new(config.clone()).await?;
987        let background_tasks = crate::task::manager::BackgroundTaskManager::default();
988
989        Ok(Self {
990            aggregator,
991            background_tasks,
992        })
993    }
994
995    /// Starts the background tasks for the aggregator.
996    /// If called multiple times, it will restart only the tasks that are not already running.
997    pub async fn start_background_tasks(&self) -> Result<(), BridgeError> {
998        // Start the aggregator metric publisher task
999        self.background_tasks
1000            .ensure_task_looping(
1001                crate::task::aggregator_metric_publisher::AggregatorMetricPublisher::new(
1002                    self.aggregator.clone(),
1003                )
1004                .await?
1005                .with_delay(AGGREGATOR_METRIC_PUBLISHER_POLL_DELAY),
1006            )
1007            .await;
1008
1009        tracing::info!("Aggregator metric publisher task started");
1010
1011        Ok(())
1012    }
1013
1014    pub async fn shutdown(&mut self) {
1015        self.background_tasks.graceful_shutdown().await;
1016    }
1017}
1018
1019impl Deref for AggregatorServer {
1020    type Target = Aggregator;
1021
1022    fn deref(&self) -> &Self::Target {
1023        &self.aggregator
1024    }
1025}