clementine_core/database/
aggregator.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
//! # Aggregator Related Database Operations
//!
//! This module includes database functions for storing and fetching signed
//! emergency stop transactions, keyed by move transaction ID.

use super::{wrapper::TxidDB, Database, DatabaseTransaction};
use crate::{errors::BridgeError, execute_query_with_tx};
use bitcoin::{consensus, Transaction, Txid};
use eyre;
use sqlx::QueryBuilder;

impl Database {
    /// Stores the signed emergency stop transaction that belongs to `move_txid`.
    ///
    /// The transaction is persisted in its consensus-serialized form. The
    /// statement uses `ON CONFLICT (move_txid) DO NOTHING`, so when a row for
    /// `move_txid` already exists it is left untouched rather than overwritten.
    ///
    /// # Parameters
    ///
    /// - `tx`: Optional database transaction, forwarded to
    ///   `execute_query_with_tx!` together with `self.connection`.
    /// - `move_txid`: Txid of the move transaction used as the row key.
    /// - `emergency_stop_tx`: The transaction to store.
    ///
    /// # Errors
    ///
    /// Returns a [`BridgeError`] if executing the query fails.
    pub async fn set_signed_emergency_stop_tx(
        &self,
        tx: Option<DatabaseTransaction<'_, '_>>,
        move_txid: &Txid,
        emergency_stop_tx: &Transaction,
    ) -> Result<(), BridgeError> {
        let serialized_tx = consensus::serialize(emergency_stop_tx);
        let insert = sqlx::query(
            "INSERT INTO emergency_stop_sigs (move_txid, emergency_stop_tx) VALUES ($1, $2)
             ON CONFLICT (move_txid) DO NOTHING;",
        )
        .bind(TxidDB(*move_txid))
        .bind(serialized_tx);

        execute_query_with_tx!(self.connection, tx, insert, execute)?;
        Ok(())
    }

    /// Fetches the stored emergency stop transactions for the given move txids.
    ///
    /// Only move txids that have a row in `emergency_stop_sigs` appear in the
    /// result; unknown txids are silently absent. The query has no `ORDER BY`,
    /// so callers should not rely on the order of the returned pairs.
    ///
    /// # Parameters
    ///
    /// - `tx`: Optional database transaction, forwarded to
    ///   `execute_query_with_tx!` together with `self.connection`.
    /// - `move_txids`: Move transaction IDs to look up.
    ///
    /// # Returns
    ///
    /// A list of `(move_txid, emergency_stop_tx)` pairs; empty when
    /// `move_txids` is empty (no query is issued in that case).
    ///
    /// # Errors
    ///
    /// Returns a [`BridgeError`] if the query fails or if a stored blob cannot
    /// be consensus-deserialized into a [`Transaction`].
    pub async fn get_emergency_stop_txs(
        &self,
        tx: Option<DatabaseTransaction<'_, '_>>,
        move_txids: Vec<Txid>,
    ) -> Result<Vec<(Txid, Transaction)>, BridgeError> {
        // With no ids the statement below would end in `IN ()`, so answer
        // directly without touching the database.
        if move_txids.is_empty() {
            return Ok(vec![]);
        }

        let mut builder = QueryBuilder::new(
            "SELECT move_txid, emergency_stop_tx FROM emergency_stop_sigs WHERE move_txid IN (",
        );
        {
            // One bound placeholder per requested txid, joined by ", ".
            let mut placeholders = builder.separated(", ");
            for &txid in move_txids.iter() {
                placeholders.push_bind(TxidDB(txid));
            }
        }
        builder.push(")");

        let select = builder.build_query_as::<(TxidDB, Vec<u8>)>();
        let rows: Vec<(TxidDB, Vec<u8>)> =
            execute_query_with_tx!(self.connection, tx, select, fetch_all)?;

        // Decode every row, bailing out on the first blob that fails to parse.
        let mut stop_txs = Vec::with_capacity(rows.len());
        for (row_txid, raw_tx) in rows {
            let stop_tx: Transaction = consensus::deserialize(&raw_tx)
                .map_err(|e| eyre::eyre!("Failed to deserialize emergency stop tx: {e}"))?;
            stop_txs.push((row_txid.0, stop_tx));
        }

        Ok(stop_txs)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        builder::transaction::{TransactionType, TxHandlerBuilder},
        test::common::*,
    };
    use bitcoin::{absolute::LockTime, hashes::Hash, Transaction, Txid};

