clementine_tx_sender/jsonrpc/
server.rs

1use std::net::SocketAddr;
2
3use bitcoin::consensus;
4use bitcoin::Transaction;
5use jsonrpsee::server::{ServerBuilder, ServerHandle};
6use jsonrpsee::types::ErrorObjectOwned;
7use jsonrpsee::{server::Server, RpcModule};
8
9use crate::client::TxSenderClient;
10use clementine_errors::BridgeError;
11use tx_sender_types::clementine::InsertTryToSendParams;
12
13#[cfg(feature = "citrea")]
14use tx_sender_types::citrea::InsertCitreaRawTxParams;
15
16const JSONRPC_INTERNAL_ERROR_CODE: i32 = -32_000;
17const MAX_JSONRPC_REQUEST_BODY_SIZE: u32 = 50 * 1024 * 1024;
18
19fn jsonrpc_err(message: impl ToString) -> ErrorObjectOwned {
20    ErrorObjectOwned::owned(JSONRPC_INTERNAL_ERROR_CODE, message.to_string(), None::<()>)
21}
22
23#[derive(Debug, Clone)]
24pub struct TxSenderJsonRpcServer {
25    handle: ServerHandle,
26    local_addr: SocketAddr,
27}
28
29impl TxSenderJsonRpcServer {
30    pub fn local_addr(&self) -> SocketAddr {
31        self.local_addr
32    }
33
34    pub fn stop(self) -> ServerHandle {
35        self.handle
36    }
37}
38
39/// Starts a JSON-RPC server exposing `send_tx` and `send_citrea_tx` methods.
40/// `send_tx` and `send_citrea_tx` are transactional: it begins a DB transaction, calls
41/// `TxSenderClient::insert_try_to_send` or `TxSenderClient::send_citrea_tx`, and commits on success.
42pub async fn start_jsonrpc_server(
43    tx_sender_client: TxSenderClient,
44    bind_addr: SocketAddr,
45) -> Result<TxSenderJsonRpcServer, BridgeError> {
46    let server: Server = ServerBuilder::default()
47        .max_request_body_size(MAX_JSONRPC_REQUEST_BODY_SIZE)
48        .build(bind_addr)
49        .await
50        .map_err(|e| BridgeError::Eyre(e.into()))?;
51
52    let local_addr = server
53        .local_addr()
54        .map_err(|e| BridgeError::Eyre(e.into()))?;
55
56    let mut module = RpcModule::new(tx_sender_client.clone());
57    module
58        .register_async_method("send_tx", |params, client, _| async move {
59            let req: InsertTryToSendParams = params.one().map_err(jsonrpc_err)?;
60
61            let raw_tx = hex::decode(&req.signed_tx_hex).map_err(jsonrpc_err)?;
62            let signed_tx: Transaction = consensus::deserialize(&raw_tx).map_err(jsonrpc_err)?;
63
64            let mut dbtx = client.db.begin_transaction().await.map_err(jsonrpc_err)?;
65
66            let try_to_send_id = client
67                .insert_try_to_send(
68                    &mut dbtx,
69                    req.tx_metadata,
70                    &signed_tx,
71                    req.fee_paying_type,
72                    req.rbf_signing_info,
73                    &req.cancel_outpoints,
74                    &req.cancel_txids,
75                    &req.activate_txids,
76                    &req.activate_outpoints,
77                )
78                .await
79                .map_err(jsonrpc_err)?;
80
81            client
82                .db
83                .commit_transaction(dbtx)
84                .await
85                .map_err(jsonrpc_err)?;
86
87            Ok::<u32, ErrorObjectOwned>(try_to_send_id)
88        })
89        .map_err(|e| BridgeError::Eyre(e.into()))?;
90
91    // Citrea-specific RPCs.
92    #[cfg(feature = "citrea")]
93    {
94        module
95            .register_async_method("send_citrea_tx", |params, client, _| async move {
96                let req: InsertCitreaRawTxParams = params.one().map_err(jsonrpc_err)?;
97
98                let insertion_id = client
99                    .send_citrea_tx(req.citrea_tx_request)
100                    .await
101                    .map_err(jsonrpc_err)?;
102
103                Ok::<i64, ErrorObjectOwned>(insertion_id)
104            })
105            .map_err(|e| BridgeError::Eyre(e.into()))?;
106    }
107
108    let handle = server.start(module);
109
110    Ok(TxSenderJsonRpcServer { handle, local_addr })
111}
112
#[cfg(test)]
mod tests {
    use crate::test_utils::create_test_environment;
    use crate::TxSenderDb;
    use clementine_utils::FeePayingType;

