From 064684013b57318040cbe632077c9aede0c99859 Mon Sep 17 00:00:00 2001 From: Camillarhi Date: Fri, 5 Sep 2025 13:39:26 +0100 Subject: [PATCH 1/2] Implement background job for transaction rebroadcasting Introduces a `RebroadcastPolicy` to manage the automatic rebroadcasting of unconfirmed transactions with exponential backoff. This prevents excessive network spam while systematically retrying stuck transactions. The feature is enabled by default but can be disabled via the builder: `builder.set_auto_rebroadcast_unconfirmed(false)`. Configuration options: - min_rebroadcast_interval: Base delay between attempts (seconds) - max_broadcast_attempts: Total attempts before abandonment - backoff_factor: Multiplier for exponential delay increase Sensible defaults are provided (300s, 24 attempts, 1.5x backoff). --- src/wallet/mod.rs | 62 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 43 insertions(+), 19 deletions(-) diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 05c743bd9..fda4a8e10 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -29,6 +29,7 @@ use bitcoin::{ Address, Amount, FeeRate, OutPoint, ScriptBuf, Transaction, TxOut, Txid, WPubkeyHash, Weight, WitnessProgram, WitnessVersion, }; + use lightning::chain::chaininterface::BroadcasterInterface; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::{BestBlock, Listen}; @@ -244,31 +245,54 @@ impl Wallet { self.pending_payment_store.insert_or_update(pending_payment)?; }, WalletEvent::ChainTipChanged { new_tip, .. } => { - // Get all payments that are Pending with Confirmed status + // Get all on-chain payments that are Pending let pending_payments: Vec = self.pending_payment_store.list_filter(|p| { p.details.status == PaymentStatus::Pending - && matches!( - p.details.kind, - PaymentKind::Onchain { - status: ConfirmationStatus::Confirmed { .. }, - .. - } - ) + && matches!(p.details.kind, PaymentKind::Onchain { .. }) }); + let mut unconfirmed_outbound_txids: Vec = Vec::new(); + for mut payment in pending_payments { - if let PaymentKind::Onchain { - status: ConfirmationStatus::Confirmed { height, .. }, - .. - } = payment.details.kind - { - let payment_id = payment.details.id; - if new_tip.height >= height + ANTI_REORG_DELAY - 1 { - payment.details.status = PaymentStatus::Succeeded; - self.payment_store.insert_or_update(payment.details)?; - self.pending_payment_store.remove(&payment_id)?; - } + match payment.details.kind { + PaymentKind::Onchain { + status: ConfirmationStatus::Confirmed { height, .. }, + .. + } => { + let payment_id = payment.details.id; + if new_tip.height >= height + ANTI_REORG_DELAY - 1 { + payment.details.status = PaymentStatus::Succeeded; + self.payment_store.insert_or_update(payment.details)?; + self.pending_payment_store.remove(&payment_id)?; + } + }, + PaymentKind::Onchain { + txid, + status: ConfirmationStatus::Unconfirmed, + } if payment.details.direction == PaymentDirection::Outbound => { + unconfirmed_outbound_txids.push(txid); + }, + _ => {}, + } + } + + if !unconfirmed_outbound_txids.is_empty() { + let txs_to_broadcast: Vec = unconfirmed_outbound_txids + .iter() + .filter_map(|txid| { + locked_wallet.tx_details(*txid).map(|d| (*d.tx).clone()) + }) + .collect(); + + if !txs_to_broadcast.is_empty() { + let tx_refs: Vec<&Transaction> = txs_to_broadcast.iter().collect(); + self.broadcaster.broadcast_transactions(&tx_refs); + log_info!( + self.logger, + "Rebroadcast {} unconfirmed transactions on chain tip change", + txs_to_broadcast.len() + ); } } }, From 8631a4ff78372bbb4a01a78a14675393ebd92619 Mon Sep 17 00:00:00 2001 From: Camillarhi Date: Fri, 30 Jan 2026 10:10:09 +0100 Subject: [PATCH 2/2] Implement RBF fee bumping for unconfirmed transactions Add `Replace-by-Fee` functionality to allow users to increase fees on pending outbound transactions, improving confirmation likelihood during network congestion. - Uses BDK's `build_fee_bump` for transaction replacement - Validates transaction eligibility: must be outbound and unconfirmed - Implements fee rate estimation with safety limits - Maintains payment history consistency across wallet updates - Includes integration tests for various RBF scenarios --- bindings/ldk_node.udl | 2 + src/payment/onchain.rs | 15 +++ src/payment/pending_payment_store.rs | 11 ++- src/payment/store.rs | 18 +++- src/wallet/mod.rs | 139 ++++++++++++++++++++++++++- tests/integration_tests_rust.rs | 110 ++++++++++++++++++++- 6 files changed, 285 insertions(+), 10 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index c881dbe09..40d2e5f24 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -266,6 +266,8 @@ interface OnchainPayment { Txid send_to_address([ByRef]Address address, u64 amount_sats, FeeRate? fee_rate); [Throws=NodeError] Txid send_all_to_address([ByRef]Address address, boolean retain_reserve, FeeRate? fee_rate); + [Throws=NodeError] + Txid bump_fee_rbf(PaymentId payment_id); }; interface FeeRate { diff --git a/src/payment/onchain.rs b/src/payment/onchain.rs index 695f96d43..4310daae2 100644 --- a/src/payment/onchain.rs +++ b/src/payment/onchain.rs @@ -10,6 +10,7 @@ use std::sync::{Arc, RwLock}; use bitcoin::{Address, Txid}; +use lightning::ln::channelmanager::PaymentId; use crate::config::Config; use crate::error::Error; @@ -120,4 +121,18 @@ impl OnchainPayment { let fee_rate_opt = maybe_map_fee_rate_opt!(fee_rate); self.wallet.send_to_address(address, send_amount, fee_rate_opt) } + + /// Attempt to bump the fee of an unconfirmed transaction using Replace-by-Fee (RBF). + /// + /// This creates a new transaction that replaces the original one, increasing the fee by the + /// specified increment to improve its chances of confirmation. The original transaction must + /// be signaling RBF replaceability for this to succeed. + /// + /// The new transaction will have the same outputs as the original but with a + /// higher fee, resulting in faster confirmation potential. + /// + /// Returns the Txid of the new replacement transaction if successful. + pub fn bump_fee_rbf(&self, payment_id: PaymentId) -> Result { + self.wallet.bump_fee_rbf(payment_id) + } } diff --git a/src/payment/pending_payment_store.rs b/src/payment/pending_payment_store.rs index 580bdcbcc..0327e4bc9 100644 --- a/src/payment/pending_payment_store.rs +++ b/src/payment/pending_payment_store.rs @@ -84,10 +84,11 @@ impl StorableObjectUpdate for PendingPaymentDetailsUpdate impl From<&PendingPaymentDetails> for PendingPaymentDetailsUpdate { fn from(value: &PendingPaymentDetails) -> Self { - Self { - id: value.id(), - payment_update: Some(value.details.to_update()), - conflicting_txids: Some(value.conflicting_txids.clone()), - } + let conflicting_txids = if value.conflicting_txids.is_empty() { + None + } else { + Some(value.conflicting_txids.clone()) + }; + Self { id: value.id(), payment_update: Some(value.details.to_update()), conflicting_txids } } } diff --git a/src/payment/store.rs b/src/payment/store.rs index 15e94190c..f16b1f32e 100644 --- a/src/payment/store.rs +++ b/src/payment/store.rs @@ -291,6 +291,15 @@ impl StorableObject for PaymentDetails { } } + if let Some(tx_id) = update.txid { + match self.kind { + PaymentKind::Onchain { ref mut txid, .. } => { + update_if_necessary!(*txid, tx_id); + }, + _ => {}, + } + } + if updated { self.latest_update_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -540,6 +549,7 @@ pub(crate) struct PaymentDetailsUpdate { pub direction: Option, pub status: Option, pub confirmation_status: Option, + pub txid: Option, } impl PaymentDetailsUpdate { @@ -555,6 +565,7 @@ impl PaymentDetailsUpdate { direction: None, status: None, confirmation_status: None, + txid: None, } } } @@ -570,9 +581,9 @@ impl From<&PaymentDetails> for PaymentDetailsUpdate { _ => (None, None, None), }; - let confirmation_status = match value.kind { - PaymentKind::Onchain { status, .. } => Some(status), - _ => None, + let (confirmation_status, txid) = match &value.kind { + PaymentKind::Onchain { status, txid, .. } => (Some(*status), Some(*txid)), + _ => (None, None), }; let counterparty_skimmed_fee_msat = match value.kind { @@ -593,6 +604,7 @@ impl From<&PaymentDetails> for PaymentDetailsUpdate { direction: Some(value.direction), status: Some(value.status), confirmation_status, + txid, } } } diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index fda4a8e10..3be45c386 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -12,6 +12,7 @@ use std::sync::{Arc, Mutex}; use bdk_chain::spk_client::{FullScanRequest, SyncRequest}; use bdk_wallet::descriptor::ExtendedDescriptor; +use bdk_wallet::error::{BuildFeeBumpError, CreateTxError}; use bdk_wallet::event::WalletEvent; #[allow(deprecated)] use bdk_wallet::SignOptions; @@ -323,9 +324,11 @@ impl Wallet { let conflict_txids: Vec = conflicts.iter().map(|(_, conflict_txid)| *conflict_txid).collect(); + // Use the last transaction id in the conflicts as the new txid + let new_txid = conflicts.last().map(|(_, new_tx)| *new_tx).unwrap_or(txid); let payment = self.create_payment_from_tx( locked_wallet, - txid, + new_txid, payment_id, &tx, PaymentStatus::Pending, @@ -1002,6 +1005,140 @@ impl Wallet { None } + + #[allow(deprecated)] + pub(crate) fn bump_fee_rbf(&self, payment_id: PaymentId) -> Result { + let payment = self.payment_store.get(&payment_id).ok_or(Error::InvalidPaymentId)?; + + let mut locked_wallet = self.inner.lock().unwrap(); + + if payment.direction != PaymentDirection::Outbound { + log_error!(self.logger, "Transaction {} is not an outbound payment", payment_id); + return Err(Error::InvalidPaymentId); + } + + if let PaymentKind::Onchain { status, .. } = &payment.kind { + match status { + ConfirmationStatus::Confirmed { .. } => { + log_error!( + self.logger, + "Transaction {} is already confirmed and cannot be fee bumped", + payment_id + ); + return Err(Error::InvalidPaymentId); + }, + ConfirmationStatus::Unconfirmed => {}, + } + } + + let txid = match &payment.kind { + PaymentKind::Onchain { txid, .. } => *txid, + _ => return Err(Error::InvalidPaymentId), + }; + + let confirmation_target = ConfirmationTarget::OnchainPayment; + let estimated_fee_rate = self.fee_estimator.estimate_fee_rate(confirmation_target); + + log_info!(self.logger, "Bumping fee to {}", estimated_fee_rate); + + let mut psbt = { + let mut builder = locked_wallet.build_fee_bump(txid).map_err(|e| { + log_error!(self.logger, "BDK fee bump failed for {}: {:?}", txid, e); + match e { + BuildFeeBumpError::TransactionNotFound(_) => Error::InvalidPaymentId, + BuildFeeBumpError::TransactionConfirmed(_) => Error::InvalidPaymentId, + BuildFeeBumpError::IrreplaceableTransaction(_) => Error::InvalidPaymentId, + BuildFeeBumpError::FeeRateUnavailable => Error::InvalidPaymentId, + _ => Error::InvalidFeeRate, + } + })?; + + builder.fee_rate(estimated_fee_rate); + + match builder.finish() { + Ok(psbt) => Ok(psbt), + Err(CreateTxError::FeeRateTooLow { required }) => { + log_info!(self.logger, "BDK requires higher fee rate: {}", required); + + // Safety check + const MAX_REASONABLE_FEE_RATE_SAT_VB: u64 = 1000; + if required.to_sat_per_vb_ceil() > MAX_REASONABLE_FEE_RATE_SAT_VB { + log_error!( + self.logger, + "BDK requires unreasonably high fee rate: {} sat/vB", + required.to_sat_per_vb_ceil() + ); + return Err(Error::InvalidFeeRate); + } + + let mut builder = locked_wallet.build_fee_bump(txid).map_err(|e| { + log_error!(self.logger, "BDK fee bump retry failed for {}: {:?}", txid, e); + Error::InvalidFeeRate + })?; + + builder.fee_rate(required); + builder.finish().map_err(|e| { + log_error!( + self.logger, + "Failed to finish PSBT with required fee rate: {:?}", + e + ); + Error::InvalidFeeRate + }) + }, + Err(e) => { + log_error!(self.logger, "Failed to create fee bump PSBT: {:?}", e); + Err(Error::InvalidFeeRate) + }, + }? + }; + + match locked_wallet.sign(&mut psbt, SignOptions::default()) { + Ok(finalized) => { + if !finalized { + return Err(Error::OnchainTxCreationFailed); + } + }, + Err(err) => { + log_error!(self.logger, "Failed to create transaction: {}", err); + return Err(err.into()); + }, + } + + let mut locked_persister = self.persister.lock().unwrap(); + locked_wallet.persist(&mut locked_persister).map_err(|e| { + log_error!(self.logger, "Failed to persist wallet: {}", e); + Error::PersistenceFailed + })?; + + let fee_bumped_tx = psbt.extract_tx().map_err(|e| { + log_error!(self.logger, "Failed to extract transaction: {}", e); + e + })?; + + let new_txid = fee_bumped_tx.compute_txid(); + + self.broadcaster.broadcast_transactions(&[&fee_bumped_tx]); + + let new_payment = self.create_payment_from_tx( + &locked_wallet, + new_txid, + payment.id, + &fee_bumped_tx, + PaymentStatus::Pending, + ConfirmationStatus::Unconfirmed, + ); + + let pending_payment_store = + self.create_pending_payment_from_tx(new_payment.clone(), Vec::new()); + + self.pending_payment_store.insert_or_update(pending_payment_store)?; + self.payment_store.insert_or_update(new_payment)?; + + log_info!(self.logger, "RBF successful: replaced {} with {}", txid, new_txid); + + Ok(new_txid) + } } impl Listen for Wallet { diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 4e94dd044..dbef18aa7 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -14,7 +14,7 @@ use std::sync::Arc; use bitcoin::address::NetworkUnchecked; use bitcoin::hashes::sha256::Hash as Sha256Hash; use bitcoin::hashes::Hash; -use bitcoin::{Address, Amount, ScriptBuf}; +use bitcoin::{Address, Amount, ScriptBuf, Txid}; use common::logging::{init_log_logger, validate_log_entry, MultiNodeLogger, TestLogWriter}; use common::{ bump_fee_and_broadcast, distribute_funds_unconfirmed, do_channel_full_cycle, @@ -2501,3 +2501,111 @@ async fn persistence_backwards_compatibility() { node_new.stop().unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn onchain_fee_bump_rbf() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::Esplora(&electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + + // Fund both nodes + let addr_a = node_a.onchain_payment().new_address().unwrap(); + let addr_b = node_b.onchain_payment().new_address().unwrap(); + + let premine_amount_sat = 500_000; + premine_and_distribute_funds( + &bitcoind.client, + &electrsd.client, + vec![addr_a.clone(), addr_b.clone()], + Amount::from_sat(premine_amount_sat), + ) + .await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Send a transaction from node_b to node_a that we'll later bump + let amount_to_send_sats = 100_000; + let txid = + node_b.onchain_payment().send_to_address(&addr_a, amount_to_send_sats, None).unwrap(); + wait_for_tx(&electrsd.client, txid).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + let payment_id = PaymentId(txid.to_byte_array()); + let original_payment = node_b.payment(&payment_id).unwrap(); + let original_fee = original_payment.fee_paid_msat.unwrap(); + + // Non-existent payment id + let fake_txid = + Txid::from_str("0000000000000000000000000000000000000000000000000000000000000000").unwrap(); + let invalid_payment_id = PaymentId(fake_txid.to_byte_array()); + assert_eq!( + Err(NodeError::InvalidPaymentId), + node_b.onchain_payment().bump_fee_rbf(invalid_payment_id) + ); + + // Bump an inbound payment + assert_eq!(Err(NodeError::InvalidPaymentId), node_a.onchain_payment().bump_fee_rbf(payment_id)); + + // Successful fee bump + let new_txid = node_b.onchain_payment().bump_fee_rbf(payment_id).unwrap(); + wait_for_tx(&electrsd.client, new_txid).await; + + // Sleep to allow for transaction propagation + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Verify fee increased + let new_payment = node_b.payment(&payment_id).unwrap(); + assert!( + new_payment.fee_paid_msat > Some(original_fee), + "Fee should increase after RBF bump. Original: {}, New: {}", + original_fee, + new_payment.fee_paid_msat.unwrap() + ); + + // Multiple consecutive bumps + let second_bump_txid = node_b.onchain_payment().bump_fee_rbf(payment_id).unwrap(); + wait_for_tx(&electrsd.client, second_bump_txid).await; + + // Sleep to allow for transaction propagation + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + // Verify second bump payment exists + let second_payment = node_b.payment(&payment_id).unwrap(); + assert!( + second_payment.fee_paid_msat > new_payment.fee_paid_msat, + "Second bump should have higher fee than first bump" + ); + + // Confirm the transaction and try to bump again (should fail) + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + node_a.sync_wallets().unwrap(); + node_b.sync_wallets().unwrap(); + + assert_eq!(Err(NodeError::InvalidPaymentId), node_b.onchain_payment().bump_fee_rbf(payment_id)); + + // Verify final payment is confirmed + let final_payment = node_b.payment(&payment_id).unwrap(); + assert_eq!(final_payment.status, PaymentStatus::Succeeded); + match final_payment.kind { + PaymentKind::Onchain { status, .. } => { + assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); + }, + _ => panic!("Unexpected payment kind"), + } + + // Verify node A received the funds correctly + let node_a_received_payment = node_a.list_payments_with_filter( + |p| matches!(p.kind, PaymentKind::Onchain { txid, .. } if txid == second_bump_txid), + ); + assert_eq!(node_a_received_payment.len(), 1); + assert_eq!(node_a_received_payment[0].amount_msat, Some(amount_to_send_sats * 1000)); + assert_eq!(node_a_received_payment[0].status, PaymentStatus::Succeeded); +}