1use crate::config::BridgeConfig;
19use crate::errors::ResultExt;
20use crate::utils::FeePayingType;
21use crate::{
22 actor::Actor,
23 builder::{self},
24 database::Database,
25 extended_bitcoin_rpc::ExtendedBitcoinRpc,
26 utils::TxMetadata,
27};
28use bitcoin::taproot::TaprootSpendInfo;
29use bitcoin::{Amount, FeeRate, OutPoint, Transaction, TxOut, Txid, Weight};
30use bitcoincore_rpc::RpcApi;
31use eyre::OptionExt;
32
33#[cfg(test)]
34use std::env;
35
36mod client;
37mod cpfp;
38mod nonstandard;
39mod rbf;
40mod task;
41
42pub use client::TxSenderClient;
43pub use task::TxSenderTask;
44
45macro_rules! log_error_for_tx {
47 ($db:expr, $try_to_send_id:expr, $err:expr) => {{
48 let db = $db.clone();
49 let try_to_send_id = $try_to_send_id;
50 let err = $err.to_string();
51 tracing::warn!(try_to_send_id, "{}", err);
52 tokio::spawn(async move {
53 let _ = db
54 .save_tx_debug_submission_error(try_to_send_id, &err)
55 .await;
56 });
57 }};
58}
59
60use log_error_for_tx;
62
63#[derive(Clone, Debug)]
71pub struct TxSender {
72 pub signer: Actor,
73 pub rpc: ExtendedBitcoinRpc,
74 pub db: Database,
75 pub btc_syncer_consumer_id: String,
76 pub config: BridgeConfig,
77 cached_spendinfo: TaprootSpendInfo,
78 http_client: reqwest::Client,
79}
80
81#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
82pub struct ActivatedWithTxid {
83 pub txid: Txid,
84 pub relative_block_height: u32,
85}
86
87#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)]
88pub struct ActivatedWithOutpoint {
89 pub outpoint: OutPoint,
90 pub relative_block_height: u32,
91}
92
93#[derive(Debug, thiserror::Error)]
94pub enum SendTxError {
95 #[error("Unconfirmed fee payer UTXOs left")]
96 UnconfirmedFeePayerUTXOsLeft,
97 #[error("Insufficient fee payer amount")]
98 InsufficientFeePayerAmount,
99
100 #[error("Failed to create a PSBT for fee bump")]
101 PsbtError(String),
102
103 #[error("Network error: {0}")]
104 NetworkError(String),
105
106 #[error(transparent)]
107 Other(#[from] eyre::Report),
108}
109
110type Result<T> = std::result::Result<T, SendTxError>;
111
112impl TxSender {
113 pub fn new(
114 signer: Actor,
115 rpc: ExtendedBitcoinRpc,
116 db: Database,
117 btc_syncer_consumer_id: String,
118 config: BridgeConfig,
119 ) -> Self {
120 Self {
121 cached_spendinfo: builder::address::create_taproot_address(
122 &[],
123 Some(signer.xonly_public_key),
124 config.protocol_paramset.network,
125 )
126 .1,
127 signer,
128 rpc,
129 db,
130 btc_syncer_consumer_id,
131 config: config.clone(),
132 http_client: reqwest::Client::new(),
133 }
134 }
135
136 async fn get_fee_rate(&self) -> Result<FeeRate> {
146 self.rpc
147 .get_fee_rate(
148 self.config.protocol_paramset.network,
149 &self.config.mempool_api_host,
150 &self.config.mempool_api_endpoint,
151 self.config.tx_sender_limits.mempool_fee_rate_multiplier,
152 self.config.tx_sender_limits.mempool_fee_rate_offset_sat_kvb,
153 self.config.tx_sender_limits.fee_rate_hard_cap,
154 )
155 .await
156 .map_err(|e| SendTxError::Other(e.into()))
157 }
158
159 fn calculate_required_fee(
178 parent_tx_weight: Weight,
179 num_fee_payer_utxos: usize,
180 fee_rate: FeeRate,
181 fee_paying_type: FeePayingType,
182 ) -> Result<Amount> {
183 tracing::info!(
184 "Calculating required fee for {} fee payer utxos",
185 num_fee_payer_utxos
186 );
187 let child_tx_weight = match fee_paying_type {
194 FeePayingType::CPFP => Weight::from_wu_usize(230 * num_fee_payer_utxos + 207 + 172),
198 FeePayingType::RBF => Weight::from_wu_usize(230 * num_fee_payer_utxos + 172),
202 FeePayingType::NoFunding => Weight::from_wu_usize(0),
203 };
204
205 let total_weight = match fee_paying_type {
209 FeePayingType::CPFP => Weight::from_vb_unchecked(
210 child_tx_weight.to_vbytes_ceil() + parent_tx_weight.to_vbytes_ceil(),
211 ),
212 FeePayingType::RBF => child_tx_weight + parent_tx_weight, FeePayingType::NoFunding => parent_tx_weight,
214 };
215
216 fee_rate
217 .checked_mul_by_weight(total_weight)
218 .ok_or_eyre("Fee calculation overflow")
219 .map_err(Into::into)
220 }
221
222 fn is_p2a_anchor(&self, output: &TxOut) -> bool {
223 output.script_pubkey
224 == builder::transaction::anchor_output(self.config.protocol_paramset.anchor_amount())
225 .script_pubkey
226 }
227
228 fn find_p2a_vout(&self, tx: &Transaction) -> Result<usize> {
229 let p2a_anchor = tx
230 .output
231 .iter()
232 .enumerate()
233 .find(|(_, output)| self.is_p2a_anchor(output));
234 if let Some((vout, _)) = p2a_anchor {
235 Ok(vout)
236 } else {
237 Err(eyre::eyre!("P2A anchor output not found in transaction").into())
238 }
239 }
240
241 #[allow(dead_code)]
244 fn btc_per_kvb_to_fee_rate(btc_per_kvb: f64) -> FeeRate {
245 FeeRate::from_sat_per_vb_unchecked((btc_per_kvb * 100000.0) as u64)
246 }
247
248 #[tracing::instrument(skip_all, fields(sender = self.btc_syncer_consumer_id, new_fee_rate, current_tip_height))]
268 async fn try_to_send_unconfirmed_txs(
269 &self,
270 new_fee_rate: FeeRate,
271 current_tip_height: u32,
272 is_tip_height_increased: bool,
273 ) -> Result<()> {
274 let get_sendable_txs_fee_rate = if is_tip_height_increased {
280 FeeRate::from_sat_per_kwu(u32::MAX as u64)
281 } else {
282 new_fee_rate
283 };
284 let txs = self
285 .db
286 .get_sendable_txs(None, get_sendable_txs_fee_rate, current_tip_height)
287 .await
288 .map_to_eyre()?;
289
290 self.bump_fees_of_unconfirmed_fee_payer_txs(new_fee_rate)
292 .await?;
293
294 if !txs.is_empty() {
295 tracing::debug!("Trying to send {} sendable txs ", txs.len());
296 }
297
298 #[cfg(test)]
299 {
300 if env::var("TXSENDER_DBG_INACTIVE_TXS").is_ok() {
301 self.db
302 .debug_inactive_txs(get_sendable_txs_fee_rate, current_tip_height)
303 .await;
304 }
305 }
306
307 for id in txs {
308 tracing::debug!(
310 try_to_send_id = id,
311 "Processing TX in try_to_send_unconfirmed_txs with fee rate {new_fee_rate}",
312 );
313
314 let (tx_metadata, tx, fee_paying_type, seen_block_id, rbf_signing_info) =
315 match self.db.get_try_to_send_tx(None, id).await {
316 Ok(res) => res,
317 Err(e) => {
318 log_error_for_tx!(self.db, id, format!("Failed to get tx details: {}", e));
319 continue;
320 }
321 };
322
323 if let Some(block_id) = seen_block_id {
325 tracing::debug!(
326 try_to_send_id = id,
327 "Transaction already confirmed in block with block id of {}",
328 block_id
329 );
330
331 let _ = self
333 .db
334 .update_tx_debug_sending_state(id, "confirmed", true)
335 .await;
336
337 continue;
338 }
339
340 let result = match fee_paying_type {
341 _ if self.config.protocol_paramset.network == bitcoin::Network::Testnet4
344 && self.is_bridge_tx_nonstandard(&tx) =>
345 {
346 self.send_testnet4_nonstandard_tx(&tx, id).await
347 }
348 FeePayingType::CPFP => self.send_cpfp_tx(id, tx, tx_metadata, new_fee_rate).await,
349 FeePayingType::RBF => {
350 self.send_rbf_tx(id, tx, tx_metadata, new_fee_rate, rbf_signing_info)
351 .await
352 }
353 FeePayingType::NoFunding => self.send_no_funding_tx(id, tx, tx_metadata).await,
354 };
355
356 if let Err(e) = result {
357 log_error_for_tx!(self.db, id, format!("Failed to send tx: {:?}", e));
358 }
359 }
360
361 Ok(())
362 }
363
364 pub fn client(&self) -> TxSenderClient {
365 TxSenderClient::new(self.db.clone(), self.btc_syncer_consumer_id.clone())
366 }
367
368 #[tracing::instrument(skip_all, fields(sender = self.btc_syncer_consumer_id, try_to_send_id, tx_meta=?tx_metadata))]
388 pub(super) async fn send_no_funding_tx(
389 &self,
390 try_to_send_id: u32,
391 tx: Transaction,
392 tx_metadata: Option<TxMetadata>,
393 ) -> Result<()> {
394 tracing::debug!(target: "ci", "Sending no funding tx, raw tx: {:?}", hex::encode(bitcoin::consensus::serialize(&tx)));
395 match self.rpc.send_raw_transaction(&tx).await {
396 Ok(sent_txid) => {
397 tracing::debug!(
398 try_to_send_id,
399 "Successfully sent no funding tx with txid {}",
400 sent_txid
401 );
402 let _ = self
403 .db
404 .update_tx_debug_sending_state(try_to_send_id, "no_funding_send_success", true)
405 .await;
406 }
407 Err(e) => {
408 tracing::error!(
409 "Failed to send no funding tx with try_to_send_id: {try_to_send_id:?} and metadata: {tx_metadata:?}"
410 );
411 let err_msg = format!("send_raw_transaction error for no funding tx: {e}");
412 log_error_for_tx!(self.db, try_to_send_id, err_msg);
413 let _ = self
414 .db
415 .update_tx_debug_sending_state(try_to_send_id, "no_funding_send_failed", true)
416 .await;
417 return Err(SendTxError::Other(eyre::eyre!(e)));
418 }
419 };
420
421 Ok(())
422 }
423}
424
425#[cfg(test)]
426mod tests {
427 use super::*;
428 use crate::actor::TweakCache;
429 use crate::bitcoin_syncer::BitcoinSyncer;
430 use crate::bitvm_client::SECP;
431 use crate::builder::script::{CheckSig, SpendPath, SpendableScript};
432 use crate::builder::transaction::input::SpendableTxIn;
433 use crate::builder::transaction::output::UnspentTxOut;
434 use crate::builder::transaction::{TransactionType, TxHandlerBuilder, DEFAULT_SEQUENCE};
435 use crate::config::protocol::ProtocolParamset;
436 use crate::errors::BridgeError;
437 use crate::rpc::clementine::NormalSignatureKind;
438 use crate::task::{IntoTask, TaskExt};
439 use crate::test::common::tx_utils::{create_bg_tx_sender, create_bumpable_tx};
440 use crate::{database::Database, test::common::*};
441 use bitcoin::hashes::Hash;
442 use bitcoin::secp256k1::SecretKey;
443 use serde_json::json;
444 use std::ops::Mul;
445 use std::result::Result;
446 use std::sync::atomic::{AtomicUsize, Ordering};
447 use std::sync::Arc;
448 use std::time::Duration;
449 use wiremock::matchers::{body_partial_json, method, path};
450 use wiremock::{Mock, MockServer, Request, Respond, ResponseTemplate};
451
452 pub(super) async fn create_tx_sender(
453 rpc: ExtendedBitcoinRpc,
454 ) -> (
455 TxSender,
456 BitcoinSyncer,
457 ExtendedBitcoinRpc,
458 Database,
459 Actor,
460 bitcoin::Network,
461 ) {
462 let sk = SecretKey::new(&mut rand::thread_rng());
463 let network = bitcoin::Network::Regtest;
464 let actor = Actor::new(sk, network);
465
466 let config = create_test_config_with_thread_name().await;
467
468 let db = Database::new(&config).await.unwrap();
469
470 let tx_sender = TxSender::new(
471 actor.clone(),
472 rpc.clone(),
473 db.clone(),
474 "tx_sender".into(),
475 config.clone(),
476 );
477
478 (
479 tx_sender,
480 BitcoinSyncer::new(db.clone(), rpc.clone(), config.protocol_paramset)
481 .await
482 .unwrap(),
483 rpc,
484 db,
485 actor,
486 network,
487 )
488 }
489
490 impl TxSenderClient {
491 pub async fn test_dbtx(
492 &self,
493 ) -> Result<sqlx::Transaction<'_, sqlx::Postgres>, BridgeError> {
494 self.db.begin_transaction().await
495 }
496 }
497
498 #[tokio::test]
499 async fn test_try_to_send_duplicate() -> Result<(), BridgeError> {
500 let mut config = create_test_config_with_thread_name().await;
501 let regtest = create_regtest_rpc(&mut config).await;
502 let rpc = regtest.rpc().clone();
503
504 rpc.mine_blocks(1).await.unwrap();
505
506 let (client, _tx_sender, _cancel_txs, rpc, db, signer, network) =
507 create_bg_tx_sender(config).await;
508
509 let tx = create_bumpable_tx(&rpc, &signer, network, FeePayingType::CPFP, false)
510 .await
511 .unwrap();
512
513 let mut dbtx = db.begin_transaction().await.unwrap();
514 let tx_id1 = client
515 .insert_try_to_send(
516 &mut dbtx,
517 None,
518 &tx,
519 FeePayingType::CPFP,
520 None,
521 &[],
522 &[],
523 &[],
524 &[],
525 )
526 .await
527 .unwrap();
528 let tx_id2 = client
529 .insert_try_to_send(
530 &mut dbtx,
531 None,
532 &tx,
533 FeePayingType::CPFP,
534 None,
535 &[],
536 &[],
537 &[],
538 &[],
539 )
540 .await
541 .unwrap(); dbtx.commit().await.unwrap();
543
544 poll_until_condition(
545 async || {
546 rpc.mine_blocks(1).await.unwrap();
547
548 match rpc.get_raw_transaction_info(&tx.compute_txid(), None).await {
549 Ok(tx_result) => {
550 if let Some(conf) = tx_result.confirmations {
551 return Ok(conf > 0);
552 }
553 Ok(false)
554 }
555 Err(_) => Ok(false),
556 }
557 },
558 Some(Duration::from_secs(30)),
559 Some(Duration::from_millis(100)),
560 )
561 .await
562 .expect("Tx was not confirmed in time");
563
564 poll_until_condition(
565 async || {
566 let (_, _, _, tx_id1_seen_block_id, _) =
567 db.get_try_to_send_tx(None, tx_id1).await.unwrap();
568 let (_, _, _, tx_id2_seen_block_id, _) =
569 db.get_try_to_send_tx(None, tx_id2).await.unwrap();
570
571 Ok(tx_id2_seen_block_id.is_some() && tx_id1_seen_block_id.is_some())
573 },
574 Some(Duration::from_secs(5)),
575 Some(Duration::from_millis(100)),
576 )
577 .await
578 .expect("Tx was not confirmed in time");
579
580 Ok(())
581 }
582
583 #[tokio::test]
584 async fn get_fee_rate() {
585 let mut config = create_test_config_with_thread_name().await;
586 let regtest = create_regtest_rpc(&mut config).await;
587 let rpc = regtest.rpc().clone();
588 let db = Database::new(&config).await.unwrap();
589
590 let amount = Amount::from_sat(100_000);
591 let signer = Actor::new(config.secret_key, config.protocol_paramset().network);
592 let (xonly_pk, _) = config.secret_key.public_key(&SECP).x_only_public_key();
593
594 let tx_sender = TxSender::new(
595 signer.clone(),
596 rpc.clone(),
597 db,
598 "tx_sender".into(),
599 config.clone(),
600 );
601
602 let scripts: Vec<Arc<dyn SpendableScript>> =
603 vec![Arc::new(CheckSig::new(xonly_pk)).clone()];
604 let (taproot_address, taproot_spend_info) = builder::address::create_taproot_address(
605 &scripts
606 .iter()
607 .map(|s| s.to_script_buf())
608 .collect::<Vec<_>>(),
609 None,
610 config.protocol_paramset().network,
611 );
612
613 let input_utxo = rpc.send_to_address(&taproot_address, amount).await.unwrap();
614
615 let builder = TxHandlerBuilder::new(TransactionType::Dummy).add_input(
616 NormalSignatureKind::NotStored,
617 SpendableTxIn::new(
618 input_utxo,
619 TxOut {
620 value: amount,
621 script_pubkey: taproot_address.script_pubkey(),
622 },
623 scripts.clone(),
624 Some(taproot_spend_info.clone()),
625 ),
626 SpendPath::ScriptSpend(0),
627 DEFAULT_SEQUENCE,
628 );
629
630 let mut will_fail_handler = builder
631 .clone()
632 .add_output(UnspentTxOut::new(
633 TxOut {
634 value: amount,
635 script_pubkey: taproot_address.script_pubkey(),
636 },
637 scripts.clone(),
638 Some(taproot_spend_info.clone()),
639 ))
640 .finalize();
641
642 let mut tweak_cache = TweakCache::default();
643 signer
644 .tx_sign_and_fill_sigs(&mut will_fail_handler, &[], Some(&mut tweak_cache))
645 .unwrap();
646
647 rpc.mine_blocks(1).await.unwrap();
648 let mempool_info = rpc.get_mempool_info().await.unwrap();
649 tracing::info!("Mempool info: {:?}", mempool_info);
650
651 let will_fail_tx = will_fail_handler.get_cached_tx();
652
653 if mempool_info.mempool_min_fee.to_sat() > 0 {
654 assert!(rpc.send_raw_transaction(will_fail_tx).await.is_err());
655 }
656
657 let fee_rate = tx_sender.get_fee_rate().await.unwrap();
659 let fee = TxSender::calculate_required_fee(
660 will_fail_tx.weight(),
661 1,
662 fee_rate,
663 FeePayingType::CPFP,
664 )
665 .unwrap();
666 tracing::info!("Fee rate: {:?}, fee: {}", fee_rate, fee);
667
668 let mut will_successful_handler = builder
669 .add_output(UnspentTxOut::new(
670 TxOut {
671 value: amount - fee,
672 script_pubkey: taproot_address.script_pubkey(),
673 },
674 scripts,
675 Some(taproot_spend_info),
676 ))
677 .finalize();
678 signer
679 .tx_sign_and_fill_sigs(&mut will_successful_handler, &[], Some(&mut tweak_cache))
680 .unwrap();
681
682 rpc.mine_blocks(1).await.unwrap();
683
684 rpc.send_raw_transaction(will_successful_handler.get_cached_tx())
685 .await
686 .unwrap();
687 }
688
689 #[tokio::test]
690 async fn test_send_no_funding_tx() -> Result<(), BridgeError> {
691 let mut config = create_test_config_with_thread_name().await;
693 let rpc = create_regtest_rpc(&mut config).await;
694
695 let (tx_sender, btc_sender, rpc, db, signer, network) =
696 create_tx_sender(rpc.rpc().clone()).await;
697 let pair = btc_sender.into_task().cancelable_loop();
698 pair.0.into_bg();
699
700 let tx = rbf::tests::create_rbf_tx(&rpc, &signer, network, false).await?;
702
703 let mut dbtx = db.begin_transaction().await?;
705 let try_to_send_id = tx_sender
706 .client()
707 .insert_try_to_send(
708 &mut dbtx,
709 None, &tx,
711 FeePayingType::NoFunding,
712 None,
713 &[], &[], &[], &[], )
718 .await?;
719 dbtx.commit().await?;
720
721 tx_sender
723 .send_no_funding_tx(try_to_send_id, tx.clone(), None)
724 .await
725 .expect("Already funded should succeed");
726
727 tx_sender
728 .send_no_funding_tx(try_to_send_id, tx.clone(), None)
729 .await
730 .expect("Should not return error if sent again");
731
732 let tx_debug_info = tx_sender
734 .client()
735 .debug_tx(try_to_send_id)
736 .await
737 .expect("Transaction should be have debug info");
738
739 rpc.get_tx_of_txid(&bitcoin::Txid::from_byte_array(
741 tx_debug_info.txid.unwrap().txid.try_into().unwrap(),
742 ))
743 .await
744 .expect("Transaction should be in mempool");
745
746 tx_sender
747 .send_no_funding_tx(try_to_send_id, tx.clone(), None)
748 .await
749 .expect("Should not return error if sent again but still in mempool");
750
751 Ok(())
752 }
753
754 #[tokio::test]
755 async fn test_get_fee_rate_mempool_higher_than_rpc_uses_rpc() {
756 let mock_rpc_server = MockServer::start().await;
757
758 Mock::given(method("POST"))
759 .and(path("/"))
760 .and(body_partial_json(json!({
761 "method": "estimatesmartfee"
762 })))
763 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
764 "jsonrpc": "2.0",
765 "id": 1,
766 "result": {
767 "feerate": 0.00002,
768 "blocks": 1
769 }
770 })))
771 .mount(&mock_rpc_server)
772 .await;
773
774 Mock::given(method("POST"))
775 .and(path("/"))
776 .and(body_partial_json(json!({
777 "method": "ping"
778 })))
779 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
780 "jsonrpc": "2.0",
781 "id": 1,
782 "result": null
783 })))
784 .mount(&mock_rpc_server)
785 .await;
786
787 let mock_mempool_server = MockServer::start().await;
788 Mock::given(method("GET"))
789 .and(path("/api/v1/fees/recommended"))
790 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
791 "fastestFee": 3,
792 "halfHourFee": 2,
793 "hourFee": 1
794 })))
795 .mount(&mock_mempool_server)
796 .await;
797
798 let mock_rpc = ExtendedBitcoinRpc::connect(
799 mock_rpc_server.uri(),
800 secrecy::SecretString::new("test_user".into()),
801 secrecy::SecretString::new("test_password".into()),
802 None,
803 )
804 .await
805 .unwrap();
806
807 let mut config = create_test_config_with_thread_name().await;
808 let network = bitcoin::Network::Bitcoin;
809 let paramset = ProtocolParamset {
810 network,
811 ..ProtocolParamset::default()
812 };
813
814 let mempool_space_uri = mock_mempool_server.uri() + "/";
815
816 config.protocol_paramset = Box::leak(Box::new(paramset));
817 config.mempool_api_host = Some(mempool_space_uri);
818 config.mempool_api_endpoint = Some("api/v1/fees/recommended".into());
819
820 let db = Database::new(&config).await.unwrap();
821 let signer = Actor::new(config.secret_key, config.protocol_paramset.network);
822
823 let tx_sender = TxSender::new(signer, mock_rpc, db, "test_tx_sender".into(), config);
824
825 let fee_rate = tx_sender.get_fee_rate().await.unwrap();
826 assert_eq!(fee_rate, FeeRate::from_sat_per_kwu(500));
827 }
828
829 #[tokio::test]
830 async fn test_get_fee_rate_rpc_higher_than_mempool() {
831 let mock_rpc_server = MockServer::start().await;
832
833 Mock::given(method("POST"))
834 .and(path("/"))
835 .and(body_partial_json(json!({
836 "method": "estimatesmartfee"
837 })))
838 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
839 "jsonrpc": "2.0",
840 "id": 1,
841 "result": {
842 "feerate": 0.00005,
843 "blocks": 1
844 }
845 })))
846 .mount(&mock_rpc_server)
847 .await;
848
849 Mock::given(method("POST"))
850 .and(path("/"))
851 .and(body_partial_json(json!({
852 "method": "ping"
853 })))
854 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
855 "jsonrpc": "2.0",
856 "id": 1,
857 "result": null
858 })))
859 .mount(&mock_rpc_server)
860 .await;
861
862 let mock_mempool_server = MockServer::start().await;
863 Mock::given(method("GET"))
864 .and(path("/api/v1/fees/recommended"))
865 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
866 "fastestFee": 4,
867 "halfHourFee": 3,
868 "hourFee": 2
869 })))
870 .mount(&mock_mempool_server)
871 .await;
872
873 let mock_rpc = ExtendedBitcoinRpc::connect(
874 mock_rpc_server.uri(),
875 secrecy::SecretString::new("test_user".into()),
876 secrecy::SecretString::new("test_password".into()),
877 None,
878 )
879 .await
880 .unwrap();
881
882 let mut config = create_test_config_with_thread_name().await;
883 let network = bitcoin::Network::Bitcoin;
884 let paramset = ProtocolParamset {
885 network,
886 ..ProtocolParamset::default()
887 };
888
889 let mempool_space_uri = mock_mempool_server.uri() + "/";
890
891 config.protocol_paramset = Box::leak(Box::new(paramset));
892 config.mempool_api_host = Some(mempool_space_uri);
893 config.mempool_api_endpoint = Some("api/v1/fees/recommended".into());
894
895 let db = Database::new(&config).await.unwrap();
896 let signer = Actor::new(config.secret_key, config.protocol_paramset.network);
897
898 let tx_sender = TxSender::new(signer, mock_rpc, db, "test_tx_sender".into(), config);
899
900 let fee_rate = tx_sender.get_fee_rate().await.unwrap();
901 assert_eq!(fee_rate, FeeRate::from_sat_per_kwu(1000));
902 }
903
904 #[tokio::test]
905 async fn test_get_fee_rate_rpc_failure_mempool_fallback() {
906 let mock_rpc_server = MockServer::start().await;
907
908 Mock::given(method("POST"))
909 .and(path("/"))
910 .and(body_partial_json(json!({
911 "method": "estimatesmartfee"
912 })))
913 .respond_with(ResponseTemplate::new(500).set_body_json(json!({
914 "jsonrpc": "2.0",
915 "id": 1,
916 "error": {
917 "code": -32603,
918 "message": "Internal error"
919 }
920 })))
921 .mount(&mock_rpc_server)
922 .await;
923
924 Mock::given(method("POST"))
925 .and(path("/"))
926 .and(body_partial_json(json!({
927 "method": "ping"
928 })))
929 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
930 "jsonrpc": "2.0",
931 "id": 1,
932 "result": null
933 })))
934 .mount(&mock_rpc_server)
935 .await;
936
937 let mock_mempool_server = MockServer::start().await;
938 Mock::given(method("GET"))
939 .and(path("/api/v1/fees/recommended"))
940 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
941 "fastestFee": 10,
942 "halfHourFee": 9,
943 "hourFee": 8
944 })))
945 .mount(&mock_mempool_server)
946 .await;
947
948 let mock_rpc = ExtendedBitcoinRpc::connect(
949 mock_rpc_server.uri(),
950 secrecy::SecretString::new("test_user".into()),
951 secrecy::SecretString::new("test_password".into()),
952 None,
953 )
954 .await
955 .unwrap();
956
957 let mut config = create_test_config_with_thread_name().await;
958 let network = bitcoin::Network::Bitcoin;
959 let paramset = ProtocolParamset {
960 network,
961 ..ProtocolParamset::default()
962 };
963
964 let mempool_space_uri = mock_mempool_server.uri() + "/";
965
966 config.protocol_paramset = Box::leak(Box::new(paramset));
967 config.mempool_api_host = Some(mempool_space_uri);
968 config.mempool_api_endpoint = Some("api/v1/fees/recommended".into());
969
970 let db = Database::new(&config).await.unwrap();
971 let signer = Actor::new(config.secret_key, config.protocol_paramset.network);
972
973 let tx_sender = TxSender::new(signer, mock_rpc, db, "test_tx_sender".into(), config);
974
975 let fee_rate = tx_sender.get_fee_rate().await.unwrap();
976 assert_eq!(fee_rate, FeeRate::from_sat_per_kwu(2500));
977 }
978
979 #[tokio::test]
980 async fn test_get_fee_rate_mempool_space_timeout() {
981 let mock_rpc_server = MockServer::start().await;
982
983 Mock::given(method("POST"))
984 .and(path("/"))
985 .and(body_partial_json(json!({
986 "method": "estimatesmartfee"
987 })))
988 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
989 "jsonrpc": "2.0",
990 "id": 1,
991 "result": {
992 "feerate": 0.00008,
993 "blocks": 1
994 }
995 })))
996 .mount(&mock_rpc_server)
997 .await;
998
999 Mock::given(method("POST"))
1000 .and(path("/"))
1001 .and(body_partial_json(json!({
1002 "method": "ping"
1003 })))
1004 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1005 "jsonrpc": "2.0",
1006 "id": 1,
1007 "result": null
1008 })))
1009 .mount(&mock_rpc_server)
1010 .await;
1011
1012 let mock_mempool_server = MockServer::start().await;
1013 Mock::given(method("GET"))
1014 .and(path("/api/v1/fees/recommended"))
1015 .respond_with(
1016 ResponseTemplate::new(200)
1017 .set_delay(Duration::from_secs(10))
1018 .set_body_json(json!({
1019 "fastestFee": 2,
1020 "halfHourFee": 1,
1021 "hourFee": 1
1022 })),
1023 )
1024 .mount(&mock_mempool_server)
1025 .await;
1026
1027 let mock_rpc = ExtendedBitcoinRpc::connect(
1028 mock_rpc_server.uri(),
1029 secrecy::SecretString::new("test_user".into()),
1030 secrecy::SecretString::new("test_password".into()),
1031 None,
1032 )
1033 .await
1034 .unwrap();
1035
1036 let mut config = create_test_config_with_thread_name().await;
1037 let network = bitcoin::Network::Bitcoin;
1038 let paramset = ProtocolParamset {
1039 network,
1040 ..ProtocolParamset::default()
1041 };
1042
1043 let mempool_space_uri = mock_mempool_server.uri() + "/";
1044
1045 config.protocol_paramset = Box::leak(Box::new(paramset));
1046 config.mempool_api_host = Some(mempool_space_uri);
1047 config.mempool_api_endpoint = Some("api/v1/fees/recommended".into());
1048
1049 let db = Database::new(&config).await.unwrap();
1050 let signer = Actor::new(config.secret_key, config.protocol_paramset.network);
1051
1052 let tx_sender = TxSender::new(signer, mock_rpc, db, "test_tx_sender".into(), config);
1053
1054 let fee_rate = tx_sender.get_fee_rate().await.unwrap();
1055 assert_eq!(fee_rate, FeeRate::from_sat_per_kwu(2000));
1056 }
1057
1058 #[tokio::test]
1059 async fn test_get_fee_rate_rpc_timeout() {
1060 let mock_rpc_server = MockServer::start().await;
1061
1062 Mock::given(method("POST"))
1063 .and(path("/"))
1064 .and(body_partial_json(json!({
1065 "method": "estimatesmartfee"
1066 })))
1067 .respond_with(
1068 ResponseTemplate::new(200)
1069 .set_delay(Duration::from_secs(31))
1070 .set_body_json(json!({
1071 "jsonrpc": "2.0",
1072 "id": 1,
1073 "result": {
1074 "feerate": 0.00002,
1075 "blocks": 1
1076 }
1077 })),
1078 )
1079 .mount(&mock_rpc_server)
1080 .await;
1081
1082 Mock::given(method("POST"))
1083 .and(path("/"))
1084 .and(body_partial_json(json!({
1085 "method": "ping"
1086 })))
1087 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1088 "jsonrpc": "2.0",
1089 "id": 1,
1090 "result": null
1091 })))
1092 .mount(&mock_rpc_server)
1093 .await;
1094
1095 let mock_mempool_server = MockServer::start().await;
1096 Mock::given(method("GET"))
1097 .and(path("/api/v1/fees/recommended"))
1098 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1099 "fastestFee": 8,
1100 "halfHourFee": 1,
1101 "hourFee": 1
1102 })))
1103 .mount(&mock_mempool_server)
1104 .await;
1105
1106 let mock_rpc = ExtendedBitcoinRpc::connect(
1107 mock_rpc_server.uri(),
1108 secrecy::SecretString::new("test_user".into()),
1109 secrecy::SecretString::new("test_password".into()),
1110 None,
1111 )
1112 .await
1113 .unwrap();
1114
1115 let mut config = create_test_config_with_thread_name().await;
1116 let network = bitcoin::Network::Bitcoin;
1117 let paramset = ProtocolParamset {
1118 network,
1119 ..ProtocolParamset::default()
1120 };
1121
1122 let mempool_space_uri = mock_mempool_server.uri() + "/";
1123
1124 config.protocol_paramset = Box::leak(Box::new(paramset));
1125 config.mempool_api_host = Some(mempool_space_uri);
1126 config.mempool_api_endpoint = Some("api/v1/fees/recommended".into());
1127
1128 let db = Database::new(&config).await.unwrap();
1129 let signer = Actor::new(config.secret_key, config.protocol_paramset.network);
1130
1131 let tx_sender = TxSender::new(signer, mock_rpc, db, "test_tx_sender".into(), config);
1132
1133 let fee_rate = tx_sender.get_fee_rate().await.unwrap();
1134 assert_eq!(fee_rate, FeeRate::from_sat_per_kwu(2000));
1135 }
1136
1137 #[tokio::test]
1138 async fn test_rpc_retry_after_failures() {
1139 struct RpcSeqResponder {
1140 n: Arc<AtomicUsize>,
1141 }
1142 impl Respond for RpcSeqResponder {
1143 fn respond(&self, _req: &Request) -> ResponseTemplate {
1144 let i = self.n.fetch_add(1, Ordering::SeqCst);
1145 match i {
1146 0 => ResponseTemplate::new(500).set_body_json(json!({
1147 "jsonrpc":"2.0","id":1,"error":{"code":-1,"message":"Connection error 1"}
1148 })),
1149 1 => ResponseTemplate::new(500).set_body_json(json!({
1150 "jsonrpc":"2.0","id":1,"error":{"code":-1,"message":"Connection error 2"}
1151 })),
1152 _ => ResponseTemplate::new(200).set_body_json(json!({
1153 "jsonrpc":"2.0","id":1,"result":{"feerate":0.00003,"blocks":1}
1154 })),
1155 }
1156 }
1157 }
1158
1159 let mock_rpc_server = MockServer::start().await;
1160 let counter = Arc::new(AtomicUsize::new(0));
1161
1162 Mock::given(method("POST"))
1163 .and(path("/"))
1164 .and(body_partial_json(json!({
1165 "method": "estimatesmartfee"
1166 })))
1167 .respond_with(RpcSeqResponder { n: counter.clone() })
1168 .mount(&mock_rpc_server)
1169 .await;
1170
1171 Mock::given(method("POST"))
1172 .and(path("/"))
1173 .and(body_partial_json(json!({"method": "ping"})))
1174 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1175 "jsonrpc": "2.0",
1176 "id": 1,
1177 "result": null
1178 })))
1179 .mount(&mock_rpc_server)
1180 .await;
1181
1182 let mock_mempool_server = MockServer::start().await;
1183 Mock::given(method("GET"))
1184 .and(path("/api/v1/fees/recommended"))
1185 .respond_with(ResponseTemplate::new(500))
1186 .mount(&mock_mempool_server)
1187 .await;
1188
1189 let mock_rpc = ExtendedBitcoinRpc::connect(
1190 mock_rpc_server.uri(),
1191 secrecy::SecretString::new("test_user".into()),
1192 secrecy::SecretString::new("test_password".into()),
1193 None,
1194 )
1195 .await
1196 .unwrap();
1197
1198 let mut config = create_test_config_with_thread_name().await;
1199 let network = bitcoin::Network::Bitcoin;
1200 let paramset = ProtocolParamset {
1201 network,
1202 ..ProtocolParamset::default()
1203 };
1204
1205 let mempool_space_uri = mock_mempool_server.uri() + "/";
1206 config.protocol_paramset = Box::leak(Box::new(paramset));
1207 config.mempool_api_host = Some(mempool_space_uri);
1208 config.mempool_api_endpoint = Some("api/v1/fees/recommended".into());
1209
1210 let db = Database::new(&config).await.unwrap();
1211 let signer = Actor::new(config.secret_key, config.protocol_paramset.network);
1212
1213 let tx_sender = TxSender::new(signer, mock_rpc, db, "test_tx_sender".into(), config);
1214
1215 let fee_rate = tx_sender.get_fee_rate().await.unwrap();
1216 assert_eq!(fee_rate, FeeRate::from_sat_per_kwu(750));
1217 }
1218
1219 #[tokio::test]
1220 async fn test_mempool_retry_after_failures() {
1221 let mock_rpc_server = MockServer::start().await;
1222
1223 Mock::given(method("POST"))
1224 .and(path("/"))
1225 .and(body_partial_json(json!({"method": "estimatesmartfee"})))
1226 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1227 "jsonrpc": "2.0",
1228 "id": 1,
1229 "result": {
1230 "feerate": 0.00009,
1231 "blocks": 1
1232 }
1233 })))
1234 .expect(1)
1235 .mount(&mock_rpc_server)
1236 .await;
1237
1238 Mock::given(method("POST"))
1239 .and(path("/"))
1240 .and(body_partial_json(json!({"method": "ping"})))
1241 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1242 "jsonrpc": "2.0",
1243 "id": 1,
1244 "result": null
1245 })))
1246 .mount(&mock_rpc_server)
1247 .await;
1248
1249 struct SeqResponder {
1250 n: Arc<AtomicUsize>,
1251 }
1252
1253 impl Respond for SeqResponder {
1254 fn respond(&self, _req: &Request) -> ResponseTemplate {
1255 let i = self.n.fetch_add(1, Ordering::SeqCst);
1256 match i {
1257 0 => ResponseTemplate::new(500),
1258 1 => ResponseTemplate::new(503),
1259 2 => ResponseTemplate::new(500),
1260 _ => ResponseTemplate::new(200).set_body_json(serde_json::json!({
1261 "fastestFee": 6,
1262 "halfHourFee": 4,
1263 "hourFee": 3
1264 })),
1265 }
1266 }
1267 }
1268
1269 let mock_mempool_server = MockServer::start().await;
1270
1271 let counter = Arc::new(AtomicUsize::new(0));
1272 Mock::given(method("GET"))
1273 .and(path("/api/v1/fees/recommended"))
1274 .respond_with(SeqResponder { n: counter.clone() })
1275 .mount(&mock_mempool_server)
1276 .await;
1277
1278 let mock_rpc = ExtendedBitcoinRpc::connect(
1279 mock_rpc_server.uri(),
1280 secrecy::SecretString::new("test_user".into()),
1281 secrecy::SecretString::new("test_password".into()),
1282 None,
1283 )
1284 .await
1285 .unwrap();
1286
1287 let mut config = create_test_config_with_thread_name().await;
1288 let network = bitcoin::Network::Bitcoin;
1289 let paramset = ProtocolParamset {
1290 network,
1291 ..ProtocolParamset::default()
1292 };
1293
1294 let mempool_space_uri = mock_mempool_server.uri() + "/";
1295 config.protocol_paramset = Box::leak(Box::new(paramset));
1296 config.mempool_api_host = Some(mempool_space_uri);
1297 config.mempool_api_endpoint = Some("api/v1/fees/recommended".into());
1298
1299 let db = Database::new(&config).await.unwrap();
1300 let signer = Actor::new(config.secret_key, config.protocol_paramset.network);
1301
1302 let tx_sender = TxSender::new(signer, mock_rpc, db, "test_tx_sender".into(), config);
1303
1304 let fee_rate = tx_sender.get_fee_rate().await.unwrap();
1305 assert_eq!(fee_rate, FeeRate::from_sat_per_kwu(1500));
1306 }
1307
1308 #[tokio::test]
1309 async fn test_hard_cap() {
1310 let mock_rpc_server = MockServer::start().await;
1311 Mock::given(method("POST"))
1312 .and(path("/"))
1313 .and(body_partial_json(json!({
1314 "method": "estimatesmartfee"
1315 })))
1316 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1317 "jsonrpc": "2.0",
1318 "id": 1,
1319 "result": {
1320 "feerate": 0.00500,
1321 "blocks": 1
1322 }
1323 })))
1324 .mount(&mock_rpc_server)
1325 .await;
1326
1327 Mock::given(method("POST"))
1328 .and(path("/"))
1329 .and(body_partial_json(json!({
1330 "method": "ping"
1331 })))
1332 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1333 "jsonrpc": "2.0",
1334 "id": 1,
1335 "result": null
1336 })))
1337 .mount(&mock_rpc_server)
1338 .await;
1339
1340 let mock_mempool_server = MockServer::start().await;
1341 Mock::given(method("GET"))
1342 .and(path("/api/v1/fees/recommended"))
1343 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1344 "fastestFee": 500,
1345 "halfHourFee": 499,
1346 "hourFee": 498
1347 })))
1348 .mount(&mock_mempool_server)
1349 .await;
1350
1351 let mock_rpc = ExtendedBitcoinRpc::connect(
1352 mock_rpc_server.uri(),
1353 secrecy::SecretString::new("test_user".into()),
1354 secrecy::SecretString::new("test_password".into()),
1355 None,
1356 )
1357 .await
1358 .unwrap();
1359
1360 let mut config = create_test_config_with_thread_name().await;
1361 let network = bitcoin::Network::Bitcoin;
1362 let paramset = ProtocolParamset {
1363 network,
1364 ..ProtocolParamset::default()
1365 };
1366
1367 let mempool_space_uri = mock_mempool_server.uri() + "/";
1368
1369 config.protocol_paramset = Box::leak(Box::new(paramset));
1370 config.mempool_api_host = Some(mempool_space_uri);
1371 config.mempool_api_endpoint = Some("api/v1/fees/recommended".into());
1372
1373 let db = Database::new(&config).await.unwrap();
1374 let signer = Actor::new(config.secret_key, config.protocol_paramset.network);
1375
1376 let tx_sender = TxSender::new(
1377 signer,
1378 mock_rpc,
1379 db,
1380 "test_tx_sender".into(),
1381 config.clone(),
1382 );
1383
1384 let fee_rate = tx_sender.get_fee_rate().await.unwrap();
1385 assert_eq!(
1386 fee_rate,
1387 FeeRate::from_sat_per_kwu(
1388 config
1389 .tx_sender_limits
1390 .fee_rate_hard_cap
1391 .mul(1000)
1392 .div_ceil(4)
1393 )
1394 );
1395 }
1396}