    use super::*;

    /// Enqueues a transaction through the JSON-RPC client against a spawned
    /// tx-sender loop and checks that the stored row matches what was sent.
    #[tokio::test]
    async fn test_jsonrpc_txsender_insert_try_to_send() -> Result<(), BridgeError> {
        use std::time::{Duration, Instant};

        use crate::jsonrpc::client::JsonRpcTxSenderClient;
        use crate::task::spawn_txsender_loop_with_free_localhost_jsonrpc_port;
        use bitcoin::absolute;
        use bitcoin::hashes::Hash as _;
        use bitcoin::transaction::Version;
        use bitcoin::{OutPoint, ScriptBuf, Sequence, Transaction, TxIn, TxOut, Txid, Witness};

        let (config, db, rpc) = create_test_environment(true, true).await;
        let rpc = rpc.unwrap();
        let db = db.unwrap();
        rpc.rpc().mine_blocks(1).await.unwrap();

        // Bring up the txsender loop with its JSON-RPC endpoint on a free port,
        // then point a client at it.
        let (addr, handle) = spawn_txsender_loop_with_free_localhost_jsonrpc_port(config.clone());
        let url = format!("http://{addr}");
        let client =
            JsonRpcTxSenderClient::new(&url).map_err(|e| BridgeError::Eyre(eyre::eyre!(e)))?;

        // Smallest well-formed transaction; enqueueing does not require it to be mineable.
        let placeholder_input = TxIn {
            previous_output: OutPoint {
                txid: Txid::all_zeros(),
                vout: 0,
            },
            script_sig: ScriptBuf::new(),
            sequence: Sequence::ENABLE_LOCKTIME_NO_RBF,
            witness: Witness::default(),
        };
        let placeholder_output = TxOut {
            value: bitcoin::Amount::from_sat(0),
            script_pubkey: ScriptBuf::new(),
        };
        let tx = Transaction {
            version: Version::TWO,
            lock_time: absolute::LockTime::ZERO,
            input: vec![placeholder_input],
            output: vec![placeholder_output],
        };

        // The spawned loop initializes asynchronously, so retry until the first
        // call succeeds or the startup budget is exhausted.
        let startup_budget = Duration::from_secs(10);
        let retry_pause = Duration::from_millis(100);
        let started_at = Instant::now();
        let try_to_send_id = loop {
            let attempt = client
                .insert_try_to_send(None, &tx, FeePayingType::CPFP, None, &[], &[], &[], &[])
                .await;

            let e = match attempt {
                Ok(id) => break id,
                Err(e) => e,
            };

            if started_at.elapsed() > startup_budget {
                return Err(BridgeError::Eyre(eyre::eyre!(
                    "Timed out waiting for txsender JSON-RPC to start: {e:?}"
                )));
            }
            tokio::time::sleep(retry_pause).await;
        };

        // Read the row back through the DB layer and compare with what was sent.
        let tx_sender_db = TxSenderDb::from_pool(db.pool().clone());
        let (_metadata, persisted_tx, persisted_fee_type, _seen_height, _rbf_info) = tx_sender_db
            .get_try_to_send_tx(None, try_to_send_id)
            .await?;
        assert_eq!(persisted_fee_type, FeePayingType::CPFP);
        assert_eq!(persisted_tx.compute_txid(), tx.compute_txid());

        // Shut the background loop down and wait for the task to finish.
        handle.abort();
        let _ = handle.await;

        Ok(())
    }
}