clementine_core/
aggregator.rs

1use std::collections::HashSet;
2use std::ops::Deref;
3use std::sync::Arc;
4
5use crate::compatibility::{ActorWithConfig, CompatibilityParams};
6use crate::constants::{
7    ENTITY_COMP_DATA_POLL_TIMEOUT, ENTITY_STATUS_POLL_TIMEOUT, OPERATOR_GET_KEYS_TIMEOUT,
8    PUBLIC_KEY_COLLECTION_TIMEOUT, RESTART_BACKGROUND_TASKS_TIMEOUT, VERIFIER_SEND_KEYS_TIMEOUT,
9};
10use crate::deposit::DepositData;
11use crate::extended_bitcoin_rpc::ExtendedBitcoinRpc;
12use crate::rpc::clementine::entity_data_with_id::DataResult;
13use crate::rpc::clementine::entity_status_with_id::StatusResult;
14use crate::rpc::clementine::{
15    self, CompatibilityParamsRpc, DepositParams, Empty, EntityStatusWithId, EntityType,
16    OperatorKeysWithDeposit,
17};
18use crate::rpc::clementine::{EntityDataWithId, EntityId as RPCEntityId};
19use crate::task::aggregator_metric_publisher::AGGREGATOR_METRIC_PUBLISHER_POLL_DELAY;
20use crate::task::TaskExt;
21#[cfg(feature = "automation")]
22use crate::tx_sender::TxSenderClient;
23use crate::utils::{
24    flatten_join_named_results, join_all_partition_results, timed_request, timed_try_join_all,
25};
26use crate::{
27    config::BridgeConfig,
28    database::Database,
29    rpc::{
30        self,
31        clementine::{
32            clementine_operator_client::ClementineOperatorClient,
33            clementine_verifier_client::ClementineVerifierClient,
34        },
35    },
36};
37use bitcoin::secp256k1::PublicKey;
38use bitcoin::XOnlyPublicKey;
39use clementine_errors::BridgeError;
40use eyre::Context;
41use futures::future::join_all;
42use std::future::Future;
43use std::hash::Hash as StdHash;
44use tokio::sync::RwLock;
45use tonic::{Request, Status};
46use tracing::{debug_span, Instrument};
47
/// Aggregator struct.
/// This struct is responsible for aggregating partial signatures from the verifiers.
/// It will have in total 3 * num_operator + 1 aggregated nonces.
/// \[0\] -> Aggregated nonce for the move transaction.
/// [1..num_operator + 1] -> Aggregated nonces for the operator_takes transactions.
/// [num_operator + 1..2 * num_operator + 1] -> Aggregated nonces for the slash_or_take transactions.
/// [2 * num_operator + 1..3 * num_operator + 1] -> Aggregated nonces for the burn transactions.
/// For now, we do not have the last bit.
#[derive(Debug, Clone)]
pub struct Aggregator {
    /// Bitcoin RPC connection, opened in `Aggregator::new` from the configured URL and credentials.
    pub(crate) rpc: ExtendedBitcoinRpc,
    /// Database handle created from the configuration.
    pub(crate) db: Database,
    /// Bridge configuration this aggregator was built from.
    pub(crate) config: BridgeConfig,
    /// Transaction sender client over `db` (only compiled with the `automation` feature).
    #[cfg(feature = "automation")]
    pub(crate) tx_sender: TxSenderClient<Database>,
    /// Operator clients built from `config.operator_endpoints`.
    operator_clients: Vec<ClementineOperatorClient<tonic::transport::Channel>>,
    /// Verifier clients built from `config.verifier_endpoints`.
    verifier_clients: Vec<ClementineVerifierClient<tonic::transport::Channel>>,
    /// Cached verifier public keys; slot `i` belongs to `verifier_clients[i]` and stays `None`
    /// until the key has been fetched successfully.
    verifier_keys: Arc<RwLock<Vec<Option<PublicKey>>>>,
    /// Cached operator x-only public keys; slot `i` belongs to `operator_clients[i]` and stays
    /// `None` until the key has been fetched successfully.
    operator_keys: Arc<RwLock<Vec<Option<XOnlyPublicKey>>>>,
}
68
/// Identifies an entity: a verifier (by [`VerifierId`]), an operator (by [`OperatorId`]),
/// or the aggregator, which carries no key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EntityId {
    /// A verifier, identified by its public key wrapper.
    Verifier(VerifierId),
    /// An operator, identified by its x-only public key wrapper.
    Operator(OperatorId),
    /// The aggregator.
    Aggregator,
}
75
/// Specifies which entity types to include when checking compatibility.
///
/// This is a plain selector value; it is `Copy` and comparable with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompatibilityCheckScope {
    /// Check compatibility with verifiers only.
    VerifiersOnly,
    /// Check compatibility with operators only.
    OperatorsOnly,
    /// Check compatibility with both verifiers and operators.
    Both,
}
86
/// Wrapper struct that renders the verifier id in the logs.
///
/// Wraps the verifier's [`PublicKey`]. Its `Display` implementation prints
/// `Verifier(<first 10 characters of the key's string form>)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct VerifierId(pub PublicKey);
90
/// Wrapper struct that renders the operator id in the logs.
///
/// Wraps the operator's [`XOnlyPublicKey`]. Its `Display` implementation prints
/// `Operator(<first 10 characters of the key's string form>)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct OperatorId(pub XOnlyPublicKey);
94
95impl std::fmt::Display for EntityId {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        match self {
98            EntityId::Aggregator => write!(f, "Aggregator"),
99            EntityId::Verifier(id) => write!(f, "{id}"),
100            EntityId::Operator(id) => write!(f, "{id}"),
101        }
102    }
103}
104
105impl std::fmt::Display for VerifierId {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        write!(f, "Verifier({})", &self.0.to_string()[..10])
108    }
109}
110
111impl std::fmt::Display for OperatorId {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        write!(f, "Operator({})", &self.0.to_string()[..10])
114    }
115}
116
/// Wrapper struct that matches verifier clients with their ids.
///
/// Each element pairs a verifier client with the [`VerifierId`] of that verifier; the order
/// of the vector is whatever order the constructor was given.
#[derive(Debug, Clone)]
pub struct ParticipatingVerifiers(
    pub  Vec<(
        ClementineVerifierClient<tonic::transport::Channel>,
        VerifierId,
    )>,
);
125
126impl ParticipatingVerifiers {
127    pub fn new(
128        verifiers: Vec<(
129            ClementineVerifierClient<tonic::transport::Channel>,
130            VerifierId,
131        )>,
132    ) -> Self {
133        Self(verifiers)
134    }
135
136    pub fn clients(&self) -> Vec<ClementineVerifierClient<tonic::transport::Channel>> {
137        self.0.iter().map(|(client, _)| client.clone()).collect()
138    }
139
140    pub fn ids(&self) -> Vec<VerifierId> {
141        self.0.iter().map(|(_, id)| *id).collect()
142    }
143}
144
/// Wrapper struct that matches operator clients with their ids.
///
/// Each element pairs an operator client with the [`OperatorId`] of that operator; the order
/// of the vector is whatever order the constructor was given.
#[derive(Debug, Clone)]
pub struct ParticipatingOperators(
    pub  Vec<(
        ClementineOperatorClient<tonic::transport::Channel>,
        OperatorId,
    )>,
);
153
154impl ParticipatingOperators {
155    pub fn new(
156        operators: Vec<(
157            ClementineOperatorClient<tonic::transport::Channel>,
158            OperatorId,
159        )>,
160    ) -> Self {
161        Self(operators)
162    }
163
164    pub fn clients(&self) -> Vec<ClementineOperatorClient<tonic::transport::Channel>> {
165        self.0.iter().map(|(client, _)| client.clone()).collect()
166    }
167
168    pub fn ids(&self) -> Vec<OperatorId> {
169        self.0.iter().map(|(_, id)| *id).collect()
170    }
171}
172
173impl Aggregator {
174    pub async fn new(config: BridgeConfig) -> Result<Self, BridgeError> {
175        let db = Database::new(&config).await?;
176
177        let rpc = ExtendedBitcoinRpc::connect(
178            config.bitcoin_rpc_url.clone(),
179            config.bitcoin_rpc_user.clone(),
180            config.bitcoin_rpc_password.clone(),
181            None,
182        )
183        .await?;
184
185        let verifier_endpoints =
186            config
187                .verifier_endpoints
188                .clone()
189                .ok_or(BridgeError::ConfigError(
190                    "No verifier endpoints provided in config".into(),
191                ))?;
192
193        let operator_endpoints =
194            config
195                .operator_endpoints
196                .clone()
197                .ok_or(BridgeError::ConfigError(
198                    "No operator endpoints provided in config".into(),
199                ))?;
200
201        // Create clients to connect to all verifiers
202        let verifier_clients = rpc::get_clients(
203            verifier_endpoints,
204            crate::rpc::verifier_client_builder(&config),
205            &config,
206            true,
207        )
208        .await?;
209
210        // Create clients to connect to all operators
211        let operator_clients = rpc::get_clients(
212            operator_endpoints,
213            crate::rpc::operator_client_builder(&config),
214            &config,
215            true,
216        )
217        .await?;
218
219        #[cfg(feature = "automation")]
220        let tx_sender = TxSenderClient::new(db.clone(), "aggregator".to_string());
221
222        tracing::info!(
223            "Aggregator created with {} verifiers and {} operators",
224            verifier_clients.len(),
225            operator_clients.len(),
226        );
227
228        let operator_keys = Arc::new(RwLock::new(vec![None; operator_clients.len()]));
229        let verifier_keys = Arc::new(RwLock::new(vec![None; verifier_clients.len()]));
230
231        Ok(Aggregator {
232            rpc,
233            db,
234            config,
235            #[cfg(feature = "automation")]
236            tx_sender,
237            verifier_clients,
238            operator_clients,
239            verifier_keys,
240            operator_keys,
241        })
242    }
243
244    pub fn get_verifier_clients(&self) -> &[ClementineVerifierClient<tonic::transport::Channel>] {
245        &self.verifier_clients
246    }
247
    /// Generic helper function to fetch keys from clients
    ///
    /// `keys_storage` is a cache with one slot per entry of `clients` (same index). If every
    /// slot is already filled, the cached keys are returned without contacting anyone.
    /// Otherwise `pubkey_fetcher` is invoked, concurrently, for every client whose slot is
    /// still empty, and successful results are written back into the cache.
    ///
    /// `key_type_name` is only used in log and error messages.
    ///
    /// # Errors
    ///
    /// - the cache and `clients` have different lengths;
    /// - the known keys are not unique (the whole cache is reset to `None` in that case);
    /// - at least one key is still missing after this attempt (keys that were fetched
    ///   successfully remain cached).
    async fn fetch_pubkeys_from_entities<T, C, F, Fut>(
        &self,
        clients: &[C],
        keys_storage: &RwLock<Vec<Option<T>>>,
        pubkey_fetcher: F,
        key_type_name: &str,
    ) -> Result<Vec<T>, BridgeError>
    where
        T: Clone + Send + Sync + Eq + StdHash + std::fmt::Debug,
        C: Clone + Send + Sync,
        F: Fn(C) -> Fut + Send + Sync,
        Fut: Future<Output = Result<T, BridgeError>> + Send,
    {
        // Check if all keys are collected
        let all_collected = {
            let keys = keys_storage.read().await;
            keys.iter().all(|key| key.is_some())
        };

        if !all_collected {
            // get a write lock early, so that only one thread can try to collect keys
            // (the guard stays alive across the fetches below)
            let mut keys = keys_storage.write().await;

            // sanity check because we directly use indexes below
            if keys.len() != clients.len() {
                return Err(eyre::eyre!(
                    "Keys storage length does not match clients length, should not happen, keys length: {}, clients length: {}",
                    keys.len(),
                    clients.len()
                )
                .into());
            }

            // Build one future per client whose slot is still empty, tagged with its index.
            let key_collection_futures = clients
                .iter()
                .zip(keys.iter().enumerate())
                .filter_map(|(client, (idx, key))| {
                    if key.is_none() {
                        Some((idx, pubkey_fetcher(client.clone())))
                    } else {
                        None
                    }
                })
                .map(|(idx, fut)| async move { (idx, fut.await) });

            let collected_keys = join_all(key_collection_futures).await;
            let mut missing_keys = Vec::new();

            // Fill in keys with the results of the futures
            for (idx, new_key) in collected_keys {
                match new_key {
                    Ok(new_key) => keys[idx] = Some(new_key),
                    Err(e) => {
                        tracing::debug!(
                            "Failed to collect {} {} (order in config) key: {}",
                            key_type_name,
                            idx,
                            e
                        );
                        missing_keys.push(idx);
                    }
                }
            }

            // if the collected keys are not unique, return an error
            let non_none_keys: Vec<_> = keys.iter().filter_map(|key| key.as_ref()).collect();
            let unique_keys: HashSet<_> = non_none_keys.iter().cloned().collect();

            if unique_keys.len() != non_none_keys.len() {
                // Format the message before the reset so it still shows the offending keys.
                let reason = format!("{key_type_name} keys are not unique: {keys:?}");
                // reset all keys to None so that faulty keys are not used
                for key in keys.iter_mut() {
                    *key = None;
                }
                return Err(eyre::eyre!(reason).into());
            }

            // if not all keys were collected, return an error
            if keys.iter().any(|key| key.is_none()) {
                return Err(eyre::eyre!(
                    "Not all {} keys were able to be collected, missing keys at indices: {:?}",
                    key_type_name,
                    missing_keys
                )
                .into());
            }
        }

        // return all keys if they were all collected
        Ok(keys_storage
            .read()
            .await
            .iter()
            .map(|key| key.clone().expect("should all be collected"))
            .collect())
    }
345
346    /// If all verifier keys are already collected, returns them.
347    /// Otherwise, it tries to collect them from the verifiers, saves them and returns them.
348    pub async fn fetch_verifier_keys(&self) -> Result<Vec<PublicKey>, BridgeError> {
349        self.fetch_pubkeys_from_entities(
350            &self.verifier_clients,
351            &self.verifier_keys,
352            |mut client| async move {
353                let mut request = Request::new(Empty {});
354                request.set_timeout(PUBLIC_KEY_COLLECTION_TIMEOUT);
355                let verifier_params = client.get_params(request).await?.into_inner();
356                let public_key = PublicKey::from_slice(&verifier_params.public_key)
357                    .map_err(|e| eyre::eyre!("Failed to parse verifier public key: {}", e))?;
358                Ok::<_, BridgeError>(public_key)
359            },
360            "verifier",
361        )
362        .await
363    }
364
365    /// If all operator keys are already collected, returns them.
366    /// Otherwise, it tries to collect them from the operators, saves them and returns them.
367    pub async fn fetch_operator_keys(&self) -> Result<Vec<XOnlyPublicKey>, BridgeError> {
368        self.fetch_pubkeys_from_entities(
369            &self.operator_clients,
370            &self.operator_keys,
371            |mut client| async move {
372                let mut request = Request::new(Empty {});
373                request.set_timeout(PUBLIC_KEY_COLLECTION_TIMEOUT);
374                let operator_xonly_pk: XOnlyPublicKey = client
375                    .get_x_only_public_key(request)
376                    .await?
377                    .into_inner()
378                    .try_into()?;
379                Ok::<_, BridgeError>(operator_xonly_pk)
380            },
381            "operator",
382        )
383        .await
384    }
385
386    pub fn get_operator_clients(&self) -> &[ClementineOperatorClient<tonic::transport::Channel>] {
387        &self.operator_clients
388    }
389
390    /// Collects and distributes keys to verifiers from operators and watchtowers for the new deposit
391    /// for operators: get bitvm assert winternitz public keys and watchtower challenge ack hashes
392    /// for watchtowers: get winternitz public keys for watchtower challenges
393    pub async fn collect_and_distribute_keys(
394        &self,
395        deposit_params: &DepositParams,
396    ) -> Result<(), BridgeError> {
397        tracing::info!("Starting collect_and_distribute_keys");
398
399        let start_time = std::time::Instant::now();
400
401        let deposit_data: DepositData = deposit_params.clone().try_into()?;
402
403        // Create channels with larger capacity to prevent blocking
404        let (operator_keys_tx, operator_keys_rx) =
405            tokio::sync::broadcast::channel::<clementine::OperatorKeysWithDeposit>(
406                deposit_data.get_num_operators() * deposit_data.get_num_verifiers(),
407            );
408        let operator_rx_handles = (0..deposit_data.get_num_verifiers())
409            .map(|_| operator_keys_rx.resubscribe())
410            .collect::<Vec<_>>();
411
412        let operators = self.get_participating_operators(&deposit_data).await?;
413        let operator_clients = operators.clients();
414
415        let operator_xonly_pks = deposit_data.get_operators();
416        let deposit = deposit_params.clone();
417
418        tracing::info!("Starting operator key collection");
419        #[cfg(test)]
420        let timeout_params = self.config.test_params.timeout_params;
421        #[allow(clippy::unused_enumerate_index)]
422        let get_operators_keys_handle = tokio::spawn(timed_try_join_all(
423            OPERATOR_GET_KEYS_TIMEOUT,
424            "Operator key collection",
425            Some(operators.ids()),
426            operator_clients
427                .into_iter()
428                .zip(operator_xonly_pks.into_iter())
429                .enumerate()
430                .map(move |(_idx, (mut operator_client, operator_xonly_pk))| {
431                    let deposit_params = deposit.clone();
432                    let tx = operator_keys_tx.clone();
433                    async move {
434                        #[cfg(test)]
435                        timeout_params
436                            .hook_timeout_key_collection_operator(_idx)
437                            .await;
438
439                        let operator_keys = operator_client
440                            .get_deposit_keys(deposit_params.clone())
441                            .instrument(
442                                debug_span!("get_deposit_keys", id=%OperatorId(operator_xonly_pk)),
443                            )
444                            .await
445                            .wrap_err(Status::internal(format!(
446                                "Operator {operator_xonly_pk} key retrieval failed"
447                            )))?
448                            .into_inner();
449
450                        // A send error means that all receivers are closed,
451                        // receivers only close if they have an error (while
452                        // loop condition)
453                        // We don't care about the result of the send, we
454                        // only care about the error on the other side.
455                        // Ignore this error, and let the other side's error
456                        // propagate.
457                        let _ = tx.send(OperatorKeysWithDeposit {
458                            deposit_params: Some(deposit_params),
459                            operator_keys: Some(operator_keys),
460                            operator_xonly_pk: operator_xonly_pk.serialize().to_vec(),
461                        });
462
463                        Ok(())
464                    }
465                }),
466        ));
467
468        tracing::info!("Starting operator key distribution to verifiers");
469        let verifiers = self.get_participating_verifiers(&deposit_data).await?;
470
471        let verifier_clients = verifiers.clients();
472        let num_operators = deposit_data.get_num_operators();
473
474        let verifier_ids = verifiers.ids();
475
476        #[cfg(test)]
477        let timeout_params = self.config.test_params.timeout_params;
478        #[allow(clippy::unused_enumerate_index)]
479        let distribute_operators_keys_handle = tokio::spawn(timed_try_join_all(
480            VERIFIER_SEND_KEYS_TIMEOUT,
481            "Verifier key distribution",
482            Some(verifier_ids.clone()),
483            verifier_clients
484                .into_iter()
485                .zip(operator_rx_handles)
486                .zip(verifier_ids)
487                .enumerate()
488                .map(
489                    move |(_idx, ((mut verifier, mut rx), verifier_id))| async move {
490                        #[cfg(test)]
491                        timeout_params
492                            .hook_timeout_key_distribution_verifier(_idx)
493                            .await;
494
495                        // Only wait for expected number of messages
496                        let mut received_keys = std::collections::HashSet::new();
497                        while received_keys.len() < num_operators {
498                            tracing::debug!(
499                                "Waiting for operator key (received {}/{})",
500                                received_keys.len(),
501                                num_operators
502                            );
503
504                            // This will not block forever because of the timeout on the join all.
505                            let operator_keys = rx
506                            .recv()
507                            .instrument(debug_span!("operator_keys_recv"))
508                            .await
509                            .wrap_err(Status::internal(
510                                "Operator broadcast channels closed before all keys were received",
511                            ))?;
512
513                            let operator_xonly_pk = operator_keys.operator_xonly_pk.clone();
514
515                            if !received_keys.insert(operator_xonly_pk.clone()) {
516                                continue;
517                            }
518
519                            timed_request(
520                                VERIFIER_SEND_KEYS_TIMEOUT,
521                                &format!("Setting operator keys for {verifier_id}"),
522                                async {
523                                    Ok(verifier
524                                        .set_operator_keys(operator_keys)
525                                        .await
526                                        .wrap_err_with(|| {
527                                            Status::internal(format!(
528                                                "Failed to set operator keys for {verifier_id}",
529                                            ))
530                                        }))
531                                },
532                            )
533                            .await??;
534                        }
535                        Ok::<_, BridgeError>(())
536                    },
537                ),
538        ));
539
540        // Wait for all tasks to complete
541        let (get_operators_keys_result, distribute_operators_keys_result) =
542            tokio::join!(get_operators_keys_handle, distribute_operators_keys_handle);
543
544        flatten_join_named_results([
545            ("get_operators_keys", get_operators_keys_result),
546            (
547                "distribute_operators_keys",
548                distribute_operators_keys_result,
549            ),
550        ])?;
551
552        tracing::info!(
553            "collect_and_distribute_keys completed in {:?}",
554            start_time.elapsed()
555        );
556
557        Ok(())
558    }
559
560    /// Returns a list of verifier clients that are participating in the deposit.
561    pub async fn get_participating_verifiers(
562        &self,
563        deposit_data: &DepositData,
564    ) -> Result<ParticipatingVerifiers, BridgeError> {
565        let verifier_keys = self.fetch_verifier_keys().await?;
566        let mut participating_verifiers = Vec::new();
567
568        let verifiers = deposit_data.get_verifiers();
569
570        for verifier_pk in verifiers {
571            if let Some(pos) = verifier_keys.iter().position(|key| key == &verifier_pk) {
572                participating_verifiers
573                    .push((self.verifier_clients[pos].clone(), VerifierId(verifier_pk)));
574            } else {
575                tracing::error!(
576                    "Verifier public key not found. Deposit data verifier keys: {:?}, self verifier keys: {:?}",
577                    deposit_data.get_verifiers(),
578                    self.verifier_keys
579                );
580                return Err(BridgeError::VerifierNotFound(verifier_pk));
581            }
582        }
583
584        Ok(ParticipatingVerifiers::new(participating_verifiers))
585    }
586
587    /// Returns a list of operator clients that are participating in the deposit.
588    pub async fn get_participating_operators(
589        &self,
590        deposit_data: &DepositData,
591    ) -> Result<ParticipatingOperators, BridgeError> {
592        let operator_keys = self.fetch_operator_keys().await?;
593        let mut participating_operators = Vec::new();
594
595        let operators = deposit_data.get_operators();
596
597        for operator_pk in operators {
598            if let Some(pos) = operator_keys.iter().position(|key| key == &operator_pk) {
599                participating_operators
600                    .push((self.operator_clients[pos].clone(), OperatorId(operator_pk)));
601            } else {
602                return Err(BridgeError::OperatorNotFound(operator_pk));
603            }
604        }
605
606        Ok(ParticipatingOperators::new(participating_operators))
607    }
608
609    /// Helper function to fetch keys for both operators and verifiers.
610    /// Returns (operator_keys, verifier_keys) without failing if some entities are unreachable.
611    async fn fetch_all_entity_keys(&self) -> (Vec<Option<XOnlyPublicKey>>, Vec<Option<PublicKey>>) {
612        // Try to reach all operators and verifiers to collect keys, but do not return err if some can't be reached
613        let _ = self.fetch_operator_keys().await;
614        let _ = self.fetch_verifier_keys().await;
615
616        let operator_keys = self.operator_keys.read().await.clone();
617        let verifier_keys = self.verifier_keys.read().await.clone();
618
619        (operator_keys, verifier_keys)
620    }
621
622    /// Helper function to add error entries for entities where keys couldn't be collected.
623    fn add_unreachable_entity_errors<T, F>(
624        results: &mut Vec<T>,
625        operator_keys: &[Option<XOnlyPublicKey>],
626        verifier_keys: &[Option<PublicKey>],
627        error_constructor: F,
628    ) where
629        F: Fn(EntityType, String, String) -> T,
630    {
631        for (index, key) in operator_keys.iter().enumerate() {
632            if key.is_none() {
633                results.push(error_constructor(
634                    EntityType::Operator,
635                    format!("Index {index} in config (0-based)"),
636                    "Operator key was not able to be collected".to_string(),
637                ));
638            }
639        }
640        for (index, key) in verifier_keys.iter().enumerate() {
641            if key.is_none() {
642                results.push(error_constructor(
643                    EntityType::Verifier,
644                    format!("Index {index} in config (0-based)"),
645                    "Verifier key was not able to be collected".to_string(),
646                ));
647            }
648        }
649    }
650
651    /// Retrieves the status of all entities (operators and verifiers) and restarts background tasks if needed.
652    /// Returns a vector of EntityStatusWithId. Only returns an error if restarting tasks fails when requested.
653    pub async fn get_entity_statuses(
654        &self,
655        restart_tasks: bool,
656    ) -> Result<Vec<EntityStatusWithId>, BridgeError> {
657        tracing::debug!("Getting entities status");
658
659        let operator_clients = self.get_operator_clients();
660        let verifier_clients = self.get_verifier_clients();
661        tracing::debug!("Operator clients: {:?}", operator_clients.len());
662
663        let (operator_keys, verifier_keys) = self.fetch_all_entity_keys().await;
664
665        // Query operators for status
666        let operator_status = join_all(
667            operator_clients
668                .iter()
669                .zip(operator_keys.iter())
670                .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
671                .map(|(client, key)| {
672                    let mut client = client.clone();
673                    let key = *key;
674                    async move {
675                        tracing::debug!("Getting operator status for {:?}", key);
676                        let mut request = Request::new(Empty {});
677                        request.set_timeout(ENTITY_STATUS_POLL_TIMEOUT);
678                        let response = client.get_current_status(request).await;
679
680                        EntityStatusWithId {
681                            entity_id: Some(RPCEntityId {
682                                kind: EntityType::Operator as i32,
683                                id: key.to_string(),
684                            }),
685                            status_result: match response {
686                                Ok(response) => Some(StatusResult::Status(response.into_inner())),
687                                Err(e) => Some(StatusResult::Err(clementine::EntityError {
688                                    error: e.to_string(),
689                                })),
690                            },
691                        }
692                    }
693                }),
694        )
695        .await;
696
697        // Query verifiers for status
698        let verifier_status = join_all(
699            verifier_clients
700                .iter()
701                .zip(verifier_keys.iter())
702                .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
703                .map(|(client, key)| {
704                    let mut client = client.clone();
705                    let key = *key;
706                    async move {
707                        tracing::debug!("Getting verifier status for {:?}", key);
708                        let mut request = Request::new(Empty {});
709                        request.set_timeout(ENTITY_STATUS_POLL_TIMEOUT);
710                        let response = client.get_current_status(request).await;
711
712                        EntityStatusWithId {
713                            entity_id: Some(RPCEntityId {
714                                kind: EntityType::Verifier as i32,
715                                id: key.to_string(),
716                            }),
717                            status_result: match response {
718                                Ok(response) => Some(StatusResult::Status(response.into_inner())),
719                                Err(e) => Some(StatusResult::Err(clementine::EntityError {
720                                    error: e.to_string(),
721                                })),
722                            },
723                        }
724                    }
725                }),
726        )
727        .await;
728
729        // Combine results
730        let mut entity_statuses = operator_status;
731        entity_statuses.extend(verifier_status);
732
733        // try to restart background tasks if requested, with a timeout
734        if restart_tasks {
735            let operator_tasks = operator_clients
736                .iter()
737                .zip(operator_keys.iter())
738                .filter_map(|(client, key)| key.map(|key| (client, key)))
739                .map(|(client, key)| {
740                    let mut client = client.clone();
741                    async move {
742                        let mut request = Request::new(Empty {});
743                        request.set_timeout(RESTART_BACKGROUND_TASKS_TIMEOUT);
744                        client
745                            .restart_background_tasks(Request::new(Empty {}))
746                            .await
747                            .wrap_err_with(|| {
748                                Status::internal(format!(
749                                    "Failed to restart background tasks for operator {key}"
750                                ))
751                            })
752                    }
753                });
754
755            let verifier_tasks = verifier_clients
756                .iter()
757                .zip(verifier_keys.iter())
758                .filter_map(|(client, key)| key.map(|key| (client, key)))
759                .map(|(client, key)| {
760                    let mut client = client.clone();
761                    async move {
762                        let mut request = Request::new(Empty {});
763                        request.set_timeout(RESTART_BACKGROUND_TASKS_TIMEOUT);
764                        client
765                            .restart_background_tasks(Request::new(Empty {}))
766                            .await
767                            .wrap_err_with(|| {
768                                Status::internal(format!(
769                                    "Failed to restart background tasks for verifier {key}"
770                                ))
771                            })
772                    }
773                });
774
775            let (operator_results, verifier_results) = futures::join!(
776                futures::future::join_all(operator_tasks),
777                futures::future::join_all(verifier_tasks)
778            );
779            // Log any errors from restart_background_tasks rpc call
780            for result in operator_results
781                .into_iter()
782                .chain(verifier_results.into_iter())
783            {
784                if let Err(e) = result {
785                    tracing::error!("{e:?}");
786                }
787            }
788        }
789
790        // Add error entries for unreachable entities
791        Self::add_unreachable_entity_errors(
792            &mut entity_statuses,
793            &operator_keys,
794            &verifier_keys,
795            |entity_type, id, error_msg| EntityStatusWithId {
796                entity_id: Some(RPCEntityId {
797                    kind: entity_type as i32,
798                    id,
799                }),
800                status_result: Some(StatusResult::Err(clementine::EntityError {
801                    error: error_msg,
802                })),
803            },
804        );
805
806        Ok(entity_statuses)
807    }
808
809    pub async fn get_compatibility_data_from_entities(
810        &self,
811    ) -> Result<Vec<EntityDataWithId>, BridgeError> {
812        let operator_clients = self.get_operator_clients();
813        let verifier_clients = self.get_verifier_clients();
814
815        let (operator_keys, verifier_keys) = self.fetch_all_entity_keys().await;
816
817        // Query operators for compatibility data
818        let operator_comp_data = join_all(
819            operator_clients
820                .iter()
821                .zip(operator_keys.iter())
822                .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
823                .map(|(client, key)| {
824                    let mut client = client.clone();
825                    let key = *key;
826                    async move {
827                        tracing::debug!("Getting operator compatibility data for {:?}", key);
828                        let mut request = Request::new(Empty {});
829                        request.set_timeout(ENTITY_COMP_DATA_POLL_TIMEOUT);
830                        let response = client.get_compatibility_params(request).await;
831
832                        EntityDataWithId {
833                            entity_id: Some(RPCEntityId {
834                                kind: EntityType::Operator as i32,
835                                id: key.to_string(),
836                            }),
837                            data_result: match response {
838                                Ok(response) => Some(DataResult::Data(response.into_inner())),
839                                Err(e) => Some(DataResult::Error(e.to_string())),
840                            },
841                        }
842                    }
843                }),
844        )
845        .await;
846
847        // Query verifiers for compatibility data
848        let verifier_comp_data = join_all(
849            verifier_clients
850                .iter()
851                .zip(verifier_keys.iter())
852                .filter_map(|(client, key)| key.as_ref().map(|k| (client, k)))
853                .map(|(client, key)| {
854                    let mut client = client.clone();
855                    let key = *key;
856                    async move {
857                        tracing::debug!("Getting verifier compatibility data for {:?}", key);
858                        let mut request = Request::new(Empty {});
859                        request.set_timeout(ENTITY_COMP_DATA_POLL_TIMEOUT);
860                        let response = client.get_compatibility_params(request).await;
861
862                        EntityDataWithId {
863                            entity_id: Some(RPCEntityId {
864                                kind: EntityType::Verifier as i32,
865                                id: key.to_string(),
866                            }),
867                            data_result: match response {
868                                Ok(response) => Some(DataResult::Data(response.into_inner())),
869                                Err(e) => Some(DataResult::Error(e.to_string())),
870                            },
871                        }
872                    }
873                }),
874        )
875        .await;
876
877        // Combine results
878        let mut entities_comp_data = operator_comp_data;
879        entities_comp_data.extend(verifier_comp_data);
880
881        // add aggregators own data
882        let aggregator_comp_data = EntityDataWithId {
883            entity_id: Some(RPCEntityId {
884                kind: EntityType::Aggregator as i32,
885                id: "Aggregator".to_string(),
886            }),
887            data_result: {
888                let compatibility_params: Result<CompatibilityParamsRpc, eyre::Report> =
889                    self.get_compatibility_params()?.try_into();
890                match compatibility_params {
891                    Ok(compatibility_params) => Some(DataResult::Data(compatibility_params)),
892                    Err(e) => Some(DataResult::Error(e.to_string())),
893                }
894            },
895        };
896
897        entities_comp_data.push(aggregator_comp_data);
898
899        // Add error entries for unreachable entities
900        Self::add_unreachable_entity_errors(
901            &mut entities_comp_data,
902            &operator_keys,
903            &verifier_keys,
904            |entity_type, id, error_msg| EntityDataWithId {
905                entity_id: Some(RPCEntityId {
906                    kind: entity_type as i32,
907                    id,
908                }),
909                data_result: Some(DataResult::Error(error_msg)),
910            },
911        );
912
913        Ok(entities_comp_data)
914    }
915
916    /// Checks compatibility with other actors.
917    /// Returns an error if aggregator is not compatible with any of the other actors, or any other actor returns an error.
918    pub async fn check_compatibility_with_actors(
919        &self,
920        scope: CompatibilityCheckScope,
921    ) -> Result<(), BridgeError> {
922        let mut errors = Vec::new();
923        let mut operator_futures = Vec::new();
924        let mut verifier_futures = Vec::new();
925
926        let operators_included = matches!(
927            scope,
928            CompatibilityCheckScope::OperatorsOnly | CompatibilityCheckScope::Both
929        );
930        let verifiers_included = matches!(
931            scope,
932            CompatibilityCheckScope::VerifiersOnly | CompatibilityCheckScope::Both
933        );
934
935        if operators_included {
936            let operator_keys = self.fetch_operator_keys().await?;
937            for (operator_id, operator_client) in operator_keys
938                .into_iter()
939                .map(OperatorId)
940                .zip(self.operator_clients.iter())
941            {
942                let mut operator_client = operator_client.clone();
943                let operator_id_str = operator_id.to_string();
944
945                operator_futures.push(async move {
946                    let compatibility_params: CompatibilityParams = operator_client
947                        .get_compatibility_params(Empty {})
948                        .await
949                        .wrap_err(format!(
950                            "{operator_id_str} compatibility params retrieval failed"
951                        ))?
952                        .into_inner()
953                        .try_into()
954                        .wrap_err(format!(
955                            "{operator_id_str} compatibility params conversion failed"
956                        ))?;
957                    Ok::<_, BridgeError>((operator_id_str, compatibility_params))
958                });
959            }
960        }
961
962        if verifiers_included {
963            let verifier_keys = self.fetch_verifier_keys().await?;
964            for (verifier_id, verifier_client) in verifier_keys
965                .into_iter()
966                .map(VerifierId)
967                .zip(self.verifier_clients.iter())
968            {
969                let mut verifier_client = verifier_client.clone();
970                let verifier_id_str = verifier_id.to_string();
971
972                verifier_futures.push(async move {
973                    let compatibility_params: CompatibilityParams = verifier_client
974                        .get_compatibility_params(Empty {})
975                        .await
976                        .wrap_err(format!(
977                            "{verifier_id_str} compatibility params retrieval failed"
978                        ))?
979                        .into_inner()
980                        .try_into()
981                        .wrap_err(format!(
982                            "{verifier_id_str} compatibility params conversion failed"
983                        ))?;
984                    Ok::<_, BridgeError>((verifier_id_str, compatibility_params))
985                });
986            }
987        }
988
989        // Run all futures in parallel and combine errors
990        let (operator_results, operator_err) = join_all_partition_results(operator_futures).await;
991        let (verifier_results, verifier_err) = join_all_partition_results(verifier_futures).await;
992
993        let mut actors_compat_params = Vec::new();
994        actors_compat_params.extend(operator_results);
995        actors_compat_params.extend(verifier_results);
996
997        if let Some(operator_err) = operator_err {
998            errors.push(format!(
999                "Error while retrieving operator compatibility params: {operator_err}"
1000            ));
1001        }
1002        if let Some(verifier_err) = verifier_err {
1003            errors.push(format!(
1004                "Error while retrieving verifier compatibility params: {verifier_err}"
1005            ));
1006        }
1007
1008        // Combine compatibility error and other errors (ex: connection) into a single message
1009        if let Err(e) = self.is_compatible(actors_compat_params) {
1010            errors.push(format!("Clementine not compatible with some actors: {e}"));
1011        }
1012        if !errors.is_empty() {
1013            return Err(eyre::eyre!(errors.join("; ")).into());
1014        }
1015
1016        Ok(())
1017    }
1018}
1019
/// Aggregator server wrapper that manages background tasks.
///
/// Bundles an [`Aggregator`] with the task manager that keeps its background tasks
/// running. The wrapped aggregator is reachable through the public field or through
/// the `Deref` implementation.
#[derive(Debug)]
pub struct AggregatorServer {
    /// The wrapped aggregator instance.
    pub aggregator: Aggregator,
    /// Task manager that `start_background_tasks` registers the aggregator's tasks with.
    background_tasks: crate::task::manager::BackgroundTaskManager,
}
1026
1027impl AggregatorServer {
1028    pub async fn new(config: BridgeConfig) -> Result<Self, BridgeError> {
1029        let aggregator = Aggregator::new(config.clone()).await?;
1030        let background_tasks = crate::task::manager::BackgroundTaskManager::default();
1031
1032        Ok(Self {
1033            aggregator,
1034            background_tasks,
1035        })
1036    }
1037
1038    /// Starts the background tasks for the aggregator.
1039    /// If called multiple times, it will restart only the tasks that are not already running.
1040    pub async fn start_background_tasks(&self) -> Result<(), BridgeError> {
1041        // Start the aggregator metric publisher task
1042        self.background_tasks
1043            .ensure_task_looping(
1044                crate::task::aggregator_metric_publisher::AggregatorMetricPublisher::new(
1045                    self.aggregator.clone(),
1046                )
1047                .await?
1048                .with_delay(AGGREGATOR_METRIC_PUBLISHER_POLL_DELAY),
1049            )
1050            .await;
1051
1052        tracing::info!("Aggregator metric publisher task started");
1053
1054        Ok(())
1055    }
1056}
1057
1058impl Deref for AggregatorServer {
1059    type Target = Aggregator;
1060
1061    fn deref(&self) -> &Self::Target {
1062        &self.aggregator
1063    }
1064}