From ea93afe1c072caf12ba6ad073213023177c5c02d Mon Sep 17 00:00:00 2001 From: Camillarhi Date: Wed, 11 Feb 2026 18:18:29 +0100 Subject: [PATCH] Refactor liquidity source to support multiple LSP nodes Replace per-protocol single-LSP configuration `LSPS1Client, LSPS2Client` with a unified `Vec` model where users configure LSP nodes via `add_lsp()` and protocol support is discovered at runtime via LSPS0 `list_protocols`. - Replace separate `LSPS1Client/LSPS2Client` with global pending request maps keyed by `LSPSRequestId` - Add LSPS0 protocol discovery `discover_lsp_protocols` with event handling for `ListProtocolsResponse` - Update events to use is_lsps_node() for multi-LSP counterparty checks - Deprecate `set_liquidity_source_lsps1/lsps2` builder methods in favor of `add_lsp()` - LSPS2 JIT channels now query all LSPS2-capable LSPs and automatically select the cheapest fee offer across all of them - Add `request_channel_from_lsp()` for explicit LSPS1 LSP selection - Spawn background discovery task on `Node::start()` --- bindings/ldk_node.udl | 3 + src/builder.rs | 80 +- src/event.rs | 28 +- src/lib.rs | 61 +- src/liquidity.rs | 1542 ------------------------------- src/liquidity/client/lsps1.rs | 548 +++++++++++ src/liquidity/client/lsps2.rs | 505 ++++++++++ src/liquidity/client/mod.rs | 11 + src/liquidity/mod.rs | 424 +++++++++ src/liquidity/service/lsps2.rs | 506 ++++++++++ src/liquidity/service/mod.rs | 8 + src/payment/bolt11.rs | 31 +- tests/integration_tests_rust.rs | 6 +- 13 files changed, 2139 insertions(+), 1614 deletions(-) delete mode 100644 src/liquidity.rs create mode 100644 src/liquidity/client/lsps1.rs create mode 100644 src/liquidity/client/lsps2.rs create mode 100644 src/liquidity/client/mod.rs create mode 100644 src/liquidity/mod.rs create mode 100644 src/liquidity/service/lsps2.rs create mode 100644 src/liquidity/service/mod.rs diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 014993690..7da7526d6 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -45,6 +45,7 @@ interface Builder { void set_pathfinding_scores_source(string url); void set_liquidity_source_lsps1(PublicKey node_id, SocketAddress address, string? token); void set_liquidity_source_lsps2(PublicKey node_id, SocketAddress address, string? token); + void add_lsp(PublicKey node_id, SocketAddress address, string? token); void set_storage_dir_path(string storage_dir_path); void set_filesystem_logger(string? log_file_path, LogLevel? max_log_level); void set_log_facade_logger(); @@ -138,6 +139,8 @@ interface Node { boolean verify_signature([ByRef]sequence msg, [ByRef]string sig, [ByRef]PublicKey pkey); [Throws=NodeError] bytes export_pathfinding_scores(); + [Throws=NodeError] + void add_lsp(PublicKey node_id, SocketAddress address, string? token); }; typedef interface Bolt11Payment; diff --git a/src/builder.rs b/src/builder.rs index 806c676b3..345c4fcae 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -65,9 +65,7 @@ use crate::io::{ PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; -use crate::liquidity::{ - LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LiquiditySourceBuilder, -}; +use crate::liquidity::{LSPS2ServiceConfig, LiquiditySourceBuilder, LspConfig}; use crate::lnurl_auth::LnurlAuth; use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; @@ -120,10 +118,8 @@ struct PathfindingScoresSyncConfig { #[derive(Debug, Clone, Default)] struct LiquiditySourceConfig { - // Act as an LSPS1 client connecting to the given service. - lsps1_client: Option, - // Act as an LSPS2 client connecting to the given service. - lsps2_client: Option, + // Acts for both LSPS1 and LSPS2 clients connecting to the given service. + lsp_nodes: Vec, // Act as an LSPS2 service. lsps2_service: Option, } @@ -440,17 +436,12 @@ impl NodeBuilder { /// The given `token` will be used by the LSP to authenticate the user. /// /// [bLIP-51 / LSPS1]: https://github.com/lightning/blips/blob/master/blip-0051.md + #[deprecated(note = "Use `add_lsp` instead")] + #[allow(dead_code)] pub fn set_liquidity_source_lsps1( &mut self, node_id: PublicKey, address: SocketAddress, token: Option, ) -> &mut Self { - // Mark the LSP as trusted for 0conf - self.config.trusted_peers_0conf.push(node_id.clone()); - - let liquidity_source_config = - self.liquidity_source_config.get_or_insert(LiquiditySourceConfig::default()); - let lsps1_client_config = LSPS1ClientConfig { node_id, address, token }; - liquidity_source_config.lsps1_client = Some(lsps1_client_config); - self + self.add_lsp(node_id, address, token) } /// Configures the [`Node`] instance to source just-in-time inbound liquidity from the given @@ -461,16 +452,32 @@ impl NodeBuilder { /// The given `token` will be used by the LSP to authenticate the user. /// /// [bLIP-52 / LSPS2]: https://github.com/lightning/blips/blob/master/blip-0052.md + #[deprecated(note = "Use `add_lsp` instead")] + #[allow(dead_code)] pub fn set_liquidity_source_lsps2( &mut self, node_id: PublicKey, address: SocketAddress, token: Option, + ) -> &mut Self { + self.add_lsp(node_id, address, token) + } + + /// Configures the [`Node`] instance to source inbound liquidity from the given LSP, without specifying + /// the exact protocol used (e.g., LSPS1 or LSPS2). + /// + /// Will mark the LSP as trusted for 0-confirmation channels, see [`Config::trusted_peers_0conf`]. + /// + /// The given `token` will be used by the LSP to authenticate the user. + /// This method is useful when the user wants to connect to an LSP but does not want to be concerned with + /// the specific protocol used for liquidity provision. The node will automatically detect and use the + /// appropriate protocol supported by the LSP. + pub fn add_lsp( + &mut self, node_id: PublicKey, address: SocketAddress, token: Option, ) -> &mut Self { // Mark the LSP as trusted for 0conf - self.config.trusted_peers_0conf.push(node_id.clone()); + self.config.trusted_peers_0conf.push(node_id); let liquidity_source_config = self.liquidity_source_config.get_or_insert(LiquiditySourceConfig::default()); - let lsps2_client_config = LSPS2ClientConfig { node_id, address, token }; - liquidity_source_config.lsps2_client = Some(lsps2_client_config); + liquidity_source_config.lsp_nodes.push(LspConfig { node_id, address, token }); self } @@ -964,7 +971,7 @@ impl ArcedNodeBuilder { pub fn set_liquidity_source_lsps1( &self, node_id: PublicKey, address: SocketAddress, token: Option, ) { - self.inner.write().unwrap().set_liquidity_source_lsps1(node_id, address, token); + self.inner.write().unwrap().add_lsp(node_id, address, token); } /// Configures the [`Node`] instance to source just-in-time inbound liquidity from the given @@ -978,7 +985,20 @@ impl ArcedNodeBuilder { pub fn set_liquidity_source_lsps2( &self, node_id: PublicKey, address: SocketAddress, token: Option, ) { - self.inner.write().unwrap().set_liquidity_source_lsps2(node_id, address, token); + self.inner.write().unwrap().add_lsp(node_id, address, token); + } + + /// Configures the [`Node`] instance to source inbound liquidity from the given LSP, without specifying + /// the exact protocol used (e.g., LSPS1 or LSPS2). + /// + /// Will mark the LSP as trusted for 0-confirmation channels, see [`Config::trusted_peers_0conf`]. + /// + /// The given `token` will be used by the LSP to authenticate the user. + /// This method is useful when the user wants to connect to an LSP but does not want to be concerned with + /// the specific protocol used for liquidity provision. The node will automatically detect and use the + /// appropriate protocol supported by the LSP. + pub fn add_lsp(&self, node_id: PublicKey, address: SocketAddress, token: Option) { + self.inner.write().unwrap().add_lsp(node_id, address, token); } /// Configures the [`Node`] instance to provide an [LSPS2] service, issuing just-in-time @@ -1802,21 +1822,7 @@ fn build_with_store_internal( Arc::clone(&logger), ); - lsc.lsps1_client.as_ref().map(|config| { - liquidity_source_builder.lsps1_client( - config.node_id, - config.address.clone(), - config.token.clone(), - ) - }); - - lsc.lsps2_client.as_ref().map(|config| { - liquidity_source_builder.lsps2_client( - config.node_id, - config.address.clone(), - config.token.clone(), - ) - }); + liquidity_source_builder.set_lsp_nodes(lsc.lsp_nodes.clone()); let promise_secret = { let lsps_xpriv = derive_xprv( @@ -1885,7 +1891,9 @@ fn build_with_store_internal( } })); - liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::downgrade(&peer_manager))); + liquidity_source + .as_ref() + .map(|l| l.lsps2_service().set_peer_manager(Arc::downgrade(&peer_manager))); let connection_manager = Arc::new(ConnectionManager::new( Arc::clone(&peer_manager), diff --git a/src/event.rs b/src/event.rs index ccee8e50b..3a510b585 100644 --- a/src/event.rs +++ b/src/event.rs @@ -581,7 +581,7 @@ where Ok(final_tx) => { let needs_manual_broadcast = self.liquidity_source.as_ref().map_or(false, |ls| { - ls.as_ref().lsps2_channel_needs_manual_broadcast( + ls.as_ref().lsps2_service().lsps2_channel_needs_manual_broadcast( counterparty_node_id, user_channel_id, ) @@ -589,7 +589,7 @@ where let result = if needs_manual_broadcast { self.liquidity_source.as_ref().map(|ls| { - ls.lsps2_store_funding_transaction( + ls.lsps2_service().lsps2_store_funding_transaction( user_channel_id, counterparty_node_id, final_tx.clone(), @@ -653,7 +653,8 @@ where }, LdkEvent::FundingTxBroadcastSafe { user_channel_id, counterparty_node_id, .. } => { self.liquidity_source.as_ref().map(|ls| { - ls.lsps2_funding_tx_broadcast_safe(user_channel_id, counterparty_node_id); + ls.lsps2_service() + .lsps2_funding_tx_broadcast_safe(user_channel_id, counterparty_node_id); }); }, LdkEvent::PaymentClaimable { @@ -1139,7 +1140,10 @@ where LdkEvent::ProbeFailed { .. } => {}, LdkEvent::HTLCHandlingFailed { failure_type, .. } => { if let Some(liquidity_source) = self.liquidity_source.as_ref() { - liquidity_source.handle_htlc_handling_failed(failure_type).await; + liquidity_source + .lsps2_service() + .handle_htlc_handling_failed(failure_type) + .await; } }, LdkEvent::SpendableOutputs { outputs, channel_id, counterparty_node_id } => { @@ -1238,14 +1242,15 @@ where let user_channel_id: u128 = u128::from_ne_bytes( self.keys_manager.get_secure_random_bytes()[..16].try_into().unwrap(), ); - let allow_0conf = self.config.trusted_peers_0conf.contains(&counterparty_node_id); - let mut channel_override_config = None; - if let Some((lsp_node_id, _)) = self + let is_lsp_node = self .liquidity_source .as_ref() - .and_then(|ls| ls.as_ref().get_lsps2_lsp_details()) - { - if lsp_node_id == counterparty_node_id { + .map_or(false, |ls| ls.as_ref().is_lsps_node(&counterparty_node_id)); + let allow_0conf = + self.config.trusted_peers_0conf.contains(&counterparty_node_id) || is_lsp_node; + let mut channel_override_config = None; + if let Some(ls) = self.liquidity_source.as_ref() { + if ls.as_ref().is_lsps_node(&counterparty_node_id) { // When we're an LSPS2 client, allow claiming underpaying HTLCs as the LSP will skim off some fee. We'll // check that they don't take too much before claiming. // @@ -1390,6 +1395,7 @@ where if let Some(liquidity_source) = self.liquidity_source.as_ref() { let skimmed_fee_msat = skimmed_fee_msat.unwrap_or(0); liquidity_source + .lsps2_service() .handle_payment_forwarded(Some(next_htlc.channel_id), skimmed_fee_msat) .await; } @@ -1499,6 +1505,7 @@ where if let Some(liquidity_source) = self.liquidity_source.as_ref() { liquidity_source + .lsps2_service() .handle_channel_ready(user_channel_id, &channel_id, &counterparty_node_id) .await; } @@ -1570,6 +1577,7 @@ where } => { if let Some(liquidity_source) = self.liquidity_source.as_ref() { liquidity_source + .lsps2_service() .handle_htlc_intercepted( requested_next_hop_scid, intercept_id, diff --git a/src/lib.rs b/src/lib.rs index 2e02e996c..501341bb2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -180,6 +180,7 @@ use types::{ pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, SyncAndAsyncKVStore, UserChannelId}; pub use vss_client; +use crate::liquidity::LspConfig; use crate::scoring::setup_background_pathfinding_scores_sync; use crate::wallet::FundingAmount; @@ -674,6 +675,29 @@ impl Node { }); } + if let Some(liquidity_source) = self.liquidity_source.as_ref() { + let discovery_ls = Arc::clone(&liquidity_source); + let discovery_cm = Arc::clone(&self.connection_manager); + let discovery_logger = Arc::clone(&self.logger); + self.runtime.spawn_background_task(async move { + for (node_id, address) in discovery_ls.get_all_lsp_details() { + if let Err(e) = + discovery_cm.connect_peer_if_necessary(node_id, address.clone()).await + { + log_error!( + discovery_logger, + "Failed to connect to LSP {} for protocol discovery: {}", + node_id, + e + ); + continue; + } + } + + discovery_ls.discover_all_lsp_protocols().await; + }); + } + log_info!(self.logger, "Startup complete."); *is_running_lock = true; Ok(()) @@ -1048,7 +1072,7 @@ impl Node { Arc::clone(&self.runtime), Arc::clone(&self.wallet), Arc::clone(&self.connection_manager), - self.liquidity_source.clone(), + self.liquidity_source.as_ref().map(|ls| ls.lsps1_client()), Arc::clone(&self.logger), ) } @@ -1062,7 +1086,7 @@ impl Node { Arc::clone(&self.runtime), Arc::clone(&self.wallet), Arc::clone(&self.connection_manager), - self.liquidity_source.clone(), + self.liquidity_source.as_ref().map(|ls| ls.lsps1_client()), Arc::clone(&self.logger), )) } @@ -1949,6 +1973,39 @@ impl Node { Error::PersistenceFailed }) } + + /// Configures the [`Node`] instance to source inbound liquidity from the given LSP at runtime, + /// without specifying the exact protocol used (e.g., LSPS1 or LSPS2). + /// + /// LSP nodes are automatically trusted for 0-confirmation channels. + /// + /// The given `token` will be used by the LSP to authenticate the user. + /// This method is useful when the user wants to connect to an LSP but does not want to be concerned with + /// the specific protocol used for liquidity provision. The node will automatically detect and use the + /// appropriate protocol supported by the LSP. + pub fn add_lsp( + &self, node_id: PublicKey, address: SocketAddress, token: Option, + ) -> Result<(), Error> { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let lsp_config = LspConfig { node_id, address, token }; + liquidity_source.add_lsp_node(lsp_config.clone())?; + + let con_node_id = lsp_config.node_id; + let con_addr = lsp_config.address.clone(); + let con_cm = Arc::clone(&self.connection_manager); + + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })?; + + log_info!(self.logger, "Connected to LSP {}@{}. ", lsp_config.node_id, lsp_config.address); + + let node_id = lsp_config.node_id; + self.runtime + .block_on(async move { liquidity_source.discover_lsp_protocols(&node_id).await })?; + Ok(()) + } } impl Drop for Node { diff --git a/src/liquidity.rs b/src/liquidity.rs deleted file mode 100644 index 485da941c..000000000 --- a/src/liquidity.rs +++ /dev/null @@ -1,1542 +0,0 @@ -// This file is Copyright its original authors, visible in version control history. -// -// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in -// accordance with one or both of these licenses. - -//! Objects related to liquidity management. - -use std::collections::HashMap; -use std::ops::Deref; -use std::sync::{Arc, Mutex, RwLock, Weak}; -use std::time::Duration; - -use bitcoin::secp256k1::{PublicKey, Secp256k1}; -use bitcoin::Transaction; -use chrono::Utc; -use lightning::events::HTLCHandlingFailureType; -use lightning::ln::channelmanager::{InterceptId, MIN_FINAL_CLTV_EXPIRY_DELTA}; -use lightning::ln::msgs::SocketAddress; -use lightning::ln::types::ChannelId; -use lightning::routing::router::{RouteHint, RouteHintHop}; -use lightning::sign::EntropySource; -use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, InvoiceBuilder, RoutingFees}; -use lightning_liquidity::events::LiquidityEvent; -use lightning_liquidity::lsps0::ser::{LSPSDateTime, LSPSRequestId}; -use lightning_liquidity::lsps1::client::LSPS1ClientConfig as LdkLSPS1ClientConfig; -use lightning_liquidity::lsps1::event::LSPS1ClientEvent; -use lightning_liquidity::lsps1::msgs::{ - LSPS1ChannelInfo, LSPS1Options, LSPS1OrderId, LSPS1OrderParams, -}; -use lightning_liquidity::lsps2::client::LSPS2ClientConfig as LdkLSPS2ClientConfig; -use lightning_liquidity::lsps2::event::{LSPS2ClientEvent, LSPS2ServiceEvent}; -use lightning_liquidity::lsps2::msgs::{LSPS2OpeningFeeParams, LSPS2RawOpeningFeeParams}; -use lightning_liquidity::lsps2::service::LSPS2ServiceConfig as LdkLSPS2ServiceConfig; -use lightning_liquidity::lsps2::utils::compute_opening_fee; -use lightning_liquidity::{LiquidityClientConfig, LiquidityServiceConfig}; -use lightning_types::payment::PaymentHash; -use tokio::sync::oneshot; - -use crate::builder::BuildError; -use crate::connection::ConnectionManager; -use crate::logger::{log_debug, log_error, log_info, LdkLogger, Logger}; -use crate::runtime::Runtime; -use crate::types::{ - Broadcaster, ChannelManager, DynStore, KeysManager, LiquidityManager, PeerManager, Wallet, -}; -use crate::{total_anchor_channels_reserve_sats, Config, Error}; - -const LIQUIDITY_REQUEST_TIMEOUT_SECS: u64 = 5; - -const LSPS2_GETINFO_REQUEST_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24); -const LSPS2_CHANNEL_CLTV_EXPIRY_DELTA: u32 = 72; - -struct LSPS1Client { - lsp_node_id: PublicKey, - lsp_address: SocketAddress, - token: Option, - ldk_client_config: LdkLSPS1ClientConfig, - pending_opening_params_requests: - Mutex>>, - pending_create_order_requests: Mutex>>, - pending_check_order_status_requests: - Mutex>>, -} - -#[derive(Debug, Clone)] -pub(crate) struct LSPS1ClientConfig { - pub node_id: PublicKey, - pub address: SocketAddress, - pub token: Option, -} - -struct LSPS2Client { - lsp_node_id: PublicKey, - lsp_address: SocketAddress, - token: Option, - ldk_client_config: LdkLSPS2ClientConfig, - pending_fee_requests: Mutex>>, - pending_buy_requests: Mutex>>, -} - -#[derive(Debug, Clone)] -pub(crate) struct LSPS2ClientConfig { - pub node_id: PublicKey, - pub address: SocketAddress, - pub token: Option, -} - -struct LSPS2Service { - service_config: LSPS2ServiceConfig, - ldk_service_config: LdkLSPS2ServiceConfig, -} - -/// Represents the configuration of the LSPS2 service. -/// -/// See [bLIP-52 / LSPS2] for more information. -/// -/// [bLIP-52 / LSPS2]: https://github.com/lightning/blips/blob/master/blip-0052.md -#[derive(Debug, Clone)] -#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] -pub struct LSPS2ServiceConfig { - /// A token we may require to be sent by the clients. - /// - /// If set, only requests matching this token will be accepted. - pub require_token: Option, - /// Indicates whether the LSPS service will be announced via the gossip network. - pub advertise_service: bool, - /// The fee we withhold for the channel open from the initial payment. - /// - /// This fee is proportional to the client-requested amount, in parts-per-million. - pub channel_opening_fee_ppm: u32, - /// The proportional overprovisioning for the channel. - /// - /// This determines, in parts-per-million, how much value we'll provision on top of the amount - /// we need to forward the payment to the client. - /// - /// For example, setting this to `100_000` will result in a channel being opened that is 10% - /// larger than then the to-be-forwarded amount (i.e., client-requested amount minus the - /// channel opening fee fee). - pub channel_over_provisioning_ppm: u32, - /// The minimum fee required for opening a channel. - pub min_channel_opening_fee_msat: u64, - /// The minimum number of blocks after confirmation we promise to keep the channel open. - pub min_channel_lifetime: u32, - /// The maximum number of blocks that the client is allowed to set its `to_self_delay` parameter. - pub max_client_to_self_delay: u32, - /// The minimum payment size that we will accept when opening a channel. - pub min_payment_size_msat: u64, - /// The maximum payment size that we will accept when opening a channel. - pub max_payment_size_msat: u64, - /// Use the 'client-trusts-LSP' trust model. - /// - /// When set, the service will delay *broadcasting* the JIT channel's funding transaction until - /// the client claimed sufficient HTLC parts to pay for the channel open. - /// - /// Note this will render the flow incompatible with clients utilizing the 'LSP-trust-client' - /// trust model, i.e., in turn delay *claiming* any HTLCs until they see the funding - /// transaction in the mempool. - /// - /// Please refer to [`bLIP-52`] for more information. - /// - /// [`bLIP-52`]: https://github.com/lightning/blips/blob/master/blip-0052.md#trust-models - pub client_trusts_lsp: bool, -} - -pub(crate) struct LiquiditySourceBuilder -where - L::Target: LdkLogger, -{ - lsps1_client: Option, - lsps2_client: Option, - lsps2_service: Option, - wallet: Arc, - channel_manager: Arc, - keys_manager: Arc, - tx_broadcaster: Arc, - kv_store: Arc, - config: Arc, - logger: L, -} - -impl LiquiditySourceBuilder -where - L::Target: LdkLogger, -{ - pub(crate) fn new( - wallet: Arc, channel_manager: Arc, keys_manager: Arc, - tx_broadcaster: Arc, kv_store: Arc, config: Arc, logger: L, - ) -> Self { - let lsps1_client = None; - let lsps2_client = None; - let lsps2_service = None; - Self { - lsps1_client, - lsps2_client, - lsps2_service, - wallet, - channel_manager, - keys_manager, - tx_broadcaster, - kv_store, - config, - logger, - } - } - - pub(crate) fn lsps1_client( - &mut self, lsp_node_id: PublicKey, lsp_address: SocketAddress, token: Option, - ) -> &mut Self { - // TODO: allow to set max_channel_fees_msat - let ldk_client_config = LdkLSPS1ClientConfig { max_channel_fees_msat: None }; - let pending_opening_params_requests = Mutex::new(HashMap::new()); - let pending_create_order_requests = Mutex::new(HashMap::new()); - let pending_check_order_status_requests = Mutex::new(HashMap::new()); - self.lsps1_client = Some(LSPS1Client { - lsp_node_id, - lsp_address, - token, - ldk_client_config, - pending_opening_params_requests, - pending_create_order_requests, - pending_check_order_status_requests, - }); - self - } - - pub(crate) fn lsps2_client( - &mut self, lsp_node_id: PublicKey, lsp_address: SocketAddress, token: Option, - ) -> &mut Self { - let ldk_client_config = LdkLSPS2ClientConfig {}; - let pending_fee_requests = Mutex::new(HashMap::new()); - let pending_buy_requests = Mutex::new(HashMap::new()); - self.lsps2_client = Some(LSPS2Client { - lsp_node_id, - lsp_address, - token, - ldk_client_config, - pending_fee_requests, - pending_buy_requests, - }); - self - } - - pub(crate) fn lsps2_service( - &mut self, promise_secret: [u8; 32], service_config: LSPS2ServiceConfig, - ) -> &mut Self { - let ldk_service_config = LdkLSPS2ServiceConfig { promise_secret }; - self.lsps2_service = Some(LSPS2Service { service_config, ldk_service_config }); - self - } - - pub(crate) async fn build(self) -> Result, BuildError> { - let liquidity_service_config = self.lsps2_service.as_ref().map(|s| { - let lsps2_service_config = Some(s.ldk_service_config.clone()); - let lsps5_service_config = None; - let advertise_service = s.service_config.advertise_service; - LiquidityServiceConfig { - lsps1_service_config: None, - lsps2_service_config, - lsps5_service_config, - advertise_service, - } - }); - - let lsps1_client_config = self.lsps1_client.as_ref().map(|s| s.ldk_client_config.clone()); - let lsps2_client_config = self.lsps2_client.as_ref().map(|s| s.ldk_client_config.clone()); - let lsps5_client_config = None; - let liquidity_client_config = Some(LiquidityClientConfig { - lsps1_client_config, - lsps2_client_config, - lsps5_client_config, - }); - - let liquidity_manager = Arc::new( - LiquidityManager::new( - Arc::clone(&self.keys_manager), - Arc::clone(&self.keys_manager), - Arc::clone(&self.channel_manager), - Arc::clone(&self.kv_store), - Arc::clone(&self.tx_broadcaster), - liquidity_service_config, - liquidity_client_config, - ) - .await - .map_err(|_| BuildError::ReadFailed)?, - ); - - Ok(LiquiditySource { - lsps1_client: self.lsps1_client, - lsps2_client: self.lsps2_client, - lsps2_service: self.lsps2_service, - wallet: self.wallet, - channel_manager: self.channel_manager, - peer_manager: RwLock::new(None), - keys_manager: self.keys_manager, - liquidity_manager, - config: self.config, - logger: self.logger, - }) - } -} - -pub(crate) struct LiquiditySource -where - L::Target: LdkLogger, -{ - lsps1_client: Option, - lsps2_client: Option, - lsps2_service: Option, - wallet: Arc, - channel_manager: Arc, - peer_manager: RwLock>>, - keys_manager: Arc, - liquidity_manager: Arc, - config: Arc, - logger: L, -} - -impl LiquiditySource -where - L::Target: LdkLogger, -{ - pub(crate) fn set_peer_manager(&self, peer_manager: Weak) { - *self.peer_manager.write().unwrap() = Some(peer_manager); - } - - pub(crate) fn liquidity_manager(&self) -> Arc { - Arc::clone(&self.liquidity_manager) - } - - pub(crate) fn get_lsps1_lsp_details(&self) -> Option<(PublicKey, SocketAddress)> { - self.lsps1_client.as_ref().map(|s| (s.lsp_node_id, s.lsp_address.clone())) - } - - pub(crate) fn get_lsps2_lsp_details(&self) -> Option<(PublicKey, SocketAddress)> { - self.lsps2_client.as_ref().map(|s| (s.lsp_node_id, s.lsp_address.clone())) - } - - pub(crate) fn lsps2_channel_needs_manual_broadcast( - &self, counterparty_node_id: PublicKey, user_channel_id: u128, - ) -> bool { - self.lsps2_service.as_ref().map_or(false, |lsps2_service| { - lsps2_service.service_config.client_trusts_lsp - && self - .liquidity_manager() - .lsps2_service_handler() - .and_then(|handler| { - handler - .channel_needs_manual_broadcast(user_channel_id, &counterparty_node_id) - .ok() - }) - .unwrap_or(false) - }) - } - - pub(crate) fn lsps2_store_funding_transaction( - &self, user_channel_id: u128, counterparty_node_id: PublicKey, funding_tx: Transaction, - ) { - if self.lsps2_service.as_ref().map_or(false, |svc| !svc.service_config.client_trusts_lsp) { - // Only necessary for client-trusts-LSP flow - return; - } - - let lsps2_service_handler = self.liquidity_manager.lsps2_service_handler(); - if let Some(handler) = lsps2_service_handler { - handler - .store_funding_transaction(user_channel_id, &counterparty_node_id, funding_tx) - .unwrap_or_else(|e| { - debug_assert!(false, "Failed to store funding transaction: {:?}", e); - log_error!(self.logger, "Failed to store funding transaction: {:?}", e); - }); - } else { - log_error!(self.logger, "LSPS2 service handler is not available."); - } - } - - pub(crate) fn lsps2_funding_tx_broadcast_safe( - &self, user_channel_id: u128, counterparty_node_id: PublicKey, - ) { - if self.lsps2_service.as_ref().map_or(false, |svc| !svc.service_config.client_trusts_lsp) { - // Only necessary for client-trusts-LSP flow - return; - } - - let lsps2_service_handler = self.liquidity_manager.lsps2_service_handler(); - if let Some(handler) = lsps2_service_handler { - handler - .set_funding_tx_broadcast_safe(user_channel_id, &counterparty_node_id) - .unwrap_or_else(|e| { - debug_assert!( - false, - "Failed to mark funding transaction safe to broadcast: {:?}", - e - ); - log_error!( - self.logger, - "Failed to mark funding transaction safe to broadcast: {:?}", - e - ); - }); - } else { - log_error!(self.logger, "LSPS2 service handler is not available."); - } - } - - pub(crate) async fn handle_next_event(&self) { - match self.liquidity_manager.next_event_async().await { - LiquidityEvent::LSPS1Client(LSPS1ClientEvent::SupportedOptionsReady { - request_id, - counterparty_node_id, - supported_options, - }) => { - if let Some(lsps1_client) = self.lsps1_client.as_ref() { - if counterparty_node_id != lsps1_client.lsp_node_id { - debug_assert!( - false, - "Received response from unexpected LSP counterparty. This should never happen." - ); - log_error!( - self.logger, - "Received response from unexpected LSP counterparty. This should never happen." - ); - return; - } - - if let Some(sender) = lsps1_client - .pending_opening_params_requests - .lock() - .unwrap() - .remove(&request_id) - { - let response = LSPS1OpeningParamsResponse { supported_options }; - - match sender.send(response) { - Ok(()) => (), - Err(_) => { - log_error!( - self.logger, - "Failed to handle response for request {:?} from liquidity service", - request_id - ); - }, - } - } else { - debug_assert!( - false, - "Received response from liquidity service for unknown request." - ); - log_error!( - self.logger, - "Received response from liquidity service for unknown request." - ); - } - } else { - log_error!( - self.logger, - "Received unexpected LSPS1Client::SupportedOptionsReady event!" - ); - } - }, - LiquidityEvent::LSPS1Client(LSPS1ClientEvent::OrderCreated { - request_id, - counterparty_node_id, - order_id, - order, - payment, - channel, - }) => { - if let Some(lsps1_client) = self.lsps1_client.as_ref() { - if counterparty_node_id != lsps1_client.lsp_node_id { - debug_assert!( - false, - "Received response from unexpected LSP counterparty. This should never happen." - ); - log_error!( - self.logger, - "Received response from unexpected LSP counterparty. This should never happen." - ); - return; - } - - if let Some(sender) = lsps1_client - .pending_create_order_requests - .lock() - .unwrap() - .remove(&request_id) - { - let response = LSPS1OrderStatus { - order_id, - order_params: order, - payment_options: payment.into(), - channel_state: channel, - }; - - match sender.send(response) { - Ok(()) => (), - Err(_) => { - log_error!( - self.logger, - "Failed to handle response for request {:?} from liquidity service", - request_id - ); - }, - } - } else { - debug_assert!( - false, - "Received response from liquidity service for unknown request." - ); - log_error!( - self.logger, - "Received response from liquidity service for unknown request." - ); - } - } else { - log_error!(self.logger, "Received unexpected LSPS1Client::OrderCreated event!"); - } - }, - LiquidityEvent::LSPS1Client(LSPS1ClientEvent::OrderStatus { - request_id, - counterparty_node_id, - order_id, - order, - payment, - channel, - }) => { - if let Some(lsps1_client) = self.lsps1_client.as_ref() { - if counterparty_node_id != lsps1_client.lsp_node_id { - debug_assert!( - false, - "Received response from unexpected LSP counterparty. This should never happen." - ); - log_error!( - self.logger, - "Received response from unexpected LSP counterparty. This should never happen." - ); - return; - } - - if let Some(sender) = lsps1_client - .pending_check_order_status_requests - .lock() - .unwrap() - .remove(&request_id) - { - let response = LSPS1OrderStatus { - order_id, - order_params: order, - payment_options: payment.into(), - channel_state: channel, - }; - - match sender.send(response) { - Ok(()) => (), - Err(_) => { - log_error!( - self.logger, - "Failed to handle response for request {:?} from liquidity service", - request_id - ); - }, - } - } else { - debug_assert!( - false, - "Received response from liquidity service for unknown request." - ); - log_error!( - self.logger, - "Received response from liquidity service for unknown request." - ); - } - } else { - log_error!(self.logger, "Received unexpected LSPS1Client::OrderStatus event!"); - } - }, - LiquidityEvent::LSPS2Service(LSPS2ServiceEvent::GetInfo { - request_id, - counterparty_node_id, - token, - }) => { - if let Some(lsps2_service_handler) = - self.liquidity_manager.lsps2_service_handler().as_ref() - { - let service_config = if let Some(service_config) = - self.lsps2_service.as_ref().map(|s| s.service_config.clone()) - { - service_config - } else { - log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",); - return; - }; - - if let Some(required) = service_config.require_token { - if token != Some(required) { - log_error!( - self.logger, - "Rejecting LSPS2 request {:?} from counterparty {} as the client provided an invalid token.", - request_id, - counterparty_node_id - ); - lsps2_service_handler.invalid_token_provided(&counterparty_node_id, request_id.clone()).unwrap_or_else(|e| { - debug_assert!(false, "Failed to reject LSPS2 request. This should never happen."); - log_error!( - self.logger, - "Failed to reject LSPS2 request {:?} from counterparty {} due to: {:?}. This should never happen.", - request_id, - counterparty_node_id, - e - ); - }); - return; - } - } - - let valid_until = LSPSDateTime(Utc::now() + LSPS2_GETINFO_REQUEST_EXPIRY); - let opening_fee_params = LSPS2RawOpeningFeeParams { - min_fee_msat: service_config.min_channel_opening_fee_msat, - proportional: service_config.channel_opening_fee_ppm, - valid_until, - min_lifetime: service_config.min_channel_lifetime, - max_client_to_self_delay: service_config.max_client_to_self_delay, - min_payment_size_msat: service_config.min_payment_size_msat, - max_payment_size_msat: service_config.max_payment_size_msat, - }; - - let opening_fee_params_menu = vec![opening_fee_params]; - - if let Err(e) = lsps2_service_handler.opening_fee_params_generated( - &counterparty_node_id, - request_id, - opening_fee_params_menu, - ) { - log_error!( - self.logger, - "Failed to handle generated opening fee params: {:?}", - e - ); - } - } else { - log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",); - return; - } - }, - LiquidityEvent::LSPS2Service(LSPS2ServiceEvent::BuyRequest { - request_id, - counterparty_node_id, - opening_fee_params: _, - payment_size_msat, - }) => { - if let Some(lsps2_service_handler) = - self.liquidity_manager.lsps2_service_handler().as_ref() - { - let service_config = if let Some(service_config) = - self.lsps2_service.as_ref().map(|s| s.service_config.clone()) - { - service_config - } else { - log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",); - return; - }; - - let user_channel_id: u128 = u128::from_ne_bytes( - self.keys_manager.get_secure_random_bytes()[..16].try_into().unwrap(), - ); - let intercept_scid = self.channel_manager.get_intercept_scid(); - - if let Some(payment_size_msat) = payment_size_msat { - // We already check this in `lightning-liquidity`, but better safe than - // sorry. - // - // TODO: We might want to eventually send back an error here, but we - // currently can't and have to trust `lightning-liquidity` is doing the - // right thing. - // - // TODO: Eventually we also might want to make sure that we have sufficient - // liquidity for the channel opening here. - if payment_size_msat > service_config.max_payment_size_msat - || payment_size_msat < service_config.min_payment_size_msat - { - log_error!( - self.logger, - "Rejecting to handle LSPS2 buy request {:?} from counterparty {} as the client requested an invalid payment size.", - request_id, - counterparty_node_id - ); - return; - } - } - - match lsps2_service_handler - .invoice_parameters_generated( - &counterparty_node_id, - request_id, - intercept_scid, - LSPS2_CHANNEL_CLTV_EXPIRY_DELTA, - service_config.client_trusts_lsp, - user_channel_id, - ) - .await - { - Ok(()) => {}, - Err(e) => { - log_error!( - self.logger, - "Failed to provide invoice parameters: {:?}", - e - ); - return; - }, - } - } else { - log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",); - return; - } - }, - LiquidityEvent::LSPS2Service(LSPS2ServiceEvent::OpenChannel { - their_network_key, - amt_to_forward_msat, - opening_fee_msat: _, - user_channel_id, - intercept_scid: _, - }) => { - if self.liquidity_manager.lsps2_service_handler().is_none() { - log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",); - return; - }; - - let service_config = if let Some(service_config) = - self.lsps2_service.as_ref().map(|s| s.service_config.clone()) - { - service_config - } else { - log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",); - return; - }; - - let init_features = if let Some(Some(peer_manager)) = - self.peer_manager.read().unwrap().as_ref().map(|weak| weak.upgrade()) - { - // Fail if we're not connected to the prospective channel partner. - if let Some(peer) = peer_manager.peer_by_node_id(&their_network_key) { - peer.init_features - } else { - // TODO: We just silently fail here. Eventually we will need to remember - // the pending requests and regularly retry opening the channel until we - // succeed. - log_error!( - self.logger, - "Failed to open LSPS2 channel to {} due to peer not being not connected.", - their_network_key, - ); - return; - } - } else { - debug_assert!(false, "Failed to handle LSPS2ServiceEvent as peer manager isn't available. This should never happen.",); - log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as peer manager isn't available. This should never happen.",); - return; - }; - - // Fail if we have insufficient onchain funds available. - let over_provisioning_msat = (amt_to_forward_msat - * service_config.channel_over_provisioning_ppm as u64) - / 1_000_000; - let channel_amount_sats = (amt_to_forward_msat + over_provisioning_msat) / 1000; - let cur_anchor_reserve_sats = - total_anchor_channels_reserve_sats(&self.channel_manager, &self.config); - let spendable_amount_sats = - self.wallet.get_spendable_amount_sats(cur_anchor_reserve_sats).unwrap_or(0); - let required_funds_sats = channel_amount_sats - + self.config.anchor_channels_config.as_ref().map_or(0, |c| { - if init_features.requires_anchors_zero_fee_htlc_tx() - && !c.trusted_peers_no_reserve.contains(&their_network_key) - { - c.per_channel_reserve_sats - } else { - 0 - } - }); - if spendable_amount_sats < required_funds_sats { - log_error!(self.logger, - "Unable to create channel due to insufficient funds. Available: {}sats, Required: {}sats", - spendable_amount_sats, channel_amount_sats - ); - // TODO: We just silently fail here. Eventually we will need to remember - // the pending requests and regularly retry opening the channel until we - // succeed. - return; - } - - let mut config = self.channel_manager.get_current_config().clone(); - - // We set these LSP-specific values during Node building, here we're making sure it's actually set. - debug_assert_eq!( - config - .channel_handshake_config - .max_inbound_htlc_value_in_flight_percent_of_channel, - 100 - ); - debug_assert!(config.accept_forwards_to_priv_channels); - - // We set the forwarding fee to 0 for now as we're getting paid by the channel fee. - // - // TODO: revisit this decision eventually. - config.channel_config.forwarding_fee_base_msat = 0; - config.channel_config.forwarding_fee_proportional_millionths = 0; - - match self.channel_manager.create_channel( - their_network_key, - channel_amount_sats, - 0, - user_channel_id, - None, - Some(config), - ) { - Ok(_) => {}, - Err(e) => { - // TODO: We just silently fail here. Eventually we will need to remember - // the pending requests and regularly retry opening the channel until we - // succeed. - log_error!( - self.logger, - "Failed to open LSPS2 channel to {}: {:?}", - their_network_key, - e - ); - return; - }, - } - }, - LiquidityEvent::LSPS2Client(LSPS2ClientEvent::OpeningParametersReady { - request_id, - counterparty_node_id, - opening_fee_params_menu, - }) => { - if let Some(lsps2_client) = self.lsps2_client.as_ref() { - if counterparty_node_id != lsps2_client.lsp_node_id { - debug_assert!( - false, - "Received response from unexpected LSP counterparty. This should never happen." - ); - log_error!( - self.logger, - "Received response from unexpected LSP counterparty. This should never happen." - ); - return; - } - - if let Some(sender) = - lsps2_client.pending_fee_requests.lock().unwrap().remove(&request_id) - { - let response = LSPS2FeeResponse { opening_fee_params_menu }; - - match sender.send(response) { - Ok(()) => (), - Err(_) => { - log_error!( - self.logger, - "Failed to handle response for request {:?} from liquidity service", - request_id - ); - }, - } - } else { - debug_assert!( - false, - "Received response from liquidity service for unknown request." - ); - log_error!( - self.logger, - "Received response from liquidity service for unknown request." - ); - } - } else { - log_error!( - self.logger, - "Received unexpected LSPS2Client::OpeningParametersReady event!" - ); - } - }, - LiquidityEvent::LSPS2Client(LSPS2ClientEvent::InvoiceParametersReady { - request_id, - counterparty_node_id, - intercept_scid, - cltv_expiry_delta, - .. - }) => { - if let Some(lsps2_client) = self.lsps2_client.as_ref() { - if counterparty_node_id != lsps2_client.lsp_node_id { - debug_assert!( - false, - "Received response from unexpected LSP counterparty. This should never happen." - ); - log_error!( - self.logger, - "Received response from unexpected LSP counterparty. This should never happen." - ); - return; - } - - if let Some(sender) = - lsps2_client.pending_buy_requests.lock().unwrap().remove(&request_id) - { - let response = LSPS2BuyResponse { intercept_scid, cltv_expiry_delta }; - - match sender.send(response) { - Ok(()) => (), - Err(_) => { - log_error!( - self.logger, - "Failed to handle response for request {:?} from liquidity service", - request_id - ); - }, - } - } else { - debug_assert!( - false, - "Received response from liquidity service for unknown request." - ); - log_error!( - self.logger, - "Received response from liquidity service for unknown request." - ); - } - } else { - log_error!( - self.logger, - "Received unexpected LSPS2Client::InvoiceParametersReady event!" - ); - } - }, - e => { - log_error!(self.logger, "Received unexpected liquidity event: {:?}", e); - }, - } - } - - pub(crate) async fn lsps1_request_opening_params( - &self, - ) -> Result { - let lsps1_client = self.lsps1_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; - - let client_handler = self.liquidity_manager.lsps1_client_handler().ok_or_else(|| { - log_error!(self.logger, "LSPS1 liquidity client was not configured.",); - Error::LiquiditySourceUnavailable - })?; - - let (request_sender, request_receiver) = oneshot::channel(); - { - let mut pending_opening_params_requests_lock = - lsps1_client.pending_opening_params_requests.lock().unwrap(); - let request_id = client_handler.request_supported_options(lsps1_client.lsp_node_id); - pending_opening_params_requests_lock.insert(request_id, request_sender); - } - - tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), request_receiver) - .await - .map_err(|e| { - log_error!(self.logger, "Liquidity request timed out: {}", e); - Error::LiquidityRequestFailed - })? - .map_err(|e| { - log_error!(self.logger, "Failed to handle response from liquidity service: {}", e); - Error::LiquidityRequestFailed - }) - } - - pub(crate) async fn lsps1_request_channel( - &self, lsp_balance_sat: u64, client_balance_sat: u64, channel_expiry_blocks: u32, - announce_channel: bool, refund_address: bitcoin::Address, - ) -> Result { - let lsps1_client = self.lsps1_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; - let client_handler = self.liquidity_manager.lsps1_client_handler().ok_or_else(|| { - log_error!(self.logger, "LSPS1 liquidity client was not configured.",); - Error::LiquiditySourceUnavailable - })?; - - let lsp_limits = self.lsps1_request_opening_params().await?.supported_options; - let channel_size_sat = lsp_balance_sat + client_balance_sat; - - if channel_size_sat < lsp_limits.min_channel_balance_sat - || channel_size_sat > lsp_limits.max_channel_balance_sat - { - log_error!( - self.logger, - "Requested channel size of {}sat doesn't meet the LSP-provided limits (min: {}sat, max: {}sat).", - channel_size_sat, - lsp_limits.min_channel_balance_sat, - lsp_limits.max_channel_balance_sat - ); - return Err(Error::LiquidityRequestFailed); - } - - if lsp_balance_sat < lsp_limits.min_initial_lsp_balance_sat - || lsp_balance_sat > lsp_limits.max_initial_lsp_balance_sat - { - log_error!( - self.logger, - "Requested LSP-side balance of {}sat doesn't meet the LSP-provided limits (min: {}sat, max: {}sat).", - lsp_balance_sat, - lsp_limits.min_initial_lsp_balance_sat, - lsp_limits.max_initial_lsp_balance_sat - ); - return Err(Error::LiquidityRequestFailed); - } - - if client_balance_sat < lsp_limits.min_initial_client_balance_sat - || client_balance_sat > lsp_limits.max_initial_client_balance_sat - { - log_error!( - self.logger, - "Requested client-side balance of {}sat doesn't meet the LSP-provided limits (min: {}sat, max: {}sat).", - client_balance_sat, - lsp_limits.min_initial_client_balance_sat, - lsp_limits.max_initial_client_balance_sat - ); - return Err(Error::LiquidityRequestFailed); - } - - let order_params = LSPS1OrderParams { - lsp_balance_sat, - client_balance_sat, - required_channel_confirmations: lsp_limits.min_required_channel_confirmations, - funding_confirms_within_blocks: lsp_limits.min_funding_confirms_within_blocks, - channel_expiry_blocks, - token: lsps1_client.token.clone(), - announce_channel, - }; - - let (request_sender, request_receiver) = oneshot::channel(); - let request_id; - { - let mut pending_create_order_requests_lock = - lsps1_client.pending_create_order_requests.lock().unwrap(); - request_id = client_handler.create_order( - &lsps1_client.lsp_node_id, - order_params.clone(), - Some(refund_address), - ); - pending_create_order_requests_lock.insert(request_id.clone(), request_sender); - } - - let response = tokio::time::timeout( - Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), - request_receiver, - ) - .await - .map_err(|e| { - log_error!(self.logger, "Liquidity request with ID {:?} timed out: {}", request_id, e); - Error::LiquidityRequestFailed - })? - .map_err(|e| { - log_error!(self.logger, "Failed to handle response from liquidity service: {}", e); - Error::LiquidityRequestFailed - })?; - - if response.order_params != order_params { - log_error!( - self.logger, - "Aborting LSPS1 request as LSP-provided parameters don't match our order. Expected: {:?}, Received: {:?}", order_params, response.order_params - ); - return Err(Error::LiquidityRequestFailed); - } - - Ok(response) - } - - pub(crate) async fn lsps1_check_order_status( - &self, order_id: LSPS1OrderId, - ) -> Result { - let lsps1_client = self.lsps1_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; - let client_handler = self.liquidity_manager.lsps1_client_handler().ok_or_else(|| { - log_error!(self.logger, "LSPS1 liquidity client was not configured.",); - Error::LiquiditySourceUnavailable - })?; - - let (request_sender, request_receiver) = oneshot::channel(); - { - let mut pending_check_order_status_requests_lock = - lsps1_client.pending_check_order_status_requests.lock().unwrap(); - let request_id = client_handler.check_order_status(&lsps1_client.lsp_node_id, order_id); - pending_check_order_status_requests_lock.insert(request_id, request_sender); - } - - let response = tokio::time::timeout( - Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), - request_receiver, - ) - .await - .map_err(|e| { - log_error!(self.logger, "Liquidity request timed out: {}", e); - Error::LiquidityRequestFailed - })? - .map_err(|e| { - log_error!(self.logger, "Failed to handle response from liquidity service: {}", e); - Error::LiquidityRequestFailed - })?; - - Ok(response) - } - - pub(crate) async fn lsps2_receive_to_jit_channel( - &self, amount_msat: u64, description: &Bolt11InvoiceDescription, expiry_secs: u32, - max_total_lsp_fee_limit_msat: Option, payment_hash: Option, - ) -> Result<(Bolt11Invoice, u64), Error> { - let fee_response = self.lsps2_request_opening_fee_params().await?; - - let (min_total_fee_msat, min_opening_params) = fee_response - .opening_fee_params_menu - .into_iter() - .filter_map(|params| { - if amount_msat < params.min_payment_size_msat - || amount_msat > params.max_payment_size_msat - { - log_debug!(self.logger, - "Skipping LSP-offered JIT parameters as the payment of {}msat doesn't meet LSP limits (min: {}msat, max: {}msat)", - amount_msat, - params.min_payment_size_msat, - params.max_payment_size_msat - ); - None - } else { - compute_opening_fee(amount_msat, params.min_fee_msat, params.proportional as u64) - .map(|fee| (fee, params)) - } - }) - .min_by_key(|p| p.0) - .ok_or_else(|| { - log_error!(self.logger, "Failed to handle response from liquidity service",); - Error::LiquidityRequestFailed - })?; - - if let Some(max_total_lsp_fee_limit_msat) = max_total_lsp_fee_limit_msat { - if min_total_fee_msat > max_total_lsp_fee_limit_msat { - log_error!(self.logger, - "Failed to request inbound JIT channel as LSP's requested total opening fee of {}msat exceeds our fee limit of {}msat", - min_total_fee_msat, max_total_lsp_fee_limit_msat - ); - return Err(Error::LiquidityFeeTooHigh); - } - } - - log_debug!( - self.logger, - "Choosing cheapest liquidity offer, will pay {}msat in total LSP fees", - min_total_fee_msat - ); - - let buy_response = - self.lsps2_send_buy_request(Some(amount_msat), min_opening_params).await?; - let invoice = self.lsps2_create_jit_invoice( - buy_response, - Some(amount_msat), - description, - expiry_secs, - payment_hash, - )?; - - log_info!(self.logger, "JIT-channel invoice created: {}", invoice); - Ok((invoice, min_total_fee_msat)) - } - - pub(crate) async fn lsps2_receive_variable_amount_to_jit_channel( - &self, description: &Bolt11InvoiceDescription, expiry_secs: u32, - max_proportional_lsp_fee_limit_ppm_msat: Option, payment_hash: Option, - ) -> Result<(Bolt11Invoice, u64), Error> { - let fee_response = self.lsps2_request_opening_fee_params().await?; - - let (min_prop_fee_ppm_msat, min_opening_params) = fee_response - .opening_fee_params_menu - .into_iter() - .map(|params| (params.proportional as u64, params)) - .min_by_key(|p| p.0) - .ok_or_else(|| { - log_error!(self.logger, "Failed to handle response from liquidity service",); - Error::LiquidityRequestFailed - })?; - - if let Some(max_proportional_lsp_fee_limit_ppm_msat) = - max_proportional_lsp_fee_limit_ppm_msat - { - if min_prop_fee_ppm_msat > max_proportional_lsp_fee_limit_ppm_msat { - log_error!(self.logger, - "Failed to request inbound JIT channel as LSP's requested proportional opening fee of {} ppm msat exceeds our fee limit of {} ppm msat", - min_prop_fee_ppm_msat, - max_proportional_lsp_fee_limit_ppm_msat - ); - return Err(Error::LiquidityFeeTooHigh); - } - } - - log_debug!( - self.logger, - "Choosing cheapest liquidity offer, will pay {}ppm msat in proportional LSP fees", - min_prop_fee_ppm_msat - ); - - let buy_response = self.lsps2_send_buy_request(None, min_opening_params).await?; - let invoice = self.lsps2_create_jit_invoice( - buy_response, - None, - description, - expiry_secs, - payment_hash, - )?; - - log_info!(self.logger, "JIT-channel invoice created: {}", invoice); - Ok((invoice, min_prop_fee_ppm_msat)) - } - - async fn lsps2_request_opening_fee_params(&self) -> Result { - let lsps2_client = self.lsps2_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; - - let client_handler = self.liquidity_manager.lsps2_client_handler().ok_or_else(|| { - log_error!(self.logger, "Liquidity client was not configured.",); - Error::LiquiditySourceUnavailable - })?; - - let (fee_request_sender, fee_request_receiver) = oneshot::channel(); - { - let mut pending_fee_requests_lock = lsps2_client.pending_fee_requests.lock().unwrap(); - let request_id = client_handler - .request_opening_params(lsps2_client.lsp_node_id, lsps2_client.token.clone()); - pending_fee_requests_lock.insert(request_id, fee_request_sender); - } - - tokio::time::timeout( - Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), - fee_request_receiver, - ) - .await - .map_err(|e| { - log_error!(self.logger, "Liquidity request timed out: {}", e); - Error::LiquidityRequestFailed - })? - .map_err(|e| { - log_error!(self.logger, "Failed to handle response from liquidity service: {}", e); - Error::LiquidityRequestFailed - }) - } - - async fn lsps2_send_buy_request( - &self, amount_msat: Option, opening_fee_params: LSPS2OpeningFeeParams, - ) -> Result { - let lsps2_client = self.lsps2_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; - - let client_handler = self.liquidity_manager.lsps2_client_handler().ok_or_else(|| { - log_error!(self.logger, "Liquidity client was not configured.",); - Error::LiquiditySourceUnavailable - })?; - - let (buy_request_sender, buy_request_receiver) = oneshot::channel(); - { - let mut pending_buy_requests_lock = lsps2_client.pending_buy_requests.lock().unwrap(); - let request_id = client_handler - .select_opening_params(lsps2_client.lsp_node_id, amount_msat, opening_fee_params) - .map_err(|e| { - log_error!( - self.logger, - "Failed to send buy request to liquidity service: {:?}", - e - ); - Error::LiquidityRequestFailed - })?; - pending_buy_requests_lock.insert(request_id, buy_request_sender); - } - - let buy_response = tokio::time::timeout( - Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), - buy_request_receiver, - ) - .await - .map_err(|e| { - log_error!(self.logger, "Liquidity request timed out: {}", e); - Error::LiquidityRequestFailed - })? - .map_err(|e| { - log_error!(self.logger, "Failed to handle response from liquidity service: {:?}", e); - Error::LiquidityRequestFailed - })?; - - Ok(buy_response) - } - - fn lsps2_create_jit_invoice( - &self, buy_response: LSPS2BuyResponse, amount_msat: Option, - description: &Bolt11InvoiceDescription, expiry_secs: u32, - payment_hash: Option, - ) -> Result { - let lsps2_client = self.lsps2_client.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; - - // LSPS2 requires min_final_cltv_expiry_delta to be at least 2 more than usual. - let min_final_cltv_expiry_delta = MIN_FINAL_CLTV_EXPIRY_DELTA + 2; - let (payment_hash, payment_secret) = match payment_hash { - Some(payment_hash) => { - let payment_secret = self - .channel_manager - .create_inbound_payment_for_hash( - payment_hash, - None, - expiry_secs, - Some(min_final_cltv_expiry_delta), - ) - .map_err(|e| { - log_error!(self.logger, "Failed to register inbound payment: {:?}", e); - Error::InvoiceCreationFailed - })?; - (payment_hash, payment_secret) - }, - None => self - .channel_manager - .create_inbound_payment(None, expiry_secs, Some(min_final_cltv_expiry_delta)) - .map_err(|e| { - log_error!(self.logger, "Failed to register inbound payment: {:?}", e); - Error::InvoiceCreationFailed - })?, - }; - - let route_hint = RouteHint(vec![RouteHintHop { - src_node_id: lsps2_client.lsp_node_id, - short_channel_id: buy_response.intercept_scid, - fees: RoutingFees { base_msat: 0, proportional_millionths: 0 }, - cltv_expiry_delta: buy_response.cltv_expiry_delta as u16, - htlc_minimum_msat: None, - htlc_maximum_msat: None, - }]); - - let currency = self.config.network.into(); - let mut invoice_builder = InvoiceBuilder::new(currency) - .invoice_description(description.clone()) - .payment_hash(payment_hash) - .payment_secret(payment_secret) - .current_timestamp() - .min_final_cltv_expiry_delta(min_final_cltv_expiry_delta.into()) - .expiry_time(Duration::from_secs(expiry_secs.into())) - .private_route(route_hint); - - if let Some(amount_msat) = amount_msat { - invoice_builder = invoice_builder.amount_milli_satoshis(amount_msat).basic_mpp(); - } - - invoice_builder - .build_signed(|hash| { - Secp256k1::new() - .sign_ecdsa_recoverable(hash, &self.keys_manager.get_node_secret_key()) - }) - .map_err(|e| { - log_error!(self.logger, "Failed to build and sign invoice: {}", e); - Error::InvoiceCreationFailed - }) - } - - pub(crate) async fn handle_channel_ready( - &self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey, - ) { - if let Some(lsps2_service_handler) = self.liquidity_manager.lsps2_service_handler() { - if let Err(e) = lsps2_service_handler - .channel_ready(user_channel_id, channel_id, counterparty_node_id) - .await - { - log_error!( - self.logger, - "LSPS2 service failed to handle ChannelReady event: {:?}", - e - ); - } - } - } - - pub(crate) async fn handle_htlc_intercepted( - &self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64, - payment_hash: PaymentHash, - ) { - if let Some(lsps2_service_handler) = self.liquidity_manager.lsps2_service_handler() { - if let Err(e) = lsps2_service_handler - .htlc_intercepted( - intercept_scid, - intercept_id, - expected_outbound_amount_msat, - payment_hash, - ) - .await - { - log_error!( - self.logger, - "LSPS2 service failed to handle HTLCIntercepted event: {:?}", - e - ); - } - } - } - - pub(crate) async fn handle_htlc_handling_failed(&self, failure_type: HTLCHandlingFailureType) { - if let Some(lsps2_service_handler) = self.liquidity_manager.lsps2_service_handler() { - if let Err(e) = lsps2_service_handler.htlc_handling_failed(failure_type).await { - log_error!( - self.logger, - "LSPS2 service failed to handle HTLCHandlingFailed event: {:?}", - e - ); - } - } - } - - pub(crate) async fn handle_payment_forwarded( - &self, next_channel_id: Option, skimmed_fee_msat: u64, - ) { - if let Some(next_channel_id) = next_channel_id { - if let Some(lsps2_service_handler) = self.liquidity_manager.lsps2_service_handler() { - if let Err(e) = - lsps2_service_handler.payment_forwarded(next_channel_id, skimmed_fee_msat).await - { - log_error!( - self.logger, - "LSPS2 service failed to handle PaymentForwarded: {:?}", - e - ); - } - } - } - } -} - -#[derive(Debug, Clone)] -pub(crate) struct LSPS1OpeningParamsResponse { - supported_options: LSPS1Options, -} - -/// Represents the status of an LSPS1 channel request. -#[derive(Debug, Clone)] -pub struct LSPS1OrderStatus { - /// The id of the channel order. - pub order_id: LSPS1OrderId, - /// The parameters of channel order. - pub order_params: LSPS1OrderParams, - /// Contains details about how to pay for the order. - pub payment_options: LSPS1PaymentInfo, - /// Contains information about the channel state. - pub channel_state: Option, -} - -#[cfg(not(feature = "uniffi"))] -type LSPS1PaymentInfo = lightning_liquidity::lsps1::msgs::LSPS1PaymentInfo; - -#[cfg(feature = "uniffi")] -type LSPS1PaymentInfo = crate::ffi::LSPS1PaymentInfo; - -#[derive(Debug, Clone)] -pub(crate) struct LSPS2FeeResponse { - opening_fee_params_menu: Vec, -} - -#[derive(Debug, Clone)] -pub(crate) struct LSPS2BuyResponse { - intercept_scid: u64, - cltv_expiry_delta: u32, -} - -/// A liquidity handler allowing to request channels via the [bLIP-51 / LSPS1] protocol. -/// -/// Should be retrieved by calling [`Node::lsps1_liquidity`]. -/// -/// To open [bLIP-52 / LSPS2] JIT channels, please refer to -/// [`Bolt11Payment::receive_via_jit_channel`]. -/// -/// [bLIP-51 / LSPS1]: https://github.com/lightning/blips/blob/master/blip-0051.md -/// [bLIP-52 / LSPS2]: https://github.com/lightning/blips/blob/master/blip-0052.md -/// [`Node::lsps1_liquidity`]: crate::Node::lsps1_liquidity -/// [`Bolt11Payment::receive_via_jit_channel`]: crate::payment::Bolt11Payment::receive_via_jit_channel -#[derive(Clone)] -#[cfg_attr(feature = "uniffi", derive(uniffi::Object))] -pub struct LSPS1Liquidity { - runtime: Arc, - wallet: Arc, - connection_manager: Arc>>, - liquidity_source: Option>>>, - logger: Arc, -} - -impl LSPS1Liquidity { - pub(crate) fn new( - runtime: Arc, wallet: Arc, - connection_manager: Arc>>, - liquidity_source: Option>>>, logger: Arc, - ) -> Self { - Self { runtime, wallet, connection_manager, liquidity_source, logger } - } -} - -#[cfg_attr(feature = "uniffi", uniffi::export)] -impl LSPS1Liquidity { - /// Connects to the configured LSP and places an order for an inbound channel. - /// - /// The channel will be opened after one of the returned payment options has successfully been - /// paid. - pub fn request_channel( - &self, lsp_balance_sat: u64, client_balance_sat: u64, channel_expiry_blocks: u32, - announce_channel: bool, - ) -> Result { - let liquidity_source = - self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; - - let (lsp_node_id, lsp_address) = - liquidity_source.get_lsps1_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; - - let con_node_id = lsp_node_id; - let con_addr = lsp_address.clone(); - let con_cm = Arc::clone(&self.connection_manager); - - // We need to use our main runtime here as a local runtime might not be around to poll - // connection futures going forward. - self.runtime.block_on(async move { - con_cm.connect_peer_if_necessary(con_node_id, con_addr).await - })?; - - log_info!(self.logger, "Connected to LSP {}@{}. ", lsp_node_id, lsp_address); - - let refund_address = self.wallet.get_new_address()?; - - let liquidity_source = Arc::clone(&liquidity_source); - let response = self.runtime.block_on(async move { - liquidity_source - .lsps1_request_channel( - lsp_balance_sat, - client_balance_sat, - channel_expiry_blocks, - announce_channel, - refund_address, - ) - .await - })?; - - Ok(response) - } - - /// Connects to the configured LSP and checks for the status of a previously-placed order. - pub fn check_order_status(&self, order_id: LSPS1OrderId) -> Result { - let liquidity_source = - self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; - - let (lsp_node_id, lsp_address) = - liquidity_source.get_lsps1_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; - - let con_node_id = lsp_node_id; - let con_addr = lsp_address.clone(); - let con_cm = Arc::clone(&self.connection_manager); - - // We need to use our main runtime here as a local runtime might not be around to poll - // connection futures going forward. - self.runtime.block_on(async move { - con_cm.connect_peer_if_necessary(con_node_id, con_addr).await - })?; - - let liquidity_source = Arc::clone(&liquidity_source); - let response = self - .runtime - .block_on(async move { liquidity_source.lsps1_check_order_status(order_id).await })?; - Ok(response) - } -} diff --git a/src/liquidity/client/lsps1.rs b/src/liquidity/client/lsps1.rs new file mode 100644 index 000000000..89d12701b --- /dev/null +++ b/src/liquidity/client/lsps1.rs @@ -0,0 +1,548 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use std::collections::HashMap; +use std::ops::Deref; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::Duration; + +use bitcoin::secp256k1::PublicKey; +use lightning_liquidity::lsps0::ser::LSPSRequestId; +use lightning_liquidity::lsps1::event::LSPS1ClientEvent; +use lightning_liquidity::lsps1::msgs::{ + LSPS1ChannelInfo, LSPS1Options, LSPS1OrderId, LSPS1OrderParams, +}; +use tokio::sync::oneshot; + +use crate::connection::ConnectionManager; +use crate::liquidity::{ + is_lsps_node, select_lsps_for_protocol, LspNode, LIQUIDITY_REQUEST_TIMEOUT_SECS, +}; +use crate::logger::{log_error, log_info, LdkLogger, Logger}; +use crate::runtime::Runtime; +use crate::types::{LiquidityManager, Wallet}; +use crate::Error; + +pub(crate) struct LSPS1LiquiditySource +where + L::Target: LdkLogger, +{ + pub(crate) lsp_nodes: Arc>>, + pub(crate) pending_lsps1_opening_params_requests: + Mutex>>, + pub(crate) pending_lsps1_create_order_requests: + Mutex>>, + pub(crate) pending_lsps1_check_order_status_requests: + Mutex>>, + pub(crate) lsps1_order_lsp_map: Mutex>, + pub(crate) liquidity_manager: Arc, + pub(crate) logger: L, +} + +impl LSPS1LiquiditySource +where + L::Target: LdkLogger, +{ + pub(crate) async fn lsps1_request_opening_params( + &self, node_id: &PublicKey, + ) -> Result { + let lsps1_node = select_lsps_for_protocol(&self.lsp_nodes, 1, Some(node_id)) + .ok_or(Error::LiquiditySourceUnavailable)?; + + let client_handler = self.liquidity_manager.lsps1_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS1 liquidity client was not configured.",); + Error::LiquiditySourceUnavailable + })?; + + let (request_sender, request_receiver) = oneshot::channel(); + { + let mut pending_opening_params_requests_lock = + self.pending_lsps1_opening_params_requests.lock().unwrap(); + let request_id = client_handler.request_supported_options(lsps1_node.node_id); + pending_opening_params_requests_lock.insert(request_id, request_sender); + } + + tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), request_receiver) + .await + .map_err(|e| { + log_error!(self.logger, "Liquidity request timed out: {}", e); + Error::LiquidityRequestFailed + })? + .map_err(|e| { + log_error!(self.logger, "Failed to handle response from liquidity service: {}", e); + Error::LiquidityRequestFailed + }) + } + + pub(crate) async fn lsps1_request_channel( + &self, lsp_balance_sat: u64, client_balance_sat: u64, channel_expiry_blocks: u32, + announce_channel: bool, refund_address: bitcoin::Address, node_id: &PublicKey, + ) -> Result { + let lsps1_node = select_lsps_for_protocol(&self.lsp_nodes, 1, Some(node_id)) + .ok_or(Error::LiquiditySourceUnavailable)?; + + let client_handler = self.liquidity_manager.lsps1_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS1 liquidity client was not configured.",); + Error::LiquiditySourceUnavailable + })?; + + let lsp_limits = self.lsps1_request_opening_params(node_id).await?.supported_options; + let channel_size_sat = lsp_balance_sat + client_balance_sat; + + if channel_size_sat < lsp_limits.min_channel_balance_sat + || channel_size_sat > lsp_limits.max_channel_balance_sat + { + log_error!( + self.logger, + "Requested channel size of {}sat doesn't meet the LSP-provided limits (min: {}sat, max: {}sat).", + channel_size_sat, + lsp_limits.min_channel_balance_sat, + lsp_limits.max_channel_balance_sat + ); + return Err(Error::LiquidityRequestFailed); + } + + if lsp_balance_sat < lsp_limits.min_initial_lsp_balance_sat + || lsp_balance_sat > lsp_limits.max_initial_lsp_balance_sat + { + log_error!( + self.logger, + "Requested LSP-side balance of {}sat doesn't meet the LSP-provided limits (min: {}sat, max: {}sat).", + lsp_balance_sat, + lsp_limits.min_initial_lsp_balance_sat, + lsp_limits.max_initial_lsp_balance_sat + ); + return Err(Error::LiquidityRequestFailed); + } + + if client_balance_sat < lsp_limits.min_initial_client_balance_sat + || client_balance_sat > lsp_limits.max_initial_client_balance_sat + { + log_error!( + self.logger, + "Requested client-side balance of {}sat doesn't meet the LSP-provided limits (min: {}sat, max: {}sat).", + client_balance_sat, + lsp_limits.min_initial_client_balance_sat, + lsp_limits.max_initial_client_balance_sat + ); + return Err(Error::LiquidityRequestFailed); + } + + let order_params = LSPS1OrderParams { + lsp_balance_sat, + client_balance_sat, + required_channel_confirmations: lsp_limits.min_required_channel_confirmations, + funding_confirms_within_blocks: lsp_limits.min_funding_confirms_within_blocks, + channel_expiry_blocks, + token: lsps1_node.token.clone(), + announce_channel, + }; + + let (request_sender, request_receiver) = oneshot::channel(); + let request_id; + { + let mut pending_create_order_requests_lock = + self.pending_lsps1_create_order_requests.lock().unwrap(); + request_id = client_handler.create_order( + &lsps1_node.node_id, + order_params.clone(), + Some(refund_address), + ); + pending_create_order_requests_lock.insert(request_id.clone(), request_sender); + } + + let response = tokio::time::timeout( + Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), + request_receiver, + ) + .await + .map_err(|e| { + log_error!(self.logger, "Liquidity request with ID {:?} timed out: {}", request_id, e); + Error::LiquidityRequestFailed + })? + .map_err(|e| { + log_error!(self.logger, "Failed to handle response from liquidity service: {}", e); + Error::LiquidityRequestFailed + })?; + + if response.order_params != order_params { + log_error!( + self.logger, + "Aborting LSPS1 request as LSP-provided parameters don't match our order. Expected: {:?}, Received: {:?}", order_params, response.order_params + ); + return Err(Error::LiquidityRequestFailed); + } + + self.lsps1_order_lsp_map + .lock() + .unwrap() + .insert(response.order_id.clone(), lsps1_node.node_id); + + Ok(response) + } + + pub(crate) async fn lsps1_check_order_status( + &self, order_id: LSPS1OrderId, + ) -> Result { + let lsp_node_id = { + let lock = self.lsps1_order_lsp_map.lock().unwrap(); + *lock.get(&order_id).ok_or_else(|| { + log_error!(self.logger, "No LSP node ID found for LSPS1 order ID {:?}.", order_id); + Error::LiquiditySourceUnavailable + })? + }; + + let client_handler = self.liquidity_manager.lsps1_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS1 liquidity client was not configured.",); + Error::LiquiditySourceUnavailable + })?; + + let (request_sender, request_receiver) = oneshot::channel(); + { + let mut pending_check_order_status_requests_lock = + self.pending_lsps1_check_order_status_requests.lock().unwrap(); + let request_id = client_handler.check_order_status(&lsp_node_id, order_id); + pending_check_order_status_requests_lock.insert(request_id, request_sender); + } + + let response = tokio::time::timeout( + Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), + request_receiver, + ) + .await + .map_err(|e| { + log_error!(self.logger, "Liquidity request timed out: {}", e); + Error::LiquidityRequestFailed + })? + .map_err(|e| { + log_error!(self.logger, "Failed to handle response from liquidity service: {}", e); + Error::LiquidityRequestFailed + })?; + + Ok(response) + } + + pub(crate) async fn handle_next_event(&self, event: LSPS1ClientEvent) { + match event { + LSPS1ClientEvent::SupportedOptionsReady { + request_id, + counterparty_node_id, + supported_options, + } => { + if is_lsps_node(&self.lsp_nodes, &counterparty_node_id) { + if let Some(sender) = self + .pending_lsps1_opening_params_requests + .lock() + .unwrap() + .remove(&request_id) + { + let response = LSPS1OpeningParamsResponse { supported_options }; + + match sender.send(response) { + Ok(()) => (), + Err(_) => { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + }, + } + } else { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS1Client::SupportedOptionsReady event!" + ); + } + }, + LSPS1ClientEvent::OrderCreated { + request_id, + counterparty_node_id, + order_id, + order, + payment, + channel, + } => { + if is_lsps_node(&self.lsp_nodes, &counterparty_node_id) { + if let Some(sender) = + self.pending_lsps1_create_order_requests.lock().unwrap().remove(&request_id) + { + let response = LSPS1OrderStatus { + order_id, + order_params: order, + payment_options: payment.into(), + channel_state: channel, + }; + + match sender.send(response) { + Ok(()) => (), + Err(_) => { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + }, + } + } else { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + } + } else { + log_error!(self.logger, "Received unexpected LSPS1Client::OrderCreated event!"); + } + }, + LSPS1ClientEvent::OrderStatus { + request_id, + counterparty_node_id, + order_id, + order, + payment, + channel, + } => { + if is_lsps_node(&self.lsp_nodes, &counterparty_node_id) { + if let Some(sender) = self + .pending_lsps1_check_order_status_requests + .lock() + .unwrap() + .remove(&request_id) + { + let response = LSPS1OrderStatus { + order_id, + order_params: order, + payment_options: payment.into(), + channel_state: channel, + }; + + match sender.send(response) { + Ok(()) => (), + Err(_) => { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + }, + } + } else { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + } + } else { + log_error!(self.logger, "Received unexpected LSPS1Client::OrderStatus event!"); + } + }, + _ => { + log_error!(self.logger, "Received unexpected LSPS1Client liquidity event!"); + }, + } + } +} + +#[derive(Debug, Clone)] +pub(crate) struct LSPS1OpeningParamsResponse { + supported_options: LSPS1Options, +} + +/// Represents the status of an LSPS1 channel request. +#[derive(Debug, Clone)] +pub struct LSPS1OrderStatus { + /// The id of the channel order. + pub order_id: LSPS1OrderId, + /// The parameters of channel order. + pub order_params: LSPS1OrderParams, + /// Contains details about how to pay for the order. + pub payment_options: LSPS1PaymentInfo, + /// Contains information about the channel state. + pub channel_state: Option, +} + +#[cfg(not(feature = "uniffi"))] +type LSPS1PaymentInfo = lightning_liquidity::lsps1::msgs::LSPS1PaymentInfo; + +#[cfg(feature = "uniffi")] +type LSPS1PaymentInfo = crate::ffi::LSPS1PaymentInfo; + +/// A liquidity handler allowing to request channels via the [bLIP-51 / LSPS1] protocol. +/// +/// Should be retrieved by calling [`Node::lsps1_liquidity`]. +/// +/// To open [bLIP-52 / LSPS2] JIT channels, please refer to +/// [`Bolt11Payment::receive_via_jit_channel`]. +/// +/// [bLIP-51 / LSPS1]: https://github.com/lightning/blips/blob/master/blip-0051.md +/// [bLIP-52 / LSPS2]: https://github.com/lightning/blips/blob/master/blip-0052.md +/// [`Node::lsps1_liquidity`]: crate::Node::lsps1_liquidity +/// [`Bolt11Payment::receive_via_jit_channel`]: crate::payment::Bolt11Payment::receive_via_jit_channel +#[derive(Clone)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Object))] +pub struct LSPS1Liquidity { + runtime: Arc, + wallet: Arc, + connection_manager: Arc>>, + liquidity_source: Option>>>, + logger: Arc, +} + +impl LSPS1Liquidity { + pub(crate) fn new( + runtime: Arc, wallet: Arc, + connection_manager: Arc>>, + liquidity_source: Option>>>, logger: Arc, + ) -> Self { + Self { runtime, wallet, connection_manager, liquidity_source, logger } + } +} + +#[cfg_attr(feature = "uniffi", uniffi::export)] +impl LSPS1Liquidity { + /// Connects to the configured LSP and places an order for an inbound channel. + /// + /// The channel will be opened after one of the returned payment options has successfully been + /// paid. + pub fn request_channel( + &self, lsp_balance_sat: u64, client_balance_sat: u64, channel_expiry_blocks: u32, + announce_channel: bool, + ) -> Result { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + + let lsps1_node = select_lsps_for_protocol(&liquidity_source.lsp_nodes, 1, None) + .ok_or(Error::LiquiditySourceUnavailable)?; + + let con_node_id = lsps1_node.node_id; + let con_addr = lsps1_node.address.clone(); + let con_cm = Arc::clone(&self.connection_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })?; + + log_info!(self.logger, "Connected to LSP {}@{}. ", lsps1_node.node_id, lsps1_node.address); + + let refund_address = self.wallet.get_new_address()?; + + let liquidity_source = Arc::clone(&liquidity_source); + let response = self.runtime.block_on(async move { + liquidity_source + .lsps1_request_channel( + lsp_balance_sat, + client_balance_sat, + channel_expiry_blocks, + announce_channel, + refund_address, + &con_node_id, + ) + .await + })?; + + Ok(response) + } + + /// Connects to the specified configured LSP and places an order for an inbound channel. + /// + /// The channel will be opened after one of the returned payment options has successfully been + /// paid. + pub fn request_channel_from_lsp( + &self, lsp_balance_sat: u64, client_balance_sat: u64, channel_expiry_blocks: u32, + announce_channel: bool, node_id: PublicKey, + ) -> Result { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + + let lsps1_node = select_lsps_for_protocol(&liquidity_source.lsp_nodes, 1, Some(&node_id)) + .ok_or(Error::LiquiditySourceUnavailable)?; + + let con_node_id = lsps1_node.node_id; + let con_addr = lsps1_node.address.clone(); + let con_cm = Arc::clone(&self.connection_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })?; + + log_info!(self.logger, "Connected to LSP {}@{}. ", lsps1_node.node_id, lsps1_node.address); + + let refund_address = self.wallet.get_new_address()?; + + let liquidity_source = Arc::clone(&liquidity_source); + let response = self.runtime.block_on(async move { + liquidity_source + .lsps1_request_channel( + lsp_balance_sat, + client_balance_sat, + channel_expiry_blocks, + announce_channel, + refund_address, + &con_node_id, + ) + .await + })?; + + Ok(response) + } + + /// Connects to the configured LSP and checks for the status of a previously-placed order. + pub fn check_order_status(&self, order_id: LSPS1OrderId) -> Result { + let liquidity_source = + self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + + let lsp_node_id = { + let lock = liquidity_source.lsps1_order_lsp_map.lock().unwrap(); + *lock.get(&order_id).ok_or_else(|| { + log_error!(self.logger, "No LSP node ID found for LSPS1 order ID {:?}.", order_id); + Error::LiquiditySourceUnavailable + })? + }; + + let lsps1_node = + select_lsps_for_protocol(&liquidity_source.lsp_nodes, 1, Some(&lsp_node_id)) + .ok_or(Error::LiquiditySourceUnavailable)?; + + let con_node_id = lsps1_node.node_id; + let con_addr = lsps1_node.address.clone(); + let con_cm = Arc::clone(&self.connection_manager); + + // We need to use our main runtime here as a local runtime might not be around to poll + // connection futures going forward. + self.runtime.block_on(async move { + con_cm.connect_peer_if_necessary(con_node_id, con_addr).await + })?; + + let liquidity_source = Arc::clone(&liquidity_source); + let response = self + .runtime + .block_on(async move { liquidity_source.lsps1_check_order_status(order_id).await })?; + Ok(response) + } +} diff --git a/src/liquidity/client/lsps2.rs b/src/liquidity/client/lsps2.rs new file mode 100644 index 000000000..795020b15 --- /dev/null +++ b/src/liquidity/client/lsps2.rs @@ -0,0 +1,505 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use std::collections::HashMap; +use std::ops::Deref; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::Duration; + +use bitcoin::secp256k1::{PublicKey, Secp256k1}; +use lightning::ln::channelmanager::MIN_FINAL_CLTV_EXPIRY_DELTA; +use lightning::log_warn; +use lightning::routing::router::{RouteHint, RouteHintHop}; +use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, InvoiceBuilder, RoutingFees}; +use lightning_liquidity::lsps0::ser::LSPSRequestId; +use lightning_liquidity::lsps2::event::LSPS2ClientEvent; +use lightning_liquidity::lsps2::msgs::LSPS2OpeningFeeParams; +use lightning_liquidity::lsps2::utils::compute_opening_fee; +use lightning_types::payment::PaymentHash; +use tokio::sync::oneshot; + +use crate::connection::ConnectionManager; +use crate::liquidity::{ + is_lsps_node, select_all_lsps_for_protocol, select_lsps_for_protocol, LspConfig, LspNode, + LIQUIDITY_REQUEST_TIMEOUT_SECS, +}; +use crate::logger::{log_debug, log_error, log_info, LdkLogger}; +use crate::types::{ChannelManager, KeysManager, LiquidityManager}; +use crate::{Config, Error}; + +pub(crate) struct LSPS2ClientLiquiditySource +where + L::Target: LdkLogger, +{ + pub(crate) lsp_nodes: Arc>>, + pub(crate) pending_lsps2_fee_requests: + Mutex>>, + pub(crate) pending_lsps2_buy_requests: + Mutex>>, + pub(crate) channel_manager: Arc, + pub(crate) keys_manager: Arc, + pub(crate) liquidity_manager: Arc, + pub(crate) config: Arc, + pub(crate) logger: L, +} + +impl LSPS2ClientLiquiditySource +where + L::Target: LdkLogger, +{ + pub(crate) async fn lsps2_receive_to_jit_channel( + &self, amount_msat: u64, description: &Bolt11InvoiceDescription, expiry_secs: u32, + max_total_lsp_fee_limit_msat: Option, payment_hash: Option, + connection_manager: &ConnectionManager, + ) -> Result<(Bolt11Invoice, u64, LspConfig), Error> { + let lsps2_nodes = select_all_lsps_for_protocol(&self.lsp_nodes, 2); + if lsps2_nodes.is_empty() { + log_error!(self.logger, "No LSPs available for LSPS2 protocol.",); + return Err(Error::LiquiditySourceUnavailable); + } + + // Connect to all candidate LSPs before querying fees. + for lsp_node in &lsps2_nodes { + if let Err(e) = connection_manager + .connect_peer_if_necessary(lsp_node.node_id, lsp_node.address.clone()) + .await + { + log_warn!( + self.logger, + "Failed to connect to LSP {} for fee query: {}", + lsp_node.node_id, + e + ); + } + } + + let mut all_offers = Vec::new(); + for lsp_node in &lsps2_nodes { + match self.lsps2_request_opening_fee_params(Some(&lsp_node.node_id)).await { + Ok(fee_response) => all_offers.push((lsp_node.clone(), fee_response)), + Err(e) => { + log_warn!( + self.logger, + "Failed to get fees from LSP {}: {}", + lsp_node.node_id, + e + ); + continue; + }, + } + } + + let (cheapest_lsp, min_total_fee_msat, min_opening_params) = all_offers + .into_iter() + .flat_map(|(lsp, resp)| { + resp.opening_fee_params_menu + .into_iter() + .map(move |params| (lsp.clone(), params)) + }) + .filter_map(|(lsp, params)| { + if amount_msat < params.min_payment_size_msat + || amount_msat > params.max_payment_size_msat + { + log_debug!(self.logger, + "Skipping LSP {}'s JIT offer as the payment of {}msat doesn't meet LSP limits (min: {}msat, max: {}msat)", + lsp.node_id, + amount_msat, + params.min_payment_size_msat, + params.max_payment_size_msat + ); + None + } else { + compute_opening_fee(amount_msat, params.min_fee_msat, params.proportional as u64) + .map(|fee| (lsp, fee, params)) + } + }) + .min_by_key(|(_, fee, _)| *fee) + .ok_or_else(|| { + log_error!(self.logger, "Failed to handle response from liquidity service",); + Error::LiquidityRequestFailed + })?; + + if let Some(max_total_lsp_fee_limit_msat) = max_total_lsp_fee_limit_msat { + if min_total_fee_msat > max_total_lsp_fee_limit_msat { + log_error!(self.logger, + "Failed to request inbound JIT channel as LSP's requested total opening fee of {}msat exceeds our fee limit of {}msat", + min_total_fee_msat, max_total_lsp_fee_limit_msat + ); + return Err(Error::LiquidityFeeTooHigh); + } + } + + log_debug!( + self.logger, + "Choosing cheapest liquidity offer from LSP {}, will pay {}msat in total LSP fees", + cheapest_lsp.node_id, + min_total_fee_msat + ); + + let buy_response = self + .lsps2_send_buy_request( + Some(amount_msat), + min_opening_params, + Some(&cheapest_lsp.node_id), + ) + .await?; + let invoice = self.lsps2_create_jit_invoice( + buy_response, + Some(amount_msat), + description, + expiry_secs, + payment_hash, + Some(&cheapest_lsp.node_id), + )?; + + log_info!(self.logger, "JIT-channel invoice created: {}", invoice); + Ok((invoice, min_total_fee_msat, cheapest_lsp)) + } + + pub(crate) async fn lsps2_receive_variable_amount_to_jit_channel( + &self, description: &Bolt11InvoiceDescription, expiry_secs: u32, + max_proportional_lsp_fee_limit_ppm_msat: Option, payment_hash: Option, + connection_manager: &ConnectionManager, + ) -> Result<(Bolt11Invoice, u64, LspConfig), Error> { + let lsps2_nodes = select_all_lsps_for_protocol(&self.lsp_nodes, 2); + if lsps2_nodes.is_empty() { + log_error!(self.logger, "No LSPs available for LSPS2 protocol.",); + return Err(Error::LiquiditySourceUnavailable); + } + + // Connect to all candidate LSPs before querying fees. + for lsp_node in &lsps2_nodes { + if let Err(e) = connection_manager + .connect_peer_if_necessary(lsp_node.node_id, lsp_node.address.clone()) + .await + { + log_warn!( + self.logger, + "Failed to connect to LSP {} for fee query: {}", + lsp_node.node_id, + e + ); + } + } + + let mut all_offers = Vec::new(); + for lsp_node in &lsps2_nodes { + match self.lsps2_request_opening_fee_params(Some(&lsp_node.node_id)).await { + Ok(fee_response) => all_offers.push((lsp_node.clone(), fee_response)), + Err(e) => { + log_warn!( + self.logger, + "Failed to get fees from LSP {}: {}", + lsp_node.node_id, + e + ); + continue; + }, + } + } + + let (cheapest_lsp, min_prop_fee_ppm_msat, min_opening_params) = all_offers + .into_iter() + .flat_map(|(lsp, resp)| { + resp.opening_fee_params_menu.into_iter().map(move |params| (lsp.clone(), params)) + }) + .map(|(lsp, params)| { + let ppm = params.proportional as u64; + (lsp, ppm, params) + }) + .min_by_key(|(_, ppm, _)| *ppm) + .ok_or_else(|| { + log_error!(self.logger, "Failed to handle response from liquidity service",); + Error::LiquidityRequestFailed + })?; + + if let Some(max_proportional_lsp_fee_limit_ppm_msat) = + max_proportional_lsp_fee_limit_ppm_msat + { + if min_prop_fee_ppm_msat > max_proportional_lsp_fee_limit_ppm_msat { + log_error!(self.logger, + "Failed to request inbound JIT channel as LSP's requested proportional opening fee of {} ppm msat exceeds our fee limit of {} ppm msat", + min_prop_fee_ppm_msat, + max_proportional_lsp_fee_limit_ppm_msat + ); + return Err(Error::LiquidityFeeTooHigh); + } + } + + log_debug!( + self.logger, + "Choosing cheapest liquidity offer from LSP {}, will pay {}ppm msat in proportional LSP fees", + cheapest_lsp.node_id, + min_prop_fee_ppm_msat + ); + + let buy_response = self + .lsps2_send_buy_request(None, min_opening_params, Some(&cheapest_lsp.node_id)) + .await?; + let invoice = self.lsps2_create_jit_invoice( + buy_response, + None, + description, + expiry_secs, + payment_hash, + Some(&cheapest_lsp.node_id), + )?; + + log_info!(self.logger, "JIT-channel invoice created: {}", invoice); + Ok((invoice, min_prop_fee_ppm_msat, cheapest_lsp)) + } +} + +impl LSPS2ClientLiquiditySource +where + L::Target: LdkLogger, +{ + async fn lsps2_request_opening_fee_params( + &self, node_id: Option<&PublicKey>, + ) -> Result { + let lsps2_node = select_lsps_for_protocol(&self.lsp_nodes, 2, node_id) + .ok_or(Error::LiquiditySourceUnavailable)?; + + let client_handler = self.liquidity_manager.lsps2_client_handler().ok_or_else(|| { + log_error!(self.logger, "Liquidity client was not configured.",); + Error::LiquiditySourceUnavailable + })?; + + let (fee_request_sender, fee_request_receiver) = oneshot::channel(); + { + let mut pending_fee_requests_lock = self.pending_lsps2_fee_requests.lock().unwrap(); + let request_id = + client_handler.request_opening_params(lsps2_node.node_id, lsps2_node.token.clone()); + pending_fee_requests_lock.insert(request_id, fee_request_sender); + } + + tokio::time::timeout( + Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), + fee_request_receiver, + ) + .await + .map_err(|e| { + log_error!(self.logger, "Liquidity request timed out: {}", e); + Error::LiquidityRequestFailed + })? + .map_err(|e| { + log_error!(self.logger, "Failed to handle response from liquidity service: {}", e); + Error::LiquidityRequestFailed + }) + } + + async fn lsps2_send_buy_request( + &self, amount_msat: Option, opening_fee_params: LSPS2OpeningFeeParams, + node_id: Option<&PublicKey>, + ) -> Result { + let lsps2_node = select_lsps_for_protocol(&self.lsp_nodes, 2, node_id) + .ok_or(Error::LiquiditySourceUnavailable)?; + + let client_handler = self.liquidity_manager.lsps2_client_handler().ok_or_else(|| { + log_error!(self.logger, "Liquidity client was not configured.",); + Error::LiquiditySourceUnavailable + })?; + + let (buy_request_sender, buy_request_receiver) = oneshot::channel(); + { + let mut pending_buy_requests_lock = self.pending_lsps2_buy_requests.lock().unwrap(); + let request_id = client_handler + .select_opening_params(lsps2_node.node_id, amount_msat, opening_fee_params) + .map_err(|e| { + log_error!( + self.logger, + "Failed to send buy request to liquidity service: {:?}", + e + ); + Error::LiquidityRequestFailed + })?; + pending_buy_requests_lock.insert(request_id, buy_request_sender); + } + + let buy_response = tokio::time::timeout( + Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), + buy_request_receiver, + ) + .await + .map_err(|e| { + log_error!(self.logger, "Liquidity request timed out: {}", e); + Error::LiquidityRequestFailed + })? + .map_err(|e| { + log_error!(self.logger, "Failed to handle response from liquidity service: {:?}", e); + Error::LiquidityRequestFailed + })?; + + Ok(buy_response) + } + + fn lsps2_create_jit_invoice( + &self, buy_response: LSPS2BuyResponse, amount_msat: Option, + description: &Bolt11InvoiceDescription, expiry_secs: u32, + payment_hash: Option, node_id: Option<&PublicKey>, + ) -> Result { + let lsps2_node = select_lsps_for_protocol(&self.lsp_nodes, 2, node_id) + .ok_or(Error::LiquiditySourceUnavailable)?; + + // LSPS2 requires min_final_cltv_expiry_delta to be at least 2 more than usual. + let min_final_cltv_expiry_delta = MIN_FINAL_CLTV_EXPIRY_DELTA + 2; + let (payment_hash, payment_secret) = match payment_hash { + Some(payment_hash) => { + let payment_secret = self + .channel_manager + .create_inbound_payment_for_hash( + payment_hash, + None, + expiry_secs, + Some(min_final_cltv_expiry_delta), + ) + .map_err(|e| { + log_error!(self.logger, "Failed to register inbound payment: {:?}", e); + Error::InvoiceCreationFailed + })?; + (payment_hash, payment_secret) + }, + None => self + .channel_manager + .create_inbound_payment(None, expiry_secs, Some(min_final_cltv_expiry_delta)) + .map_err(|e| { + log_error!(self.logger, "Failed to register inbound payment: {:?}", e); + Error::InvoiceCreationFailed + })?, + }; + + let route_hint = RouteHint(vec![RouteHintHop { + src_node_id: lsps2_node.node_id, + short_channel_id: buy_response.intercept_scid, + fees: RoutingFees { base_msat: 0, proportional_millionths: 0 }, + cltv_expiry_delta: buy_response.cltv_expiry_delta as u16, + htlc_minimum_msat: None, + htlc_maximum_msat: None, + }]); + + let currency = self.config.network.into(); + let mut invoice_builder = InvoiceBuilder::new(currency) + .invoice_description(description.clone()) + .payment_hash(payment_hash) + .payment_secret(payment_secret) + .current_timestamp() + .min_final_cltv_expiry_delta(min_final_cltv_expiry_delta.into()) + .expiry_time(Duration::from_secs(expiry_secs.into())) + .private_route(route_hint); + + if let Some(amount_msat) = amount_msat { + invoice_builder = invoice_builder.amount_milli_satoshis(amount_msat).basic_mpp(); + } + + invoice_builder + .build_signed(|hash| { + Secp256k1::new() + .sign_ecdsa_recoverable(hash, &self.keys_manager.get_node_secret_key()) + }) + .map_err(|e| { + log_error!(self.logger, "Failed to build and sign invoice: {}", e); + Error::InvoiceCreationFailed + }) + } + + pub(crate) async fn handle_next_event(&self, event: LSPS2ClientEvent) { + match event { + LSPS2ClientEvent::OpeningParametersReady { + request_id, + counterparty_node_id, + opening_fee_params_menu, + } => { + if is_lsps_node(&self.lsp_nodes, &counterparty_node_id) { + if let Some(sender) = + self.pending_lsps2_fee_requests.lock().unwrap().remove(&request_id) + { + let response = LSPS2FeeResponse { opening_fee_params_menu }; + + match sender.send(response) { + Ok(()) => (), + Err(_) => { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + }, + } + } else { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS2Client::OpeningParametersReady event!" + ); + } + }, + LSPS2ClientEvent::InvoiceParametersReady { + request_id, + counterparty_node_id, + intercept_scid, + cltv_expiry_delta, + .. + } => { + if is_lsps_node(&self.lsp_nodes, &counterparty_node_id) { + if let Some(sender) = + self.pending_lsps2_buy_requests.lock().unwrap().remove(&request_id) + { + let response = LSPS2BuyResponse { intercept_scid, cltv_expiry_delta }; + + match sender.send(response) { + Ok(()) => (), + Err(_) => { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + request_id + ); + }, + } + } else { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS2Client::InvoiceParametersReady event!" + ); + } + }, + _ => { + log_error!(self.logger, "Received unexpected LSPS2Client liquidity event!"); + }, + } + } +} + +#[derive(Debug, Clone)] +pub(crate) struct LSPS2FeeResponse { + opening_fee_params_menu: Vec, +} + +#[derive(Debug, Clone)] +pub(crate) struct LSPS2BuyResponse { + intercept_scid: u64, + cltv_expiry_delta: u32, +} diff --git a/src/liquidity/client/mod.rs b/src/liquidity/client/mod.rs new file mode 100644 index 000000000..15ca7e965 --- /dev/null +++ b/src/liquidity/client/mod.rs @@ -0,0 +1,11 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +pub(crate) mod lsps1; +pub(crate) mod lsps2; + +pub use lsps1::LSPS1OrderStatus; diff --git a/src/liquidity/mod.rs b/src/liquidity/mod.rs new file mode 100644 index 000000000..d046c4ccd --- /dev/null +++ b/src/liquidity/mod.rs @@ -0,0 +1,424 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +//! Objects related to liquidity management. + +pub(crate) mod client; +pub(crate) mod service; + +pub(crate) use client::lsps1::LSPS1Liquidity; +pub use client::LSPS1OrderStatus; +pub use service::lsps2::LSPS2ServiceConfig; + +use std::collections::HashMap; +use std::ops::Deref; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::Duration; + +use bitcoin::secp256k1::PublicKey; +use lightning::ln::msgs::SocketAddress; +use lightning_liquidity::events::LiquidityEvent; +use lightning_liquidity::lsps0::event::LSPS0ClientEvent; +use lightning_liquidity::lsps1::client::LSPS1ClientConfig as LdkLSPS1ClientConfig; +use lightning_liquidity::lsps2::client::LSPS2ClientConfig as LdkLSPS2ClientConfig; +use lightning_liquidity::lsps2::service::LSPS2ServiceConfig as LdkLSPS2ServiceConfig; +use lightning_liquidity::{LiquidityClientConfig, LiquidityServiceConfig}; +use tokio::sync::oneshot; + +use crate::builder::BuildError; +use crate::liquidity::client::lsps1::LSPS1LiquiditySource; +use crate::liquidity::client::lsps2::LSPS2ClientLiquiditySource; +use crate::liquidity::service::lsps2::{LSPS2Service, LSPS2ServiceLiquiditySource}; +use crate::logger::{log_error, log_info, LdkLogger}; +use crate::types::{Broadcaster, ChannelManager, DynStore, KeysManager, LiquidityManager, Wallet}; +use crate::{Config, Error}; + +const LIQUIDITY_REQUEST_TIMEOUT_SECS: u64 = 5; + +fn select_lsps_for_protocol( + lsp_nodes: &Arc>>, protocol: u16, override_node_id: Option<&PublicKey>, +) -> Option { + lsp_nodes + .read() + .unwrap() + .iter() + .find(|lsp_node| { + if let Some(override_node_id) = override_node_id { + lsp_node.node_id == *override_node_id + && lsp_node + .supported_protocols + .lock() + .unwrap() + .as_ref() + .map(|protocols| protocols.contains(&protocol)) + .unwrap_or(false) + } else { + lsp_node + .supported_protocols + .lock() + .unwrap() + .as_ref() + .map(|protocols| protocols.contains(&protocol)) + .unwrap_or(false) + } + }) + .map(|n| LspConfig { + node_id: n.node_id, + address: n.address.clone(), + token: n.token.clone(), + }) +} + +fn select_all_lsps_for_protocol( + lsp_nodes: &Arc>>, protocol: u16, +) -> Vec { + lsp_nodes + .read() + .unwrap() + .iter() + .filter(|lsp_node| { + lsp_node + .supported_protocols + .lock() + .unwrap() + .as_ref() + .map(|protocols| protocols.contains(&protocol)) + .unwrap_or(false) + }) + .map(|n| LspConfig { + node_id: n.node_id, + address: n.address.clone(), + token: n.token.clone(), + }) + .collect() +} + +fn is_lsps_node(lsp_nodes: &Arc>>, node_id: &PublicKey) -> bool { + lsp_nodes.read().unwrap().iter().any(|n| n.node_id == *node_id) +} + +#[derive(Debug, Clone)] +pub(crate) struct LspConfig { + pub node_id: PublicKey, + pub address: SocketAddress, + pub token: Option, +} + +pub(crate) struct LspNode { + node_id: PublicKey, + address: SocketAddress, + token: Option, + // Protocol numbers discovered via LSPS0 (e.g., 1 = LSPS1, 2 = LSPS2, 5 = LSPS5). + supported_protocols: Mutex>>, +} + +pub(crate) struct LiquiditySourceBuilder +where + L::Target: LdkLogger, +{ + lsp_nodes: Vec, + lsps2_service: Option, + wallet: Arc, + channel_manager: Arc, + keys_manager: Arc, + tx_broadcaster: Arc, + kv_store: Arc, + config: Arc, + logger: L, +} + +impl LiquiditySourceBuilder +where + L::Target: LdkLogger, +{ + pub(crate) fn new( + wallet: Arc, channel_manager: Arc, keys_manager: Arc, + tx_broadcaster: Arc, kv_store: Arc, config: Arc, logger: L, + ) -> Self { + let lsp_nodes = Vec::new(); + let lsps2_service = None; + Self { + lsp_nodes, + lsps2_service, + wallet, + channel_manager, + keys_manager, + tx_broadcaster, + kv_store, + config, + logger, + } + } + + pub(crate) fn set_lsp_nodes(&mut self, lsp_nodes: Vec) -> &mut Self { + self.lsp_nodes = lsp_nodes; + self + } + + pub(crate) fn lsps2_service( + &mut self, promise_secret: [u8; 32], service_config: LSPS2ServiceConfig, + ) -> &mut Self { + let ldk_service_config = LdkLSPS2ServiceConfig { promise_secret }; + self.lsps2_service = Some(LSPS2Service { service_config, ldk_service_config }); + self + } + + pub(crate) async fn build(self) -> Result, BuildError> { + let liquidity_service_config = self.lsps2_service.as_ref().map(|s| { + let lsps2_service_config = Some(s.ldk_service_config.clone()); + let lsps5_service_config = None; + let advertise_service = s.service_config.advertise_service; + LiquidityServiceConfig { + lsps1_service_config: None, + lsps2_service_config, + lsps5_service_config, + advertise_service, + } + }); + + // Adding LSPS at runtime is now supported, so we create the client + // config regardless of whether LSPs exist at build time + let liquidity_client_config = Some(LiquidityClientConfig { + lsps1_client_config: Some(LdkLSPS1ClientConfig { max_channel_fees_msat: None }), + lsps2_client_config: Some(LdkLSPS2ClientConfig {}), + lsps5_client_config: None, + }); + + let liquidity_manager = Arc::new( + LiquidityManager::new( + Arc::clone(&self.keys_manager), + Arc::clone(&self.keys_manager), + Arc::clone(&self.channel_manager), + Arc::clone(&self.kv_store), + Arc::clone(&self.tx_broadcaster), + liquidity_service_config, + liquidity_client_config, + ) + .await + .map_err(|_| BuildError::ReadFailed)?, + ); + + let lsp_nodes = Arc::new(RwLock::new( + self.lsp_nodes + .into_iter() + .map(|cfg| LspNode { + node_id: cfg.node_id, + address: cfg.address, + token: cfg.token, + supported_protocols: Mutex::new(None), + }) + .collect(), + )); + + Ok(LiquiditySource { + lsp_nodes: Arc::clone(&lsp_nodes), + lsps1_client: Arc::new(LSPS1LiquiditySource { + lsp_nodes: Arc::clone(&lsp_nodes), + pending_lsps1_opening_params_requests: Mutex::new(HashMap::new()), + pending_lsps1_create_order_requests: Mutex::new(HashMap::new()), + pending_lsps1_check_order_status_requests: Mutex::new(HashMap::new()), + lsps1_order_lsp_map: Mutex::new(HashMap::new()), + liquidity_manager: Arc::clone(&liquidity_manager), + logger: self.logger.clone(), + }), + lsps2_client: Arc::new(LSPS2ClientLiquiditySource { + lsp_nodes: Arc::clone(&lsp_nodes), + pending_lsps2_fee_requests: Mutex::new(HashMap::new()), + pending_lsps2_buy_requests: Mutex::new(HashMap::new()), + channel_manager: self.channel_manager.clone(), + keys_manager: self.keys_manager.clone(), + liquidity_manager: Arc::clone(&liquidity_manager), + config: self.config.clone(), + logger: self.logger.clone(), + }), + lsps2_service: Arc::new(LSPS2ServiceLiquiditySource { + lsps2_service: self.lsps2_service, + wallet: self.wallet, + channel_manager: self.channel_manager, + peer_manager: RwLock::new(None), + keys_manager: self.keys_manager, + liquidity_manager: Arc::clone(&liquidity_manager), + config: self.config.clone(), + logger: self.logger.clone(), + }), + pending_lsps0_discovery: Mutex::new(HashMap::new()), + liquidity_manager, + logger: self.logger, + }) + } +} + +pub(crate) struct LiquiditySource +where + L::Target: LdkLogger, +{ + lsp_nodes: Arc>>, + lsps1_client: Arc>, + lsps2_client: Arc>, + lsps2_service: Arc>, + pending_lsps0_discovery: Mutex>>>, + liquidity_manager: Arc, + logger: L, +} + +impl LiquiditySource +where + L::Target: LdkLogger, +{ + pub(crate) fn liquidity_manager(&self) -> Arc { + Arc::clone(&self.liquidity_manager) + } + + pub(crate) fn lsps1_client(&self) -> Arc> { + Arc::clone(&self.lsps1_client) + } + + pub(crate) fn lsps2_client(&self) -> Arc> { + Arc::clone(&self.lsps2_client) + } + + pub(crate) fn lsps2_service(&self) -> Arc> { + Arc::clone(&self.lsps2_service) + } + + pub(crate) async fn handle_next_event(&self) { + match self.liquidity_manager.next_event_async().await { + LiquidityEvent::LSPS1Client(event) => self.lsps1_client.handle_next_event(event).await, + LiquidityEvent::LSPS2Client(event) => self.lsps2_client.handle_next_event(event).await, + LiquidityEvent::LSPS2Service(event) => { + self.lsps2_service.handle_next_event(event).await + }, + + LiquidityEvent::LSPS0Client(LSPS0ClientEvent::ListProtocolsResponse { + counterparty_node_id, + protocols, + }) => { + if self.is_lsps_node(&counterparty_node_id) { + if let Some(sender) = + self.pending_lsps0_discovery.lock().unwrap().remove(&counterparty_node_id) + { + match sender.send(protocols) { + Ok(()) => (), + Err(_) => { + log_error!( + self.logger, + "Failed to handle response for request {:?} from liquidity service", + counterparty_node_id + ); + }, + } + } else { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + } + } else { + log_error!( + self.logger, + "Received LSPS0 ListProtocolsResponse from unexpected counterparty {}.", + counterparty_node_id + ); + } + }, + e => { + log_error!(self.logger, "Received unexpected liquidity event: {:?}", e); + }, + } + } + + pub(crate) fn is_lsps_node(&self, node_id: &PublicKey) -> bool { + is_lsps_node(&self.lsp_nodes, node_id) + } + + pub(crate) fn get_all_lsp_details(&self) -> Vec<(PublicKey, SocketAddress)> { + self.lsp_nodes.read().unwrap().iter().map(|n| (n.node_id, n.address.clone())).collect() + } + + pub(crate) fn add_lsp_node(&self, lsp_node: LspConfig) -> Result<(), Error> { + self.lsp_nodes.write().unwrap().push(LspNode { + node_id: lsp_node.node_id, + address: lsp_node.address.clone(), + token: lsp_node.token.clone(), + supported_protocols: Mutex::new(None), + }); + + Ok(()) + } + + pub(crate) async fn discover_lsp_protocols( + &self, node_id: &PublicKey, + ) -> Result, Error> { + let lsps0_handler = self.liquidity_manager.lsps0_client_handler(); + + let (sender, receiver) = oneshot::channel(); + { + let mut pending_discovery = self.pending_lsps0_discovery.lock().unwrap(); + lsps0_handler.list_protocols(node_id); + pending_discovery.insert(*node_id, sender); + } + + let protocols = + tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), receiver) + .await + .map_err(|e| { + log_error!( + self.logger, + "LSPS0 discovery request timed out for {}: {}", + node_id, + e + ); + Error::LiquidityRequestFailed + })? + .map_err(|e| { + log_error!( + self.logger, + "Failed to handle LSPS0 discovery response from {}: {}", + node_id, + e + ); + Error::LiquidityRequestFailed + })?; + + if let Some(lsp_node) = + self.lsp_nodes.read().unwrap().iter().find(|n| &n.node_id == node_id) + { + *lsp_node.supported_protocols.lock().unwrap() = Some(protocols.clone()); + } + + Ok(protocols) + } + + pub(crate) async fn discover_all_lsp_protocols(&self) { + let node_ids: Vec = + self.lsp_nodes.read().unwrap().iter().map(|n| n.node_id).collect(); + for node_id in &node_ids { + match self.discover_lsp_protocols(node_id).await { + Ok(protocols) => { + log_info!( + self.logger, + "Discovered protocols for LSP {}: {:?}", + node_id, + protocols + ); + }, + Err(e) => { + log_error!( + self.logger, + "Failed to discover protocols for LSP {}: {:?}", + node_id, + e + ); + }, + } + } + } +} diff --git a/src/liquidity/service/lsps2.rs b/src/liquidity/service/lsps2.rs new file mode 100644 index 000000000..1c3e70708 --- /dev/null +++ b/src/liquidity/service/lsps2.rs @@ -0,0 +1,506 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use std::ops::Deref; +use std::sync::{Arc, RwLock, Weak}; +use std::time::Duration; + +use bitcoin::secp256k1::PublicKey; +use bitcoin::Transaction; +use chrono::Utc; +use lightning::events::HTLCHandlingFailureType; +use lightning::ln::channelmanager::InterceptId; +use lightning::ln::types::ChannelId; +use lightning::sign::EntropySource; +use lightning_liquidity::lsps0::ser::LSPSDateTime; +use lightning_liquidity::lsps2::event::LSPS2ServiceEvent; +use lightning_liquidity::lsps2::msgs::LSPS2RawOpeningFeeParams; +use lightning_liquidity::lsps2::service::LSPS2ServiceConfig as LdkLSPS2ServiceConfig; +use lightning_types::payment::PaymentHash; + +use crate::logger::{log_error, LdkLogger}; +use crate::types::{ChannelManager, KeysManager, LiquidityManager, PeerManager, Wallet}; +use crate::{total_anchor_channels_reserve_sats, Config}; + +const LSPS2_GETINFO_REQUEST_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24); +const LSPS2_CHANNEL_CLTV_EXPIRY_DELTA: u32 = 72; + +pub(crate) struct LSPS2Service { + pub(crate) service_config: LSPS2ServiceConfig, + pub(crate) ldk_service_config: LdkLSPS2ServiceConfig, +} + +pub(crate) struct LSPS2ServiceLiquiditySource +where + L::Target: LdkLogger, +{ + pub(crate) lsps2_service: Option, + pub(crate) wallet: Arc, + pub(crate) channel_manager: Arc, + pub(crate) peer_manager: RwLock>>, + pub(crate) keys_manager: Arc, + pub(crate) liquidity_manager: Arc, + pub(crate) config: Arc, + pub(crate) logger: L, +} + +/// Represents the configuration of the LSPS2 service. +/// +/// See [bLIP-52 / LSPS2] for more information. +/// +/// [bLIP-52 / LSPS2]: https://github.com/lightning/blips/blob/master/blip-0052.md +#[derive(Debug, Clone)] +#[cfg_attr(feature = "uniffi", derive(uniffi::Record))] +pub struct LSPS2ServiceConfig { + /// A token we may require to be sent by the clients. + /// + /// If set, only requests matching this token will be accepted. + pub require_token: Option, + /// Indicates whether the LSPS service will be announced via the gossip network. + pub advertise_service: bool, + /// The fee we withhold for the channel open from the initial payment. + /// + /// This fee is proportional to the client-requested amount, in parts-per-million. + pub channel_opening_fee_ppm: u32, + /// The proportional overprovisioning for the channel. + /// + /// This determines, in parts-per-million, how much value we'll provision on top of the amount + /// we need to forward the payment to the client. + /// + /// For example, setting this to `100_000` will result in a channel being opened that is 10% + /// larger than then the to-be-forwarded amount (i.e., client-requested amount minus the + /// channel opening fee fee). + pub channel_over_provisioning_ppm: u32, + /// The minimum fee required for opening a channel. + pub min_channel_opening_fee_msat: u64, + /// The minimum number of blocks after confirmation we promise to keep the channel open. + pub min_channel_lifetime: u32, + /// The maximum number of blocks that the client is allowed to set its `to_self_delay` parameter. + pub max_client_to_self_delay: u32, + /// The minimum payment size that we will accept when opening a channel. + pub min_payment_size_msat: u64, + /// The maximum payment size that we will accept when opening a channel. + pub max_payment_size_msat: u64, + /// Use the 'client-trusts-LSP' trust model. + /// + /// When set, the service will delay *broadcasting* the JIT channel's funding transaction until + /// the client claimed sufficient HTLC parts to pay for the channel open. + /// + /// Note this will render the flow incompatible with clients utilizing the 'LSP-trust-client' + /// trust model, i.e., in turn delay *claiming* any HTLCs until they see the funding + /// transaction in the mempool. + /// + /// Please refer to [`bLIP-52`] for more information. + /// + /// [`bLIP-52`]: https://github.com/lightning/blips/blob/master/blip-0052.md#trust-models + pub client_trusts_lsp: bool, +} + +impl LSPS2ServiceLiquiditySource +where + L::Target: LdkLogger, +{ + pub(crate) fn set_peer_manager(&self, peer_manager: Weak) { + *self.peer_manager.write().unwrap() = Some(peer_manager); + } + + pub(crate) fn liquidity_manager(&self) -> Arc { + Arc::clone(&self.liquidity_manager) + } + + pub(crate) fn lsps2_channel_needs_manual_broadcast( + &self, counterparty_node_id: PublicKey, user_channel_id: u128, + ) -> bool { + self.lsps2_service.as_ref().map_or(false, |lsps2_service| { + lsps2_service.service_config.client_trusts_lsp + && self + .liquidity_manager() + .lsps2_service_handler() + .and_then(|handler| { + handler + .channel_needs_manual_broadcast(user_channel_id, &counterparty_node_id) + .ok() + }) + .unwrap_or(false) + }) + } + + pub(crate) fn lsps2_store_funding_transaction( + &self, user_channel_id: u128, counterparty_node_id: PublicKey, funding_tx: Transaction, + ) { + if self.lsps2_service.as_ref().map_or(false, |svc| !svc.service_config.client_trusts_lsp) { + // Only necessary for client-trusts-LSP flow + return; + } + + let lsps2_service_handler = self.liquidity_manager.lsps2_service_handler(); + if let Some(handler) = lsps2_service_handler { + handler + .store_funding_transaction(user_channel_id, &counterparty_node_id, funding_tx) + .unwrap_or_else(|e| { + debug_assert!(false, "Failed to store funding transaction: {:?}", e); + log_error!(self.logger, "Failed to store funding transaction: {:?}", e); + }); + } else { + log_error!(self.logger, "LSPS2 service handler is not available."); + } + } + + pub(crate) fn lsps2_funding_tx_broadcast_safe( + &self, user_channel_id: u128, counterparty_node_id: PublicKey, + ) { + if self.lsps2_service.as_ref().map_or(false, |svc| !svc.service_config.client_trusts_lsp) { + // Only necessary for client-trusts-LSP flow + return; + } + + let lsps2_service_handler = self.liquidity_manager.lsps2_service_handler(); + if let Some(handler) = lsps2_service_handler { + handler + .set_funding_tx_broadcast_safe(user_channel_id, &counterparty_node_id) + .unwrap_or_else(|e| { + debug_assert!( + false, + "Failed to mark funding transaction safe to broadcast: {:?}", + e + ); + log_error!( + self.logger, + "Failed to mark funding transaction safe to broadcast: {:?}", + e + ); + }); + } else { + log_error!(self.logger, "LSPS2 service handler is not available."); + } + } + + pub(crate) async fn handle_channel_ready( + &self, user_channel_id: u128, channel_id: &ChannelId, counterparty_node_id: &PublicKey, + ) { + if let Some(lsps2_service_handler) = self.liquidity_manager.lsps2_service_handler() { + if let Err(e) = lsps2_service_handler + .channel_ready(user_channel_id, channel_id, counterparty_node_id) + .await + { + log_error!( + self.logger, + "LSPS2 service failed to handle ChannelReady event: {:?}", + e + ); + } + } + } + + pub(crate) async fn handle_htlc_intercepted( + &self, intercept_scid: u64, intercept_id: InterceptId, expected_outbound_amount_msat: u64, + payment_hash: PaymentHash, + ) { + if let Some(lsps2_service_handler) = self.liquidity_manager.lsps2_service_handler() { + if let Err(e) = lsps2_service_handler + .htlc_intercepted( + intercept_scid, + intercept_id, + expected_outbound_amount_msat, + payment_hash, + ) + .await + { + log_error!( + self.logger, + "LSPS2 service failed to handle HTLCIntercepted event: {:?}", + e + ); + } + } + } + + pub(crate) async fn handle_htlc_handling_failed(&self, failure_type: HTLCHandlingFailureType) { + if let Some(lsps2_service_handler) = self.liquidity_manager.lsps2_service_handler() { + if let Err(e) = lsps2_service_handler.htlc_handling_failed(failure_type).await { + log_error!( + self.logger, + "LSPS2 service failed to handle HTLCHandlingFailed event: {:?}", + e + ); + } + } + } + + pub(crate) async fn handle_payment_forwarded( + &self, next_channel_id: Option, skimmed_fee_msat: u64, + ) { + if let Some(next_channel_id) = next_channel_id { + if let Some(lsps2_service_handler) = self.liquidity_manager.lsps2_service_handler() { + if let Err(e) = + lsps2_service_handler.payment_forwarded(next_channel_id, skimmed_fee_msat).await + { + log_error!( + self.logger, + "LSPS2 service failed to handle PaymentForwarded: {:?}", + e + ); + } + } + } + } + + pub(crate) async fn handle_next_event(&self, event: LSPS2ServiceEvent) { + match event { + LSPS2ServiceEvent::GetInfo { request_id, counterparty_node_id, token } => { + if let Some(lsps2_service_handler) = + self.liquidity_manager.lsps2_service_handler().as_ref() + { + let service_config = if let Some(service_config) = + self.lsps2_service.as_ref().map(|s| s.service_config.clone()) + { + service_config + } else { + log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",); + return; + }; + + if let Some(required) = service_config.require_token { + if token != Some(required) { + log_error!( + self.logger, + "Rejecting LSPS2 request {:?} from counterparty {} as the client provided an invalid token.", + request_id, + counterparty_node_id + ); + lsps2_service_handler.invalid_token_provided(&counterparty_node_id, request_id.clone()).unwrap_or_else(|e| { + debug_assert!(false, "Failed to reject LSPS2 request. This should never happen."); + log_error!( + self.logger, + "Failed to reject LSPS2 request {:?} from counterparty {} due to: {:?}. This should never happen.", + request_id, + counterparty_node_id, + e + ); + }); + return; + } + } + + let valid_until = LSPSDateTime(Utc::now() + LSPS2_GETINFO_REQUEST_EXPIRY); + let opening_fee_params = LSPS2RawOpeningFeeParams { + min_fee_msat: service_config.min_channel_opening_fee_msat, + proportional: service_config.channel_opening_fee_ppm, + valid_until, + min_lifetime: service_config.min_channel_lifetime, + max_client_to_self_delay: service_config.max_client_to_self_delay, + min_payment_size_msat: service_config.min_payment_size_msat, + max_payment_size_msat: service_config.max_payment_size_msat, + }; + + let opening_fee_params_menu = vec![opening_fee_params]; + + if let Err(e) = lsps2_service_handler.opening_fee_params_generated( + &counterparty_node_id, + request_id, + opening_fee_params_menu, + ) { + log_error!( + self.logger, + "Failed to handle generated opening fee params: {:?}", + e + ); + } + } else { + log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",); + return; + } + }, + LSPS2ServiceEvent::BuyRequest { + request_id, + counterparty_node_id, + opening_fee_params: _, + payment_size_msat, + } => { + if let Some(lsps2_service_handler) = + self.liquidity_manager.lsps2_service_handler().as_ref() + { + let service_config = if let Some(service_config) = + self.lsps2_service.as_ref().map(|s| s.service_config.clone()) + { + service_config + } else { + log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",); + return; + }; + + let user_channel_id: u128 = u128::from_ne_bytes( + self.keys_manager.get_secure_random_bytes()[..16].try_into().unwrap(), + ); + let intercept_scid = self.channel_manager.get_intercept_scid(); + + if let Some(payment_size_msat) = payment_size_msat { + // We already check this in `lightning-liquidity`, but better safe than + // sorry. + // + // TODO: We might want to eventually send back an error here, but we + // currently can't and have to trust `lightning-liquidity` is doing the + // right thing. + // + // TODO: Eventually we also might want to make sure that we have sufficient + // liquidity for the channel opening here. + if payment_size_msat > service_config.max_payment_size_msat + || payment_size_msat < service_config.min_payment_size_msat + { + log_error!( + self.logger, + "Rejecting to handle LSPS2 buy request {:?} from counterparty {} as the client requested an invalid payment size.", + request_id, + counterparty_node_id + ); + return; + } + } + + match lsps2_service_handler + .invoice_parameters_generated( + &counterparty_node_id, + request_id, + intercept_scid, + LSPS2_CHANNEL_CLTV_EXPIRY_DELTA, + service_config.client_trusts_lsp, + user_channel_id, + ) + .await + { + Ok(()) => {}, + Err(e) => { + log_error!( + self.logger, + "Failed to provide invoice parameters: {:?}", + e + ); + return; + }, + } + } else { + log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",); + return; + } + }, + LSPS2ServiceEvent::OpenChannel { + their_network_key, + amt_to_forward_msat, + opening_fee_msat: _, + user_channel_id, + intercept_scid: _, + } => { + if self.liquidity_manager.lsps2_service_handler().is_none() { + log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",); + return; + }; + + let service_config = if let Some(service_config) = + self.lsps2_service.as_ref().map(|s| s.service_config.clone()) + { + service_config + } else { + log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as LSPS2 liquidity service was not configured.",); + return; + }; + + let init_features = if let Some(Some(peer_manager)) = + self.peer_manager.read().unwrap().as_ref().map(|weak| weak.upgrade()) + { + // Fail if we're not connected to the prospective channel partner. + if let Some(peer) = peer_manager.peer_by_node_id(&their_network_key) { + peer.init_features + } else { + // TODO: We just silently fail here. Eventually we will need to remember + // the pending requests and regularly retry opening the channel until we + // succeed. + log_error!( + self.logger, + "Failed to open LSPS2 channel to {} due to peer not being not connected.", + their_network_key, + ); + return; + } + } else { + debug_assert!(false, "Failed to handle LSPS2ServiceEvent as peer manager isn't available. This should never happen.",); + log_error!(self.logger, "Failed to handle LSPS2ServiceEvent as peer manager isn't available. This should never happen.",); + return; + }; + + // Fail if we have insufficient onchain funds available. + let over_provisioning_msat = (amt_to_forward_msat + * service_config.channel_over_provisioning_ppm as u64) + / 1_000_000; + let channel_amount_sats = (amt_to_forward_msat + over_provisioning_msat) / 1000; + let cur_anchor_reserve_sats = + total_anchor_channels_reserve_sats(&self.channel_manager, &self.config); + let spendable_amount_sats = + self.wallet.get_spendable_amount_sats(cur_anchor_reserve_sats).unwrap_or(0); + let required_funds_sats = channel_amount_sats + + self.config.anchor_channels_config.as_ref().map_or(0, |c| { + if init_features.requires_anchors_zero_fee_htlc_tx() + && !c.trusted_peers_no_reserve.contains(&their_network_key) + { + c.per_channel_reserve_sats + } else { + 0 + } + }); + if spendable_amount_sats < required_funds_sats { + log_error!(self.logger, + "Unable to create channel due to insufficient funds. Available: {}sats, Required: {}sats", + spendable_amount_sats, channel_amount_sats + ); + // TODO: We just silently fail here. Eventually we will need to remember + // the pending requests and regularly retry opening the channel until we + // succeed. + return; + } + + let mut config = self.channel_manager.get_current_config().clone(); + + // We set these LSP-specific values during Node building, here we're making sure it's actually set. + debug_assert_eq!( + config + .channel_handshake_config + .max_inbound_htlc_value_in_flight_percent_of_channel, + 100 + ); + debug_assert!(config.accept_forwards_to_priv_channels); + + // We set the forwarding fee to 0 for now as we're getting paid by the channel fee. + // + // TODO: revisit this decision eventually. + config.channel_config.forwarding_fee_base_msat = 0; + config.channel_config.forwarding_fee_proportional_millionths = 0; + + match self.channel_manager.create_channel( + their_network_key, + channel_amount_sats, + 0, + user_channel_id, + None, + Some(config), + ) { + Ok(_) => {}, + Err(e) => { + // TODO: We just silently fail here. Eventually we will need to remember + // the pending requests and regularly retry opening the channel until we + // succeed. + log_error!( + self.logger, + "Failed to open LSPS2 channel to {}: {:?}", + their_network_key, + e + ); + return; + }, + } + }, + } + } +} diff --git a/src/liquidity/service/mod.rs b/src/liquidity/service/mod.rs new file mode 100644 index 000000000..5e3a3b183 --- /dev/null +++ b/src/liquidity/service/mod.rs @@ -0,0 +1,8 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +pub(crate) mod lsps2; diff --git a/src/payment/bolt11.rs b/src/payment/bolt11.rs index f2857e814..aa27956a8 100644 --- a/src/payment/bolt11.rs +++ b/src/payment/bolt11.rs @@ -154,47 +154,35 @@ impl Bolt11Payment { let liquidity_source = self.liquidity_source.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; - let (node_id, address) = - liquidity_source.get_lsps2_lsp_details().ok_or(Error::LiquiditySourceUnavailable)?; - - let peer_info = PeerInfo { node_id, address }; - - let con_node_id = peer_info.node_id; - let con_addr = peer_info.address.clone(); - let con_cm = Arc::clone(&self.connection_manager); - - // We need to use our main runtime here as a local runtime might not be around to poll - // connection futures going forward. - self.runtime.block_on(async move { - con_cm.connect_peer_if_necessary(con_node_id, con_addr).await - })?; - - log_info!(self.logger, "Connected to LSP {}@{}. ", peer_info.node_id, peer_info.address); - let liquidity_source = Arc::clone(&liquidity_source); - let (invoice, lsp_total_opening_fee, lsp_prop_opening_fee) = + let connection_manager = Arc::clone(&self.connection_manager); + let (invoice, lsp_total_opening_fee, lsp_prop_opening_fee, chosen_lsp) = self.runtime.block_on(async move { if let Some(amount_msat) = amount_msat { liquidity_source + .lsps2_client() .lsps2_receive_to_jit_channel( amount_msat, description, expiry_secs, max_total_lsp_fee_limit_msat, payment_hash, + &connection_manager, ) .await - .map(|(invoice, total_fee)| (invoice, Some(total_fee), None)) + .map(|(invoice, total_fee, lsp)| (invoice, Some(total_fee), None, lsp)) } else { liquidity_source + .lsps2_client() .lsps2_receive_variable_amount_to_jit_channel( description, expiry_secs, max_proportional_lsp_fee_limit_ppm_msat, payment_hash, + &connection_manager, ) .await - .map(|(invoice, prop_fee)| (invoice, None, Some(prop_fee))) + .map(|(invoice, prop_fee, lsp)| (invoice, None, Some(prop_fee), lsp)) } })?; @@ -225,7 +213,8 @@ impl Bolt11Payment { ); self.payment_store.insert(payment)?; - // Persist LSP peer to make sure we reconnect on restart. + // Persist the chosen LSP peer to make sure we reconnect on restart. + let peer_info = PeerInfo { node_id: chosen_lsp.node_id, address: chosen_lsp.address }; self.peer_store.add_peer(peer_info)?; Ok(invoice) diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 3fde52dc4..9532b4f67 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -1718,7 +1718,7 @@ async fn do_lsps2_client_service_integration(client_trusts_lsp: bool) { let client_config = random_config(true); setup_builder!(client_builder, client_config.node_config); client_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - client_builder.set_liquidity_source_lsps2(service_node_id, service_addr, None); + client_builder.add_lsp(service_node_id, service_addr, None); let client_node = client_builder.build(client_config.node_entropy.into()).unwrap(); client_node.start().unwrap(); @@ -2035,7 +2035,7 @@ async fn lsps2_client_trusts_lsp() { let client_config = random_config(true); setup_builder!(client_builder, client_config.node_config); client_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - client_builder.set_liquidity_source_lsps2(service_node_id, service_addr.clone(), None); + client_builder.add_lsp(service_node_id, service_addr.clone(), None); let client_node = client_builder.build(client_config.node_entropy.into()).unwrap(); client_node.start().unwrap(); let client_node_id = client_node.node_id(); @@ -2210,7 +2210,7 @@ async fn lsps2_lsp_trusts_client_but_client_does_not_claim() { let client_config = random_config(true); setup_builder!(client_builder, client_config.node_config); client_builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); - client_builder.set_liquidity_source_lsps2(service_node_id, service_addr.clone(), None); + client_builder.add_lsp(service_node_id, service_addr.clone(), None); let client_node = client_builder.build(client_config.node_entropy.into()).unwrap(); client_node.start().unwrap();