1use crate::TxSenderDatabase;
7use crate::{ActivatedWithOutpoint, ActivatedWithTxid};
8use bitcoin::{OutPoint, Transaction, Txid};
9use clementine_config::protocol::ProtocolParamset;
10use clementine_errors::BridgeError;
11use clementine_primitives::{TransactionType, UtxoVout};
12use clementine_utils::{FeePayingType, RbfSigningInfo, TxMetadata};
13use eyre::eyre;
14use std::collections::BTreeMap;
15
16#[derive(Debug, Clone)]
17pub struct TxSenderClient<D>
18where
19 D: TxSenderDatabase,
20{
21 pub db: D,
22 pub tx_sender_consumer_id: String,
23}
24
25impl<D> TxSenderClient<D>
26where
27 D: TxSenderDatabase,
28{
29 pub fn new(db: D, tx_sender_consumer_id: String) -> Self {
30 Self {
31 db,
32 tx_sender_consumer_id,
33 }
34 }
35
36 #[tracing::instrument(err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE), skip_all, fields(?tx_metadata, consumer = self.tx_sender_consumer_id))]
66 #[allow(clippy::too_many_arguments)]
67 pub async fn insert_try_to_send(
68 &self,
69 mut dbtx: Option<&mut D::Transaction>,
70 tx_metadata: Option<TxMetadata>,
71 signed_tx: &Transaction,
72 fee_paying_type: FeePayingType,
73 rbf_signing_info: Option<RbfSigningInfo>,
74 cancel_outpoints: &[OutPoint],
75 cancel_txids: &[Txid],
76 activate_txids: &[ActivatedWithTxid],
77 activate_outpoints: &[ActivatedWithOutpoint],
78 ) -> Result<u32, BridgeError> {
79 let txid = signed_tx.compute_txid();
80
81 tracing::debug!(
82 "{} added tx {} with txid {} to the queue",
83 self.tx_sender_consumer_id,
84 tx_metadata
85 .as_ref()
86 .map(|data| format!("{:?}", data.tx_type))
87 .unwrap_or("N/A".to_string()),
88 txid
89 );
90
91 let tx_exists = self
93 .db
94 .check_if_tx_exists_on_txsender(dbtx.as_deref_mut(), txid)
95 .await?;
96 if let Some(try_to_send_id) = tx_exists {
97 return Ok(try_to_send_id);
98 }
99
100 let try_to_send_id = self
101 .db
102 .save_tx(
103 dbtx.as_deref_mut(),
104 tx_metadata,
105 signed_tx,
106 fee_paying_type,
107 txid,
108 rbf_signing_info,
109 )
110 .await?;
111
112 #[cfg(test)]
114 tracing::debug!(target: "ci", "Saved tx to database with try_to_send_id: {try_to_send_id}, metadata: {tx_metadata:?}, raw tx: {}", hex::encode(bitcoin::consensus::serialize(signed_tx)));
115
116 for input_outpoint in signed_tx.input.iter().map(|input| input.previous_output) {
117 self.db
118 .save_cancelled_outpoint(dbtx.as_deref_mut(), try_to_send_id, input_outpoint)
119 .await?;
120 }
121
122 for outpoint in cancel_outpoints {
123 self.db
124 .save_cancelled_outpoint(dbtx.as_deref_mut(), try_to_send_id, *outpoint)
125 .await?;
126 }
127
128 for txid in cancel_txids {
129 self.db
130 .save_cancelled_txid(dbtx.as_deref_mut(), try_to_send_id, *txid)
131 .await?;
132 }
133
134 let mut max_timelock_of_activated_txids = BTreeMap::new();
135
136 for activated_txid in activate_txids {
137 let timelock = max_timelock_of_activated_txids
138 .entry(activated_txid.txid)
139 .or_insert(activated_txid.relative_block_height);
140 if *timelock < activated_txid.relative_block_height {
141 *timelock = activated_txid.relative_block_height;
142 }
143 }
144
145 for input in signed_tx.input.iter() {
146 let relative_block_height = if input.sequence.is_relative_lock_time() {
147 let relative_locktime = input
148 .sequence
149 .to_relative_lock_time()
150 .expect("Invalid relative locktime");
151 match relative_locktime {
152 bitcoin::relative::LockTime::Blocks(height) => height.value() as u32,
153 _ => {
154 return Err(BridgeError::Eyre(eyre!("Invalid relative locktime")));
155 }
156 }
157 } else {
158 0
159 };
160 let timelock = max_timelock_of_activated_txids
161 .entry(input.previous_output.txid)
162 .or_insert(relative_block_height);
163 if *timelock < relative_block_height {
164 *timelock = relative_block_height;
165 }
166 }
167
168 for (txid, timelock) in max_timelock_of_activated_txids {
169 self.db
170 .save_activated_txid(
171 dbtx.as_deref_mut(),
172 try_to_send_id,
173 &ActivatedWithTxid {
174 txid,
175 relative_block_height: timelock,
176 },
177 )
178 .await?;
179 }
180
181 for activated_outpoint in activate_outpoints {
182 self.db
183 .save_activated_outpoint(dbtx.as_deref_mut(), try_to_send_id, activated_outpoint)
184 .await?;
185 }
186
187 Ok(try_to_send_id)
188 }
189
190 #[allow(clippy::too_many_arguments)]
214 pub async fn add_tx_to_queue(
215 &self,
216 dbtx: Option<&mut D::Transaction>,
217 tx_type: TransactionType,
218 signed_tx: &Transaction,
219 related_txs: &[(TransactionType, Transaction)],
220 tx_metadata: Option<TxMetadata>,
221 protocol_paramset: &ProtocolParamset,
222 rbf_info: Option<RbfSigningInfo>,
223 ) -> Result<u32, BridgeError> {
224 let tx_metadata = tx_metadata.map(|mut data| {
225 data.tx_type = tx_type;
226 data
227 });
228 match tx_type {
229 TransactionType::Kickoff
230 | TransactionType::Dummy
231 | TransactionType::ChallengeTimeout
232 | TransactionType::DisproveTimeout
233 | TransactionType::Reimburse
234 | TransactionType::Round
235 | TransactionType::OperatorChallengeNack(_)
236 | TransactionType::UnspentKickoff(_)
237 | TransactionType::MoveToVault
238 | TransactionType::BurnUnusedKickoffConnectors
239 | TransactionType::KickoffNotFinalized
240 | TransactionType::MiniAssert(_)
241 | TransactionType::LatestBlockhashTimeout
242 | TransactionType::LatestBlockhash
243 | TransactionType::EmergencyStop
244 | TransactionType::OptimisticPayout
245 | TransactionType::ReadyToReimburse
246 | TransactionType::ReplacementDeposit
247 | TransactionType::AssertTimeout(_) => {
248 self.insert_try_to_send(
250 dbtx,
251 tx_metadata,
252 signed_tx,
253 FeePayingType::CPFP,
254 rbf_info,
255 &[],
256 &[],
257 &[],
258 &[],
259 )
260 .await
261 }
262 TransactionType::Challenge
263 | TransactionType::WatchtowerChallenge(_)
264 | TransactionType::Payout => {
265 self.insert_try_to_send(
266 dbtx,
267 tx_metadata,
268 signed_tx,
269 FeePayingType::RBF,
270 rbf_info,
271 &[],
272 &[],
273 &[],
274 &[],
275 )
276 .await
277 }
278 TransactionType::WatchtowerChallengeTimeout(_) => {
279 let kickoff_txid = related_txs
283 .iter()
284 .find_map(|(tx_type, tx)| {
285 if let TransactionType::Kickoff = tx_type {
286 Some(tx.compute_txid())
287 } else {
288 None
289 }
290 })
291 .ok_or(BridgeError::Eyre(eyre!(
292 "Couldn't find kickoff tx in related_txs"
293 )))?;
294 self.insert_try_to_send(
295 dbtx,
296 tx_metadata,
297 signed_tx,
298 FeePayingType::CPFP,
299 rbf_info,
300 &[OutPoint {
301 txid: kickoff_txid,
302 vout: UtxoVout::KickoffFinalizer.get_vout(),
303 }],
304 &[],
305 &[],
306 &[],
307 )
308 .await
309 }
310 TransactionType::OperatorChallengeAck(watchtower_idx) => {
311 let kickoff_txid = related_txs
312 .iter()
313 .find_map(|(tx_type, tx)| {
314 if let TransactionType::Kickoff = tx_type {
315 Some(tx.compute_txid())
316 } else {
317 None
318 }
319 })
320 .ok_or(BridgeError::Eyre(eyre!(
321 "Couldn't find kickoff tx in related_txs"
322 )))?;
323 self.insert_try_to_send(
324 dbtx,
325 tx_metadata,
326 signed_tx,
327 FeePayingType::CPFP,
328 rbf_info,
329 &[],
330 &[],
331 &[],
332 &[ActivatedWithOutpoint {
333 outpoint: OutPoint {
335 txid: kickoff_txid,
336 vout: UtxoVout::WatchtowerChallenge(watchtower_idx).get_vout(),
337 },
338 relative_block_height: protocol_paramset.finality_depth - 1,
339 }],
340 )
341 .await
342 }
343 TransactionType::Disprove => {
344 self.insert_try_to_send(
345 dbtx,
346 tx_metadata,
347 signed_tx,
348 FeePayingType::NoFunding,
349 rbf_info,
350 &[],
351 &[],
352 &[],
353 &[],
354 )
355 .await
356 }
357 TransactionType::AllNeededForDeposit | TransactionType::YieldKickoffTxid => {
358 unreachable!("Higher level transaction types used for yielding kickoff txid from sighash stream should not be added to the queue");
359 }
360 }
361 }
362}