clementine_tx_sender/jsonrpc/
server.rs1use std::net::SocketAddr;
2
3use bitcoin::consensus;
4use bitcoin::Transaction;
5use jsonrpsee::server::{ServerBuilder, ServerHandle};
6use jsonrpsee::types::ErrorObjectOwned;
7use jsonrpsee::{server::Server, RpcModule};
8
9use crate::client::TxSenderClient;
10use clementine_errors::BridgeError;
11use tx_sender_types::clementine::InsertTryToSendParams;
12
13#[cfg(feature = "citrea")]
14use tx_sender_types::citrea::InsertCitreaRawTxParams;
15
/// JSON-RPC error code attached to every error object built by `jsonrpc_err`.
///
/// NOTE(review): despite the name, -32000 sits in the JSON-RPC 2.0
/// implementation-defined "server error" range (-32000 to -32099); the
/// specification's "Internal error" code is -32603. Confirm the intent.
const JSONRPC_INTERNAL_ERROR_CODE: i32 = -32000;

/// Largest request body, in bytes, that the server accepts (50 MiB).
///
/// Passed to `ServerBuilder::max_request_body_size` when the server is built.
const MAX_JSONRPC_REQUEST_BODY_SIZE: u32 = 50 << 20;
18
19fn jsonrpc_err(message: impl ToString) -> ErrorObjectOwned {
20 ErrorObjectOwned::owned(JSONRPC_INTERNAL_ERROR_CODE, message.to_string(), None::<()>)
21}
22
23#[derive(Debug, Clone)]
24pub struct TxSenderJsonRpcServer {
25 handle: ServerHandle,
26 local_addr: SocketAddr,
27}
28
29impl TxSenderJsonRpcServer {
30 pub fn local_addr(&self) -> SocketAddr {
31 self.local_addr
32 }
33
34 pub fn stop(self) -> ServerHandle {
35 self.handle
36 }
37}
38
39pub async fn start_jsonrpc_server(
43 tx_sender_client: TxSenderClient,
44 bind_addr: SocketAddr,
45) -> Result<TxSenderJsonRpcServer, BridgeError> {
46 let server: Server = ServerBuilder::default()
47 .max_request_body_size(MAX_JSONRPC_REQUEST_BODY_SIZE)
48 .build(bind_addr)
49 .await
50 .map_err(|e| BridgeError::Eyre(e.into()))?;
51
52 let local_addr = server
53 .local_addr()
54 .map_err(|e| BridgeError::Eyre(e.into()))?;
55
56 let mut module = RpcModule::new(tx_sender_client.clone());
57 module
58 .register_async_method("send_tx", |params, client, _| async move {
59 let req: InsertTryToSendParams = params.one().map_err(jsonrpc_err)?;
60
61 let raw_tx = hex::decode(&req.signed_tx_hex).map_err(jsonrpc_err)?;
62 let signed_tx: Transaction = consensus::deserialize(&raw_tx).map_err(jsonrpc_err)?;
63
64 let mut dbtx = client.db.begin_transaction().await.map_err(jsonrpc_err)?;
65
66 let try_to_send_id = client
67 .insert_try_to_send(
68 &mut dbtx,
69 req.tx_metadata,
70 &signed_tx,
71 req.fee_paying_type,
72 req.rbf_signing_info,
73 &req.cancel_outpoints,
74 &req.cancel_txids,
75 &req.activate_txids,
76 &req.activate_outpoints,
77 )
78 .await
79 .map_err(jsonrpc_err)?;
80
81 client
82 .db
83 .commit_transaction(dbtx)
84 .await
85 .map_err(jsonrpc_err)?;
86
87 Ok::<u32, ErrorObjectOwned>(try_to_send_id)
88 })
89 .map_err(|e| BridgeError::Eyre(e.into()))?;
90
91 #[cfg(feature = "citrea")]
93 {
94 module
95 .register_async_method("send_citrea_tx", |params, client, _| async move {
96 let req: InsertCitreaRawTxParams = params.one().map_err(jsonrpc_err)?;
97
98 let insertion_id = client
99 .send_citrea_tx(req.citrea_tx_request)
100 .await
101 .map_err(jsonrpc_err)?;
102
103 Ok::<i64, ErrorObjectOwned>(insertion_id)
104 })
105 .map_err(|e| BridgeError::Eyre(e.into()))?;
106 }
107
108 let handle = server.start(module);
109
110 Ok(TxSenderJsonRpcServer { handle, local_addr })
111}
112
#[cfg(test)]
mod tests {
    use super::*;

