clementine_core/tx_sender/
client.rs1use super::Result;
7use super::{ActivatedWithOutpoint, ActivatedWithTxid};
8use crate::builder::transaction::input::UtxoVout;
9use crate::errors::ResultExt;
10use crate::operator::RoundIndex;
11use crate::rpc;
12use crate::utils::{FeePayingType, RbfSigningInfo, TxMetadata};
13use crate::{
14 builder::transaction::TransactionType,
15 config::BridgeConfig,
16 database::{Database, DatabaseTransaction},
17};
18use bitcoin::hashes::Hash;
19use bitcoin::{OutPoint, Transaction, Txid};
20use std::collections::BTreeMap;
21
22#[derive(Debug, Clone)]
23pub struct TxSenderClient {
24 pub(super) db: Database,
25 pub(super) tx_sender_consumer_id: String,
26}
27
28impl TxSenderClient {
29 pub fn new(db: Database, tx_sender_consumer_id: String) -> Self {
30 Self {
31 db,
32 tx_sender_consumer_id,
33 }
34 }
35
36 #[tracing::instrument(err(level = tracing::Level::ERROR), ret(level = tracing::Level::TRACE), skip_all, fields(?tx_metadata, consumer = self.tx_sender_consumer_id))]
66 #[allow(clippy::too_many_arguments)]
67 pub async fn insert_try_to_send(
68 &self,
69 dbtx: DatabaseTransaction<'_, '_>,
70 tx_metadata: Option<TxMetadata>,
71 signed_tx: &Transaction,
72 fee_paying_type: FeePayingType,
73 rbf_signing_info: Option<RbfSigningInfo>,
74 cancel_outpoints: &[OutPoint],
75 cancel_txids: &[Txid],
76 activate_txids: &[ActivatedWithTxid],
77 activate_outpoints: &[ActivatedWithOutpoint],
78 ) -> Result<u32> {
79 let txid = signed_tx.compute_txid();
80
81 tracing::debug!(
82 "{} added tx {} with txid {} to the queue",
83 self.tx_sender_consumer_id,
84 tx_metadata
85 .map(|data| format!("{:?}", data.tx_type))
86 .unwrap_or("N/A".to_string()),
87 txid
88 );
89
90 let tx_exists = self
92 .db
93 .check_if_tx_exists_on_txsender(Some(dbtx), txid)
94 .await
95 .map_to_eyre()?;
96 if let Some(try_to_send_id) = tx_exists {
97 return Ok(try_to_send_id);
98 }
99
100 let try_to_send_id = self
101 .db
102 .save_tx(
103 Some(dbtx),
104 tx_metadata,
105 signed_tx,
106 fee_paying_type,
107 txid,
108 rbf_signing_info,
109 )
110 .await
111 .map_to_eyre()?;
112
113 for input_outpoint in signed_tx.input.iter().map(|input| input.previous_output) {
114 self.db
115 .save_cancelled_outpoint(Some(dbtx), try_to_send_id, input_outpoint)
116 .await
117 .map_to_eyre()?;
118 }
119
120 for outpoint in cancel_outpoints {
121 self.db
122 .save_cancelled_outpoint(Some(dbtx), try_to_send_id, *outpoint)
123 .await
124 .map_to_eyre()?;
125 }
126
127 for txid in cancel_txids {
128 self.db
129 .save_cancelled_txid(Some(dbtx), try_to_send_id, *txid)
130 .await
131 .map_to_eyre()?;
132 }
133
134 let mut max_timelock_of_activated_txids = BTreeMap::new();
135
136 for activated_txid in activate_txids {
137 let timelock = max_timelock_of_activated_txids
138 .entry(activated_txid.txid)
139 .or_insert(activated_txid.relative_block_height);
140 if *timelock < activated_txid.relative_block_height {
141 *timelock = activated_txid.relative_block_height;
142 }
143 }
144
145 for input in signed_tx.input.iter() {
146 let relative_block_height = if input.sequence.is_relative_lock_time() {
147 let relative_locktime = input
148 .sequence
149 .to_relative_lock_time()
150 .expect("Invalid relative locktime");
151 match relative_locktime {
152 bitcoin::relative::LockTime::Blocks(height) => height.value() as u32,
153 _ => {
154 return Err(eyre::eyre!("Invalid relative locktime").into());
155 }
156 }
157 } else {
158 0
159 };
160 let timelock = max_timelock_of_activated_txids
161 .entry(input.previous_output.txid)
162 .or_insert(relative_block_height);
163 if *timelock < relative_block_height {
164 *timelock = relative_block_height;
165 }
166 }
167
168 for (txid, timelock) in max_timelock_of_activated_txids {
169 self.db
170 .save_activated_txid(
171 Some(dbtx),
172 try_to_send_id,
173 &ActivatedWithTxid {
174 txid,
175 relative_block_height: timelock,
176 },
177 )
178 .await
179 .map_to_eyre()?;
180 }
181
182 for activated_outpoint in activate_outpoints {
183 self.db
184 .save_activated_outpoint(Some(dbtx), try_to_send_id, activated_outpoint)
185 .await
186 .map_to_eyre()?;
187 }
188
189 Ok(try_to_send_id)
190 }
191
192 #[allow(clippy::too_many_arguments)]
216 pub async fn add_tx_to_queue<'a>(
217 &'a self,
218 dbtx: DatabaseTransaction<'a, '_>,
219 tx_type: TransactionType,
220 signed_tx: &Transaction,
221 related_txs: &[(TransactionType, Transaction)],
222 tx_metadata: Option<TxMetadata>,
223 config: &BridgeConfig,
224 rbf_info: Option<RbfSigningInfo>,
225 ) -> Result<u32> {
226 let tx_metadata = tx_metadata.map(|mut data| {
227 data.tx_type = tx_type;
228 data
229 });
230 match tx_type {
231 TransactionType::Kickoff
232 | TransactionType::Dummy
233 | TransactionType::ChallengeTimeout
234 | TransactionType::DisproveTimeout
235 | TransactionType::Reimburse
236 | TransactionType::Round
237 | TransactionType::OperatorChallengeNack(_)
238 | TransactionType::UnspentKickoff(_)
239 | TransactionType::MoveToVault
240 | TransactionType::BurnUnusedKickoffConnectors
241 | TransactionType::KickoffNotFinalized
242 | TransactionType::MiniAssert(_)
243 | TransactionType::LatestBlockhashTimeout
244 | TransactionType::LatestBlockhash
245 | TransactionType::EmergencyStop
246 | TransactionType::OptimisticPayout
247 | TransactionType::ReadyToReimburse
248 | TransactionType::ReplacementDeposit
249 | TransactionType::AssertTimeout(_) => {
250 self.insert_try_to_send(
252 dbtx,
253 tx_metadata,
254 signed_tx,
255 FeePayingType::CPFP,
256 rbf_info,
257 &[],
258 &[],
259 &[],
260 &[],
261 )
262 .await
263 }
264 TransactionType::Challenge
265 | TransactionType::WatchtowerChallenge(_)
266 | TransactionType::Payout => {
267 self.insert_try_to_send(
268 dbtx,
269 tx_metadata,
270 signed_tx,
271 FeePayingType::RBF,
272 rbf_info,
273 &[],
274 &[],
275 &[],
276 &[],
277 )
278 .await
279 }
280 TransactionType::WatchtowerChallengeTimeout(_) => {
281 let kickoff_txid = related_txs
285 .iter()
286 .find_map(|(tx_type, tx)| {
287 if let TransactionType::Kickoff = tx_type {
288 Some(tx.compute_txid())
289 } else {
290 None
291 }
292 })
293 .ok_or(eyre::eyre!("Couldn't find kickoff tx in related_txs"))?;
294 self.insert_try_to_send(
295 dbtx,
296 tx_metadata,
297 signed_tx,
298 FeePayingType::CPFP,
299 rbf_info,
300 &[OutPoint {
301 txid: kickoff_txid,
302 vout: UtxoVout::KickoffFinalizer.get_vout(),
303 }],
304 &[],
305 &[],
306 &[],
307 )
308 .await
309 }
310 TransactionType::OperatorChallengeAck(watchtower_idx) => {
311 let kickoff_txid = related_txs
312 .iter()
313 .find_map(|(tx_type, tx)| {
314 if let TransactionType::Kickoff = tx_type {
315 Some(tx.compute_txid())
316 } else {
317 None
318 }
319 })
320 .ok_or(eyre::eyre!("Couldn't find kickoff tx in related_txs"))?;
321 self.insert_try_to_send(
322 dbtx,
323 tx_metadata,
324 signed_tx,
325 FeePayingType::CPFP,
326 rbf_info,
327 &[],
328 &[],
329 &[],
330 &[ActivatedWithOutpoint {
331 outpoint: OutPoint {
333 txid: kickoff_txid,
334 vout: UtxoVout::WatchtowerChallenge(watchtower_idx).get_vout(),
335 },
336 relative_block_height: config.protocol_paramset().finality_depth - 1,
337 }],
338 )
339 .await
340 }
341 TransactionType::Disprove => {
342 self.insert_try_to_send(
343 dbtx,
344 tx_metadata,
345 signed_tx,
346 FeePayingType::NoFunding,
347 rbf_info,
348 &[],
349 &[],
350 &[],
351 &[],
352 )
353 .await
354 }
355 TransactionType::AllNeededForDeposit | TransactionType::YieldKickoffTxid => {
356 unreachable!()
357 }
358 }
359 }
360
361 pub async fn debug_tx(&self, id: u32) -> Result<crate::rpc::clementine::TxDebugInfo> {
372 use crate::rpc::clementine::{TxDebugFeePayerUtxo, TxDebugInfo, TxDebugSubmissionError};
373
374 let (tx_metadata, tx, fee_paying_type, seen_block_id, _) =
375 self.db.get_try_to_send_tx(None, id).await.map_to_eyre()?;
376
377 let submission_errors = self
378 .db
379 .get_tx_debug_submission_errors(None, id)
380 .await
381 .map_to_eyre()?;
382
383 let submission_errors = submission_errors
384 .into_iter()
385 .map(|(error_message, timestamp)| TxDebugSubmissionError {
386 error_message,
387 timestamp,
388 })
389 .collect();
390
391 let current_state = self.db.get_tx_debug_info(None, id).await.map_to_eyre()?;
392
393 let fee_payer_utxos = self
394 .db
395 .get_tx_debug_fee_payer_utxos(None, id)
396 .await
397 .map_to_eyre()?;
398
399 let fee_payer_utxos = fee_payer_utxos
400 .into_iter()
401 .map(|(txid, vout, amount, confirmed)| {
402 Ok(TxDebugFeePayerUtxo {
403 txid: Some(txid.into()),
404 vout,
405 amount: amount.to_sat(),
406 confirmed,
407 })
408 })
409 .collect::<Result<Vec<_>>>()?;
410
411 let txid = match fee_paying_type {
412 FeePayingType::CPFP | FeePayingType::NoFunding => tx.compute_txid(),
413 FeePayingType::RBF => self
414 .db
415 .get_last_rbf_txid(None, id)
416 .await
417 .map_to_eyre()?
418 .unwrap_or(Txid::all_zeros()),
419 };
420 let debug_info = TxDebugInfo {
421 id,
422 is_active: seen_block_id.is_none(),
423 current_state: current_state.unwrap_or_else(|| "unknown".to_string()),
424 submission_errors,
425 created_at: "".to_string(),
426 txid: Some(txid.into()),
427 fee_paying_type: format!("{fee_paying_type:?}"),
428 fee_payer_utxos_count: fee_payer_utxos.len() as u32,
429 fee_payer_utxos_confirmed_count: fee_payer_utxos
430 .iter()
431 .filter(|TxDebugFeePayerUtxo { confirmed, .. }| *confirmed)
432 .count() as u32,
433 fee_payer_utxos,
434 raw_tx: bitcoin::consensus::serialize(&tx),
435 metadata: tx_metadata.map(|metadata| rpc::clementine::TxMetadata {
436 deposit_outpoint: metadata.deposit_outpoint.map(Into::into),
437 operator_xonly_pk: metadata.operator_xonly_pk.map(Into::into),
438
439 round_idx: metadata
440 .round_idx
441 .unwrap_or(RoundIndex::Round(0))
442 .to_index() as u32,
443 kickoff_idx: metadata.kickoff_idx.unwrap_or(0),
444 tx_type: Some(metadata.tx_type.into()),
445 }),
446 };
447
448 Ok(debug_info)
449 }
450}