clementine_core/
aggregator.rs

1use std::collections::HashSet;
2use std::ops::Deref;
3use std::sync::Arc;
4
5use crate::compatibility::{ActorWithConfig, CompatibilityParams};
6use crate::constants::{
7    ENTITY_COMP_DATA_POLL_TIMEOUT, ENTITY_STATUS_POLL_TIMEOUT, OPERATOR_GET_KEYS_TIMEOUT,
8    PUBLIC_KEY_COLLECTION_TIMEOUT, RESTART_BACKGROUND_TASKS_TIMEOUT, VERIFIER_SEND_KEYS_TIMEOUT,
9};
10use crate::deposit::DepositData;
11use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc;
12use crate::rpc::clementine::entity_data_with_id::DataResult;
13use crate::rpc::clementine::entity_status_with_id::StatusResult;
14use crate::rpc::clementine::{
15    self, CompatibilityParamsRpc, DepositParams, Empty, EntityStatusWithId, EntityType,
16    OperatorKeysWithDeposit,
17};
18use crate::rpc::clementine::{EntityDataWithId, EntityId as RPCEntityId};
19use crate::task::aggregator_metric_publisher::AGGREGATOR_METRIC_PUBLISHER_POLL_DELAY;
20use crate::task::TaskExt;
21#[cfg(feature = "automation")]
22use crate::tx_sender::TxSenderClient;
23use crate::utils::{
24    flatten_join_named_results, join_all_partition_results, timed_request, timed_try_join_all,
25};
26use crate::{
27    config::BridgeConfig,
28    database::Database,
29    rpc::{
30        self,
31        clementine::{
32            clementine_operator_client::ClementineOperatorClient,
33            clementine_verifier_client::ClementineVerifierClient,
34        },
35    },
36};
37use bitcoin::secp256k1::PublicKey;
38use bitcoin::XOnlyPublicKey;
39use clementine_errors::BridgeError;
40use eyre::Context;
41use futures::future::join_all;
42use std::future::Future;
43use std::hash::Hash as StdHash;
44use tokio::sync::RwLock;
45use tonic::{Request, Status};
46use tracing::{debug_span, Instrument};
47
48/// Aggregator struct.
49/// This struct is responsible for aggregating partial signatures from the verifiers.
50/// It will have in total 3 * num_operator + 1 aggregated nonces.
51/// \[0\] -> Aggregated nonce for the move transaction.
52/// [1..num_operator + 1] -> Aggregated nonces for the operator_takes transactions.
53/// [num_operator + 1..2 * num_operator + 1] -> Aggregated nonces for the slash_or_take transactions.
54/// [2 * num_operator + 1..3 * num_operator + 1] -> Aggregated nonces for the burn transactions.
55/// For now, we do not have the last bit.
56#[derive(Debug, Clone)]
57pub struct Aggregator {
58    pub(crate) rpc: ExtendedBitcoinRpc,
59    pub(crate) db: Database,
60    pub(crate) config: BridgeConfig,
61    #[cfg(feature = "automation")]
62    pub(crate) tx_sender: TxSenderClient,
63    operator_clients: Vec<ClementineOperatorClient<tonic::transport::Channel>>,
64    verifier_clients: Vec<ClementineVerifierClient<tonic::transport::Channel>>,
65    verifier_keys: Arc<RwLock<Vec<Option<PublicKey>>>>,
66    operator_keys: Arc<RwLock<Vec<Option<XOnlyPublicKey>>>>,
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
70pub enum EntityId {
71    Verifier(VerifierId),
72    Operator(OperatorId),
73    Aggregator,
74}
75
/// Selects the entity types that take part in a compatibility check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompatibilityCheckScope {
    /// Only verifiers are checked.
    VerifiersOnly,
    /// Only operators are checked.
    OperatorsOnly,
    /// Verifiers and operators are both checked.
    Both,
}
86
87/// Wrapper struct that renders the verifier id in the logs.
88#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
89pub struct VerifierId(pub PublicKey);
90
91/// Wrapper struct that renders the operator id in the logs.
92#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
93pub struct OperatorId(pub XOnlyPublicKey);
94
95impl std::fmt::Display for EntityId {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        match self {
98            EntityId::Aggregator => write!(f, "Aggregator"),
99            EntityId::Verifier(id) => write!(f, "{id}"),
100            EntityId::Operator(id) => write!(f, "{id}"),
101        }
102    }
103}
104
105impl std::fmt::Display for VerifierId {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        write!(f, "Verifier({})", &self.0.to_string()[..10])
108    }
109}
110
111impl std::fmt::Display for OperatorId {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        write!(f, "Operator({})", &self.0.to_string()[..10])
114    }
115}
116
117/// Wrapper struct that matches verifier clients with their ids.
118#[derive(Debug, Clone)]
119pub struct ParticipatingVerifiers(
120    pub  Vec<(
121        ClementineVerifierClient<tonic::transport::Channel>,
122        VerifierId,
123    )>,
124);
125
126impl ParticipatingVerifiers {
127    pub fn new(
128        verifiers: Vec<(
129            ClementineVerifierClient<tonic::transport::Channel>,
130            VerifierId,
131        )>,
132    ) -> Self {
133        Self(verifiers)
134    }
135
136    pub fn clients(&self) -> Vec<ClementineVerifierClient<tonic::transport::Channel>> {
137        self.0.iter().map(|(client, _)| client.clone()).collect()
138    }
139
140    pub fn ids(&self) -> Vec<VerifierId> {
141        self.0.iter().map(|(_, id)| *id).collect()
142    }
143}
144
145/// Wrapper struct that matches operator clients with their ids.
146#[derive(Debug, Clone)]
147pub struct ParticipatingOperators(
148    pub  Vec<(
149        ClementineOperatorClient<tonic::transport::Channel>,
150        OperatorId,
151    )>,
152);
153
154impl ParticipatingOperators {
155    pub fn new(
156        operators: Vec<(
157            ClementineOperatorClient<tonic::transport::Channel>,
158            OperatorId,
159        )>,
160    ) -> Self {
161        Self(operators)
162    }
163
164    pub fn clients(&self) -> Vec<ClementineOperatorClient<tonic::transport::Channel>> {
165        self.0.iter().map(|(client, _)| client.clone()).collect()
166    }
167
168    pub fn ids(&self) -> Vec<OperatorId> {
169        self.0.iter().map(|(_, id)| *id).collect()
170    }
171}
172
173impl Aggregator {
174    pub async fn new(config: BridgeConfig) -> Result<Self, BridgeError> {
175        let db = Database::new(&config).await?;
176
177        let rpc = ExtendedBitcoinRpc::connect(
178            config.bitcoin_rpc_url.clone(),
179            config.bitcoin_rpc_user.clone(),
180            config.bitcoin_rpc_password.clone(),
181            None,
182        )
183        .await?;
184
185        let verifier_endpoints =
186            config
187                .verifier_endpoints
188                .clone()
189                .ok_or(BridgeError::ConfigError(
190                    "No verifier endpoints provided in config".into(),
191                ))?;
192
193        let operator_endpoints =
194            config
195                .operator_endpoints
196                .clone()
197                .ok_or(BridgeError::ConfigError(
198                    "No operator endpoints provided in config".into(),
199                ))?;
200
201        // Create clients to connect to all verifiers
202        let verifier_clients = rpc::get_clients(
203            verifier_endpoints,
204            crate::rpc::verifier_client_builder(&config),
205            &config,
206            true,
207        )
208        .await?;
209
210        // Create clients to connect to all operators
211        let operator_clients = rpc::get_clients(
212            operator_endpoints,
213            crate::rpc::operator_client_builder(&config),
214            &config,
215            true,
216        )
217        .await?;
218
219        #[cfg(feature = "automation")]
220        let tx_sender =
221            TxSenderClient::new(clementine_tx_sender::TxSenderDb::from_pool(db.get_pool()));
222
223        tracing::info!(
224            "Aggregator created with {} verifiers and {} operators",
225            verifier_clients.len(),
226            operator_clients.len(),
227        );
228
229        let operator_keys = Arc::new(RwLock::new(vec![None; operator_clients.len()]));
230        let verifier_keys = Arc::new(RwLock::new(vec![None; verifier_clients.len()]));
231
232        Ok(Aggregator {
233            rpc,
234            db,
235            config,
236            #[cfg(feature = "automation")]
237            tx_sender,
238            verifier_clients,
239            operator_clients,
240            verifier_keys,
241            operator_keys,
242        })
243    }
244
245    pub fn get_verifier_clients(&self) -> &[ClementineVerifierClient<tonic::transport::Channel>] {
246        &self.verifier_clients
247    }
248
249    /// Generic helper function to fetch keys from clients
250    async fn fetch_pubkeys_from_entities<T, C, F, Fut>(
251        &self,
252        clients: &[C],
253        keys_storage: &RwLock<Vec<Option<T>>>,
254        pubkey_fetcher: F,
255        key_type_name: &str,
256    ) -> Result<Vec<T>, BridgeError>
257    where
258        T: Clone + Send + Sync + Eq + StdHash + std::fmt::Debug,
259        C: Clone + Send + Sync,
260        F: Fn(C) -> Fut + Send + Sync,
261        Fut: Future<Output = Result<T, BridgeError>> + Send,
262    {
263        // Check if all keys are collected
264        let all_collected = {
265            let keys = keys_storage.read().await;
266            keys.iter().all(|key| key.is_some())
267        };
268
269        if !all_collected {
270            // get a write lock early, so that only one thread can try to collect keys
271            let mut keys = keys_storage.write().await;
272
273            // sanity check because we directly use indexes below
274            if keys.len() != clients.len() {
275                return Err(eyre::eyre!(
276                    "Keys storage length does not match clients length, should not happen, keys length: {}, clients length: {}",
277                    keys.len(),
278                    clients.len()
279                )
280                .into());
281            }
282
283            let key_collection_futures = clients
284                .iter()
285                .zip(keys.iter().enumerate())
286                .filter_map(|(client, (idx, key))| {
287                    if key.is_none() {
288                        Some((idx, pubkey_fetcher(client.clone())))
289                    } else {
290                        None
291                    }
292                })
293                .map(|(idx, fut)| async move { (idx, fut.await) });
294
295            let collected_keys = join_all(key_collection_futures).await;
296            let mut missing_keys = Vec::new();
297
298            // Fill in keys with the results of the futures
299            for (idx, new_key) in collected_keys {
300                match new_key {
301                    Ok(new_key) => keys[idx] = Some(new_key),
302                    Err(e) => {
303                        tracing::debug!(
304                            "Failed to collect {} {} (order in config) key: {}",
305                            key_type_name,
306                            idx,
307                            e
308                        );
309                        missing_keys.push(idx);
310                    }
311                }
312            }
313
314            // if keys are not unique, return an error if so
315            let non_none_keys: Vec<_> = keys.iter().filter_map(|key| key.as_ref()).collect();
316            let unique_keys: HashSet<_> = non_none_keys.iter().cloned().collect();
317
318            if unique_keys.len() != non_none_keys.len() {
319                let reason = format!("{key_type_name} keys are not unique: {keys:?}");
320                // reset all keys to None so that faulty keys are not used
321                for key in keys.iter_mut() {
322                    *key = None;
323                }
324                return Err(eyre::eyre!(reason).into());
325            }
326
327            // if not all keys were collected, return an error
328            if keys.iter().any(|key| key.is_none()) {
329                return Err(eyre::eyre!(
330                    "Not all {} keys were able to be collected, missing keys at indices: {:?}",
331                    key_type_name,
332                    missing_keys
333                )
334                .into());
335            }
336        }
337
338        // return all keys if they were all collected
339        Ok(keys_storage
340            .read()
341            .await
342            .iter()
343            .map(|key| key.clone().expect("should all be collected"))
344            .collect())
345    }
346
347    /// If all verifier keys are already collected, returns them.
348    /// Otherwise, it tries to collect them from the verifiers, saves them and returns them.
349    pub async fn fetch_verifier_keys(&self) -> Result<Vec<PublicKey>, BridgeError> {
350        self.fetch_pubkeys_from_entities(
351            &self.verifier_clients,
352            &self.verifier_keys,
353            |mut client| async move {
354                let mut request = Request::new(Empty {});
355                request.set_timeout(PUBLIC_KEY_COLLECTION_TIMEOUT);
356                let verifier_params = client.get_params(request).await?.into_inner();
357                let public_key = PublicKey::from_slice(&verifier_params.public_key)
358                    .map_err(|e| eyre::eyre!("Failed to parse verifier public key: {}", e))?;
359                Ok::<_, BridgeError>(public_key)
360            },
361            "verifier",
362        )
363        .await
364    }
365
366    /// If all operator keys are already collected, returns them.
367    /// Otherwise, it tries to collect them from the operators, saves them and returns them.
368    pub async fn fetch_operator_keys(&self) -> Result<Vec<XOnlyPublicKey>, BridgeError> {
369        self.fetch_pubkeys_from_entities(
370            &self.operator_clients,
371            &self.operator_keys,
372            |mut client| async move {
373                let mut request = Request::new(Empty {});
374                request.set_timeout(PUBLIC_KEY_COLLECTION_TIMEOUT);
375                let operator_xonly_pk: XOnlyPublicKey = client
376                    .get_x_only_public_key(request)
377                    .await?
378                    .into_inner()
379                    .try_into()?;
380                Ok::<_, BridgeError>(operator_xonly_pk)
381            },
382            "operator",
383        )
384        .await
385    }
386
387    pub fn get_operator_clients(&self) -> &[ClementineOperatorClient<tonic::transport::Channel>] {
388        &self.operator_clients
389    }
390
391    /// Collects and distributes keys to verifiers from operators and watchtowers for the new deposit
392    /// for operators: get bitvm assert winternitz public keys and watchtower challenge ack hashes
393    /// for watchtowers: get winternitz public keys for watchtower challenges
394    pub async fn collect_and_distribute_keys(
395        &self,
396        deposit_params: &DepositParams,
397    ) -> Result<(), BridgeError> {
398        tracing::info!("Starting collect_and_distribute_keys");
399
400        let start_time = std::time::Instant::now();
401
402        let deposit_data: DepositData = deposit_params.clone().try_into()?;
403
404        // Create channels with larger capacity to prevent blocking
405        let (operator_keys_tx, operator_keys_rx) =
406            tokio::sync::broadcast::channel::<clementine::OperatorKeysWithDeposit>(
407                deposit_data.get_num_operators() * deposit_data.get_num_verifiers(),
408            );
409        let operator_rx_handles = (0..deposit_data.get_num_verifiers())
410            .map(|_| operator_keys_rx.resubscribe())
411            .collect::<Vec<_>>();
412
413        let operators = self.get_participating_operators(&deposit_data).await?;
414        let operator_clients = operators.clients();
415
416        let operator_xonly_pks = deposit_data.get_operators();
417        let deposit = deposit_params.clone();
418
419        tracing::info!("Starting operator key collection");
420        #[cfg(test)]
421        let timeout_params = self.config.test_params.timeout_params;
422        #[allow(clippy::unused_enumerate_index)]
423        let get_operators_keys_handle = tokio::spawn(timed_try_join_all(
424            OPERATOR_GET_KEYS_TIMEOUT,
425            "Operator key collection",
426            Some(operators.ids()),
427            operator_clients
428                .into_iter()
429                .zip(operator_xonly_pks.into_iter())
430                .enumerate()
431                .map(move |(_idx, (mut operator_client, operator_xonly_pk))| {
432                    let deposit_params = deposit.clone();
433                    let tx = operator_keys_tx.clone();
434                    async move {
435                        #[cfg(test)]
436                        timeout_params
437                            .hook_timeout_key_collection_operator(_idx)
438                            .await;
439
440                        let operator_keys = operator_client
441                            .get_deposit_keys(deposit_params.clone())
442                            .instrument(
443                                debug_span!("get_deposit_keys", id=%OperatorId(operator_xonly_pk)),
444                            )
445                            .await
446                            .wrap_err(Status::internal(format!(
447                                "Operator {operator_xonly_pk} key retrieval failed"
448                            )))?
449                            .into_inner();
450
451                        // A send error means that all receivers are closed,
452                        // receivers only close if they have an error (while
453                        // loop condition)
454                        // We don't care about the result of the send, we
455                        // only care about the error on the other side.
456                        // Ignore this error, and let the other side's error
457                        // propagate.
458                        let _ = tx.send(OperatorKeysWithDeposit {
459                            deposit_params: Some(deposit_params),
460                            operator_keys: Some(operator_keys),
461                            operator_xonly_pk: operator_xonly_pk.serialize().to_vec(),
462                        });
463
464                        Ok(())
465                    }
466                }),
467        ));
468
469        tracing::info!("Starting operator key distribution to verifiers");
470        let verifiers = self.get_participating_verifiers(&deposit_data).await?;
471
472        let verifier_clients = verifiers.clients();
473        let num_operators = deposit_data.get_num_operators();
474
475        let verifier_ids = verifiers.ids();
476
477        #[cfg(test)]
478        let timeout_params = self.config.test_params.timeout_params;
479        #[allow(clippy::unused_enumerate_index)]
480        let distribute_operators_keys_handle = tokio::spawn(timed_try_join_all(
481            VERIFIER_SEND_KEYS_TIMEOUT,
482            "Verifier key distribution",
483            Some(verifier_ids.clone()),
484            verifier_clients
485                .into_iter()
486                .zip(operator_rx_handles)
487                .zip(verifier_ids)
488                .enumerate()
489                .map(
490                    move |(_idx, ((mut verifier, mut rx), verifier_id))| async move {
491                        #[cfg(test)]
492                        timeout_params
493                            .hook_timeout_key_distribution_verifier(_idx)
494                            .await;
495
496                        // Only wait for expected number of messages
497                        let mut received_keys = std::collections::HashSet::new();
498                        while received_keys.len() < num_operators {
499                            tracing::debug!(
500                                "Waiting for operator key (received {}/{})",
501                                received_keys.len(),
502                                num_operators
503                            );
504
505                            // This will not block forever because of the timeout on the join all.
506                            let operator_keys = rx
507                            .recv()
508                            .instrument(debug_span!("operator_keys_recv"))
509                            .await
510                            .wrap_err(Status::internal(
511                                "Operator broadcast channels closed before all keys were received",
512                            ))?;
513
514                            let operator_xonly_pk = operator_keys.operator_xonly_pk.clone();
515
516                            if !received_keys.insert(operator_xonly_pk.clone()) {
517                                continue;
518                            }
519
520                            timed_request(
521                                VERIFIER_SEND_KEYS_TIMEOUT,
522                                &format!("Setting operator keys for {verifier_id}"),
523                                async {
524                                    Ok(verifier
525                                        .set_operator_keys(operator_keys)
526                                        .await
527                                        .wrap_err_with(|| {
528                                            Status::internal(format!(
529                                                "Failed to set operator keys for {verifier_id}",
530                                            ))
531                                        }))
532                                },
533                            )
534                            .await??;
535                        }
536                        Ok::<_, BridgeError>(())
537                    },
538                ),
539        ));
540
541        // Wait for all tasks to complete
542        let (get_operators_keys_result, distribute_operators_keys_result) =
543            tokio::join!(get_operators_keys_handle, distribute_operators_keys_handle);
544
545        flatten_join_named_results([
546            ("get_operators_keys", get_operators_keys_result),
547            (
548                "distribute_operators_keys",
549                distribute_operators_keys_result,
550            ),
551        ])?;
552
553        tracing::info!(
554            "collect_and_distribute_keys completed in {:?}",
555            start_time.elapsed()
556        );
557
558        Ok(())
559    }
560
561    /// Returns a list of verifier clients that are participating in the deposit.
562    pub async fn get_participating_verifiers(
563        &self,
564        deposit_data: &DepositData,
565    ) -> Result<ParticipatingVerifiers, BridgeError> {
566        let verifier_keys = self.fetch_verifier_keys().await?;
567        let mut participating_verifiers = Vec::new();
568
569        let verifiers = deposit_data.get_verifiers();
570
571        for verifier_pk in verifiers {
572            if let Some(pos) = verifier_keys.iter().position(|key| key == &verifier_pk) {
573                participating_verifiers
574                    .push((self.verifier_clients[pos].clone(), VerifierId(verifier_pk)));
575            } else {
576                tracing::error!(
577                    "Verifier public key not found. Deposit data verifier keys: {:?}, self verifier keys: {:?}",
578                    deposit_data.get_verifiers(),
579                    self.verifier_keys
580                );
581                return Err(BridgeError::VerifierNotFound(verifier_pk));
582            }
583        }
584
585        Ok(ParticipatingVerifiers::new(participating_verifiers))
586    }
587
588    /// Returns a list of operator clients that are participating in the deposit.
589    pub async fn get_participating_operators(
590        &self,
591        deposit_data: &DepositData,
592    ) -> Result<ParticipatingOperators, BridgeError> {
593        let operator_keys = self.fetch_operator_keys().await?;
594        let mut participating_operators = Vec::new();
595
596        let operators = deposit_data.get_operators();
597
598        for operator_pk in operators {
599            if let Some(pos) = operator_keys.iter().position(|key| key == &operator_pk) {
600                participating_operators
601                    .push((self.operator_clients[pos].clone(), OperatorId(operator_pk)));
602            } else {
603                return Err(BridgeError::OperatorNotFound(operator_pk));
604            }
605        }
606
607        Ok(ParticipatingOperators::new(participating_operators))
608    }
609
610    /// Helper function to fetch keys for both operators and verifiers.
611    /// Returns (operator_keys, verifier_keys) without failing if some entities are unreachable.
612    async fn fetch_all_entity_keys(&self) -> (Vec<Option<XOnlyPublicKey>>, Vec<Option<PublicKey>>) {
613        // Try to reach all operators and verifiers to collect keys, but do not return err if some can't be reached
614        let _ = self.fetch_operator_keys().await;
615        let _ = self.fetch_verifier_keys().await;
616
617        let operator_keys = self.operator_keys.read().await.clone();
618        let verifier_keys = self.verifier_keys.read().await.clone();
619
620        (operator_keys, verifier_keys)
621    }
622
623    /// Helper function to add error entries for entities where keys couldn't be collected.
624    fn add_unreachable_entity_errors<T, F>(
625        results: &mut Vec<T>,
626        operator_keys: &[Option<XOnlyPublicKey>],
627        verifier_keys: &[Option<PublicKey>],
628        error_constructor: F,
629    ) where
630        F: Fn(EntityType, String, String) -> T,
631    {
632        for (index, key) in operator_keys.iter().enumerate() {
633            if key.is_none() {
634                results.push(error_constructor(
635                    EntityType::Operator,
636                    format!("Index {index} in config (0-based)"),
637                    "Operator key was not able to be collected".to_string(),
638                ));
639            }
640        }
641        for (index, key) in verifier_keys.iter().enumerate() {
642            if key.is_none() {
643                results.push(error_constructor(
644                    EntityType::Verifier,
645                    format!("Index {index} in config (0-based)"),
646                    "Verifier key was not able to be collected".to_string(),
647                ));
648            }
649        }
650    }
651
652    /// Retrieves the status of all entities (operators and verifiers) and restarts background tasks if needed.
653    /// Returns a vector of EntityStatusWithId. Only returns an error if restarting tasks fails when requested.
654    pub async fn get_entity_statuses(
655        &self,
656        restart_tasks: bool,
657    ) -> Result<Vec<EntityStatusWithId>, BridgeError> {
658        tracing::debug!("Getting entities status");
659
660        let operator_clients = self.get_operator_clients();
661        let verifier_clients = self.get_verifier_clients();
662        tracing::debug!("Operator clients: {:?}", operator_clients.len());
663
664        let (operator_keys, verifier_keys) = self.fetch_all_entity_keys().await;
665
666        // Query operators for status
667        let operator_status = join_all(
668            operator_clients
669                .iter()
670                .zip(operator_keys.iter())
671                .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
672                .map(|(client, key)| {
673                    let mut client = client.clone();
674                    let key = *key;
675                    async move {
676                        tracing::debug!("Getting operator status for {:?}", key);
677                        let mut request = Request::new(Empty {});
678                        request.set_timeout(ENTITY_STATUS_POLL_TIMEOUT);
679                        let response = client.get_current_status(request).await;
680
681                        EntityStatusWithId {
682                            entity_id: Some(RPCEntityId {
683                                kind: EntityType::Operator as i32,
684                                id: key.to_string(),
685                            }),
686                            status_result: match response {
687                                Ok(response) => Some(StatusResult::Status(response.into_inner())),
688                                Err(e) => Some(StatusResult::Err(clementine::EntityError {
689                                    error: e.to_string(),
690                                })),
691                            },
692                        }
693                    }
694                }),
695        )
696        .await;
697
698        // Query verifiers for status
699        let verifier_status = join_all(
700            verifier_clients
701                .iter()
702                .zip(verifier_keys.iter())
703                .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
704                .map(|(client, key)| {
705                    let mut client = client.clone();
706                    let key = *key;
707                    async move {
708                        tracing::debug!("Getting verifier status for {:?}", key);
709                        let mut request = Request::new(Empty {});
710                        request.set_timeout(ENTITY_STATUS_POLL_TIMEOUT);
711                        let response = client.get_current_status(request).await;
712
713                        EntityStatusWithId {
714                            entity_id: Some(RPCEntityId {
715                                kind: EntityType::Verifier as i32,
716                                id: key.to_string(),
717                            }),
718                            status_result: match response {
719                                Ok(response) => Some(StatusResult::Status(response.into_inner())),
720                                Err(e) => Some(StatusResult::Err(clementine::EntityError {
721                                    error: e.to_string(),
722                                })),
723                            },
724                        }
725                    }
726                }),
727        )
728        .await;
729
730        // Combine results
731        let mut entity_statuses = operator_status;
732        entity_statuses.extend(verifier_status);
733
734        // try to restart background tasks if requested, with a timeout
735        if restart_tasks {
736            let operator_tasks = operator_clients
737                .iter()
738                .zip(operator_keys.iter())
739                .filter_map(|(client, key)| key.map(|key| (client, key)))
740                .map(|(client, key)| {
741                    let mut client = client.clone();
742                    async move {
743                        let mut request = Request::new(Empty {});
744                        request.set_timeout(RESTART_BACKGROUND_TASKS_TIMEOUT);
745                        client
746                            .restart_background_tasks(Request::new(Empty {}))
747                            .await
748                            .wrap_err_with(|| {
749                                Status::internal(format!(
750                                    "Failed to restart background tasks for operator {key}"
751                                ))
752                            })
753                    }
754                });
755
756            let verifier_tasks = verifier_clients
757                .iter()
758                .zip(verifier_keys.iter())
759                .filter_map(|(client, key)| key.map(|key| (client, key)))
760                .map(|(client, key)| {
761                    let mut client = client.clone();
762                    async move {
763                        let mut request = Request::new(Empty {});
764                        request.set_timeout(RESTART_BACKGROUND_TASKS_TIMEOUT);
765                        client
766                            .restart_background_tasks(Request::new(Empty {}))
767                            .await
768                            .wrap_err_with(|| {
769                                Status::internal(format!(
770                                    "Failed to restart background tasks for verifier {key}"
771                                ))
772                            })
773                    }
774                });
775
776            let (operator_results, verifier_results) = futures::join!(
777                futures::future::join_all(operator_tasks),
778                futures::future::join_all(verifier_tasks)
779            );
780            // Log any errors from restart_background_tasks rpc call
781            for result in operator_results
782                .into_iter()
783                .chain(verifier_results.into_iter())
784            {
785                if let Err(e) = result {
786                    tracing::error!("{e:?}");
787                }
788            }
789        }
790
791        // Add error entries for unreachable entities
792        Self::add_unreachable_entity_errors(
793            &mut entity_statuses,
794            &operator_keys,
795            &verifier_keys,
796            |entity_type, id, error_msg| EntityStatusWithId {
797                entity_id: Some(RPCEntityId {
798                    kind: entity_type as i32,
799                    id,
800                }),
801                status_result: Some(StatusResult::Err(clementine::EntityError {
802                    error: error_msg,
803                })),
804            },
805        );
806
807        Ok(entity_statuses)
808    }
809
810    pub async fn get_compatibility_data_from_entities(
811        &self,
812    ) -> Result<Vec<EntityDataWithId>, BridgeError> {
813        let operator_clients = self.get_operator_clients();
814        let verifier_clients = self.get_verifier_clients();
815
816        let (operator_keys, verifier_keys) = self.fetch_all_entity_keys().await;
817
818        // Query operators for compatibility data
819        let operator_comp_data = join_all(
820            operator_clients
821                .iter()
822                .zip(operator_keys.iter())
823                .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
824                .map(|(client, key)| {
825                    let mut client = client.clone();
826                    let key = *key;
827                    async move {
828                        tracing::debug!("Getting operator compatibility data for {:?}", key);
829                        let mut request = Request::new(Empty {});
830                        request.set_timeout(ENTITY_COMP_DATA_POLL_TIMEOUT);
831                        let response = client.get_compatibility_params(request).await;
832
833                        EntityDataWithId {
834                            entity_id: Some(RPCEntityId {
835                                kind: EntityType::Operator as i32,
836                                id: key.to_string(),
837                            }),
838                            data_result: match response {
839                                Ok(response) => Some(DataResult::Data(response.into_inner())),
840                                Err(e) => Some(DataResult::Error(e.to_string())),
841                            },
842                        }
843                    }
844                }),
845        )
846        .await;
847
848        // Query verifiers for compatibility data
849        let verifier_comp_data = join_all(
850            verifier_clients
851                .iter()
852                .zip(verifier_keys.iter())
853                .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
854                .map(|(client, key)| {
855                    let mut client = client.clone();
856                    let key = *key;
857                    async move {
858                        tracing::debug!("Getting verifier compatibility data for {:?}", key);
859                        let mut request = Request::new(Empty {});
860                        request.set_timeout(ENTITY_COMP_DATA_POLL_TIMEOUT);
861                        let response = client.get_compatibility_params(request).await;
862
863                        EntityDataWithId {
864                            entity_id: Some(RPCEntityId {
865                                kind: EntityType::Verifier as i32,
866                                id: key.to_string(),
867                            }),
868                            data_result: match response {
869                                Ok(response) => Some(DataResult::Data(response.into_inner())),
870                                Err(e) => Some(DataResult::Error(e.to_string())),
871                            },
872                        }
873                    }
874                }),
875        )
876        .await;
877
878        // Combine results
879        let mut entities_comp_data = operator_comp_data;
880        entities_comp_data.extend(verifier_comp_data);
881
882        // add aggregators own data
883        let aggregator_comp_data = EntityDataWithId {
884            entity_id: Some(RPCEntityId {
885                kind: EntityType::Aggregator as i32,
886                id: "Aggregator".to_string(),
887            }),
888            data_result: {
889                let compatibility_params: Result<CompatibilityParamsRpc, eyre::Report> =
890                    self.get_compatibility_params()?.try_into();
891                match compatibility_params {
892                    Ok(compatibility_params) => Some(DataResult::Data(compatibility_params)),
893                    Err(e) => Some(DataResult::Error(e.to_string())),
894                }
895            },
896        };
897
898        entities_comp_data.push(aggregator_comp_data);
899
900        // Add error entries for unreachable entities
901        Self::add_unreachable_entity_errors(
902            &mut entities_comp_data,
903            &operator_keys,
904            &verifier_keys,
905            |entity_type, id, error_msg| EntityDataWithId {
906                entity_id: Some(RPCEntityId {
907                    kind: entity_type as i32,
908                    id,
909                }),
910                data_result: Some(DataResult::Error(error_msg)),
911            },
912        );
913
914        Ok(entities_comp_data)
915    }
916
917    /// Checks compatibility with other actors.
918    /// Returns an error if aggregator is not compatible with any of the other actors, or any other actor returns an error.
919    pub async fn check_compatibility_with_actors(
920        &self,
921        scope: CompatibilityCheckScope,
922    ) -> Result<(), BridgeError> {
923        let mut errors = Vec::new();
924        let mut operator_futures = Vec::new();
925        let mut verifier_futures = Vec::new();
926
927        let operators_included = matches!(
928            scope,
929            CompatibilityCheckScope::OperatorsOnly | CompatibilityCheckScope::Both
930        );
931        let verifiers_included = matches!(
932            scope,
933            CompatibilityCheckScope::VerifiersOnly | CompatibilityCheckScope::Both
934        );
935
936        if operators_included {
937            let operator_keys = self.fetch_operator_keys().await?;
938            for (operator_id, operator_client) in operator_keys
939                .into_iter()
940                .map(OperatorId)
941                .zip(self.operator_clients.iter())
942            {
943                let mut operator_client = operator_client.clone();
944                let operator_id_str = operator_id.to_string();
945
946                operator_futures.push(async move {
947                    let compatibility_params: CompatibilityParams = operator_client
948                        .get_compatibility_params(Empty {})
949                        .await
950                        .wrap_err(format!(
951                            "{operator_id_str} compatibility params retrieval failed"
952                        ))?
953                        .into_inner()
954                        .try_into()
955                        .wrap_err(format!(
956                            "{operator_id_str} compatibility params conversion failed"
957                        ))?;
958                    Ok::<_, BridgeError>((operator_id_str, compatibility_params))
959                });
960            }
961        }
962
963        if verifiers_included {
964            let verifier_keys = self.fetch_verifier_keys().await?;
965            for (verifier_id, verifier_client) in verifier_keys
966                .into_iter()
967                .map(VerifierId)
968                .zip(self.verifier_clients.iter())
969            {
970                let mut verifier_client = verifier_client.clone();
971                let verifier_id_str = verifier_id.to_string();
972
973                verifier_futures.push(async move {
974                    let compatibility_params: CompatibilityParams = verifier_client
975                        .get_compatibility_params(Empty {})
976                        .await
977                        .wrap_err(format!(
978                            "{verifier_id_str} compatibility params retrieval failed"
979                        ))?
980                        .into_inner()
981                        .try_into()
982                        .wrap_err(format!(
983                            "{verifier_id_str} compatibility params conversion failed"
984                        ))?;
985                    Ok::<_, BridgeError>((verifier_id_str, compatibility_params))
986                });
987            }
988        }
989
990        // Run all futures in parallel and combine errors
991        let (operator_results, operator_err) = join_all_partition_results(operator_futures).await;
992        let (verifier_results, verifier_err) = join_all_partition_results(verifier_futures).await;
993
994        let mut actors_compat_params = Vec::new();
995        actors_compat_params.extend(operator_results);
996        actors_compat_params.extend(verifier_results);
997
998        if let Some(operator_err) = operator_err {
999            errors.push(format!(
1000                "Error while retrieving operator compatibility params: {operator_err}"
1001            ));
1002        }
1003        if let Some(verifier_err) = verifier_err {
1004            errors.push(format!(
1005                "Error while retrieving verifier compatibility params: {verifier_err}"
1006            ));
1007        }
1008
1009        // Combine compatibility error and other errors (ex: connection) into a single message
1010        if let Err(e) = self.is_compatible(actors_compat_params) {
1011            errors.push(format!("Clementine not compatible with some actors: {e}"));
1012        }
1013        if !errors.is_empty() {
1014            return Err(eyre::eyre!(errors.join("; ")).into());
1015        }
1016
1017        Ok(())
1018    }
1019}
1020
/// Aggregator server wrapper that manages background tasks.
///
/// Bundles an [`Aggregator`] with the task manager used to run its background tasks.
/// The wrapper dereferences to the inner [`Aggregator`].
#[derive(Debug)]
pub struct AggregatorServer {
    /// The wrapped aggregator instance.
    pub aggregator: Aggregator,
    /// Manager for the background tasks started by `start_background_tasks`.
    background_tasks: crate::task::manager::BackgroundTaskManager,
}
1027
1028impl AggregatorServer {
1029    pub async fn new(config: BridgeConfig) -> Result<Self, BridgeError> {
1030        let aggregator = Aggregator::new(config.clone()).await?;
1031        let background_tasks = crate::task::manager::BackgroundTaskManager::default();
1032
1033        Ok(Self {
1034            aggregator,
1035            background_tasks,
1036        })
1037    }
1038
1039    /// Starts the background tasks for the aggregator.
1040    /// If called multiple times, it will restart only the tasks that are not already running.
1041    pub async fn start_background_tasks(&self) -> Result<(), BridgeError> {
1042        // Start the aggregator metric publisher task
1043        self.background_tasks
1044            .ensure_task_looping(
1045                crate::task::aggregator_metric_publisher::AggregatorMetricPublisher::new(
1046                    self.aggregator.clone(),
1047                )
1048                .await?
1049                .with_delay(AGGREGATOR_METRIC_PUBLISHER_POLL_DELAY),
1050            )
1051            .await;
1052
1053        tracing::info!("Aggregator metric publisher task started");
1054
1055        Ok(())
1056    }
1057}
1058
1059impl Deref for AggregatorServer {
1060    type Target = Aggregator;
1061
1062    fn deref(&self) -> &Self::Target {
1063        &self.aggregator
1064    }
1065}