    /// Builds the dummy transaction used as an emergency stop payload.
    fn create_test_transaction() -> Transaction {
        let tx_handler = TxHandlerBuilder::new(TransactionType::Dummy).finalize();
        tx_handler.get_cached_tx().clone()
    }

    /// Builds the dummy transaction with its lock time overridden.
    ///
    /// `create_test_transaction` takes no varying input, so this is used to
    /// obtain transactions that are distinguishable from one another.
    fn create_test_transaction_with_lock_time(lock_time: u32) -> Transaction {
        let mut tx = create_test_transaction();
        tx.lock_time = LockTime::from_consensus(lock_time);
        tx
    }

    /// Exercises insert, lookup (hit, miss, multiple) and the
    /// insert-if-absent behavior of `set_signed_emergency_stop_tx`.
    #[tokio::test]
    async fn test_set_get_emergency_stop_tx() {
        let config = create_test_config_with_thread_name().await;
        let database = Database::new(&config).await.unwrap();

        let move_txid = Txid::from_byte_array([1u8; 32]);
        let emergency_stop_tx = create_test_transaction();
        database
            .set_signed_emergency_stop_tx(None, &move_txid, &emergency_stop_tx)
            .await
            .unwrap();

        let results = database
            .get_emergency_stop_txs(None, vec![move_txid])
            .await
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, move_txid);
        assert_eq!(results[0].1, emergency_stop_tx);

        // Test getting non-existent tx
        let non_existent_txid = Txid::from_byte_array([2u8; 32]);
        let results = database
            .get_emergency_stop_txs(None, vec![non_existent_txid])
            .await
            .unwrap();
        assert!(results.is_empty());

        // Test getting multiple txs. The second transaction differs from the
        // first so that a swapped txid -> tx mapping would be detected.
        let move_txid2 = Txid::from_byte_array([3u8; 32]);
        let emergency_stop_tx2 = create_test_transaction_with_lock_time(123_456);
        assert_ne!(emergency_stop_tx2, emergency_stop_tx);
        database
            .set_signed_emergency_stop_tx(None, &move_txid2, &emergency_stop_tx2)
            .await
            .unwrap();

        let mut results = database
            .get_emergency_stop_txs(None, vec![move_txid, move_txid2])
            .await
            .unwrap();

        assert_eq!(results.len(), 2);
        // The query does not order its rows, so sort before comparing.
        results.sort_by(|a, b| a.0.cmp(&b.0));
        assert_eq!(results[0].0, move_txid);
        assert_eq!(results[0].1, emergency_stop_tx);
        assert_eq!(results[1].0, move_txid2);
        assert_eq!(results[1].1, emergency_stop_tx2);

        // Test inserting again for an existing move txid: the statement uses
        // `ON CONFLICT (move_txid) DO NOTHING`, so the first stored
        // transaction must be kept and the new one ignored.
        let replacement_tx = create_test_transaction_with_lock_time(654_321);
        assert_ne!(replacement_tx, emergency_stop_tx);
        database
            .set_signed_emergency_stop_tx(None, &move_txid, &replacement_tx)
            .await
            .unwrap();

        let results = database
            .get_emergency_stop_txs(None, vec![move_txid])
            .await
            .unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, move_txid);
        assert_eq!(results[0].1, emergency_stop_tx);
        assert_ne!(results[0].1, replacement_tx);
    }
}