    use crate::test_utils::create_test_environment;
    use crate::TxSenderDb;
    use clementine_utils::FeePayingType;

    /// End-to-end check of `send_tx` through the JSON-RPC client: submits a
    /// transaction, then reads the stored row back from the database and
    /// compares its fee-paying type and txid with what was sent.
    #[tokio::test]
    async fn test_jsonrpc_txsender_insert_try_to_send() -> Result<(), BridgeError> {
        use std::time::{Duration, Instant};

        use crate::jsonrpc::client::JsonRpcTxSenderClient;
        use crate::task::spawn_txsender_loop_with_free_localhost_jsonrpc_port;
        use bitcoin::absolute;
        use bitcoin::hashes::Hash as _;
        use bitcoin::transaction::Version;
        use bitcoin::{OutPoint, ScriptBuf, Sequence, Transaction, TxIn, TxOut, Txid, Witness};

        let (config, maybe_db, maybe_rpc) = create_test_environment(true, true).await;
        let rpc = maybe_rpc.unwrap();
        let db = maybe_db.unwrap();
        rpc.rpc().mine_blocks(1).await.unwrap();

        // Start the tx-sender loop together with its JSON-RPC endpoint.
        let (addr, handle) = spawn_txsender_loop_with_free_localhost_jsonrpc_port(config.clone());
        let url = format!("http://{addr}");
        let client =
            JsonRpcTxSenderClient::new(&url).map_err(|e| BridgeError::Eyre(eyre::eyre!(e)))?;

        // Minimal one-input, one-output transaction used as the payload.
        let only_input = TxIn {
            previous_output: OutPoint {
                txid: Txid::all_zeros(),
                vout: 0,
            },
            script_sig: ScriptBuf::new(),
            sequence: Sequence::ENABLE_LOCKTIME_NO_RBF,
            witness: Witness::default(),
        };
        let only_output = TxOut {
            value: bitcoin::Amount::from_sat(0),
            script_pubkey: ScriptBuf::new(),
        };
        let tx = Transaction {
            version: Version::TWO,
            lock_time: absolute::LockTime::ZERO,
            input: vec![only_input],
            output: vec![only_output],
        };

        // Retry every 100 ms until a call succeeds; give up once more than
        // 10 s have elapsed since the first attempt.
        let give_up_after = Duration::from_secs(10);
        let retry_pause = Duration::from_millis(100);
        let first_attempt_at = Instant::now();
        let try_to_send_id = loop {
            let attempt = client
                .insert_try_to_send(None, &tx, FeePayingType::CPFP, None, &[], &[], &[], &[])
                .await;
            let e = match attempt {
                Ok(id) => break id,
                Err(e) => e,
            };
            if first_attempt_at.elapsed() > give_up_after {
                return Err(BridgeError::Eyre(eyre::eyre!(
                    "Timed out waiting for txsender JSON-RPC to start: {e:?}"
                )));
            }
            tokio::time::sleep(retry_pause).await;
        };

        // Read the row back directly from the database, bypassing the RPC layer.
        let tx_sender_db = TxSenderDb::from_pool(db.pool().clone());
        let (_metadata, stored_tx, fee_paying_type, _seen_height, _rbf_info) = tx_sender_db
            .get_try_to_send_tx(None, try_to_send_id)
            .await?;
        assert_eq!(fee_paying_type, FeePayingType::CPFP);
        assert_eq!(stored_tx.compute_txid(), tx.compute_txid());

        // Abort the background task and wait for it; its join result is ignored.
        handle.abort();
        let _ = handle.await;

        Ok(())
    }
}