diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index ff499d049d4..1308901d56f 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -48,13 +48,6 @@ use crate::prelude::*; use crate::sync::{Arc, Mutex}; use bitcoin::hashes::Hash; -fn get_latest_mon_update_id<'a, 'b, 'c>( - node: &Node<'a, 'b, 'c>, channel_id: ChannelId, -) -> (u64, u64) { - let monitor_id_state = node.chain_monitor.latest_monitor_update_id.lock().unwrap(); - monitor_id_state.get(&channel_id).unwrap().clone() -} - #[test] fn test_monitor_and_persister_update_fail() { // Test that if both updating the `ChannelMonitor` and persisting the updated diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index fd780da8d91..a0f6c31b5dd 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -50,8 +50,8 @@ use crate::ln::channel_state::{ OutboundHTLCDetails, OutboundHTLCStateDetails, }; use crate::ln::channelmanager::{ - self, ChannelReadyOrder, FundingConfirmedMessage, HTLCFailureMsg, HTLCSource, - OpenChannelMessage, PaymentClaimDetails, PendingHTLCInfo, PendingHTLCStatus, + self, ChannelReadyOrder, FundingConfirmedMessage, HTLCFailureMsg, HTLCPreviousHopData, + HTLCSource, OpenChannelMessage, PaymentClaimDetails, PendingHTLCInfo, PendingHTLCStatus, RAACommitmentOrder, SentHTLCId, BREAKDOWN_TIMEOUT, MAX_LOCAL_BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA, }; @@ -85,6 +85,7 @@ use crate::util::errors::APIError; use crate::util::logger::{Logger, Record, WithContext}; use crate::util::scid_utils::{block_from_scid, scid_from_parts}; use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, Writeable, Writer}; +use crate::{impl_readable_for_vec, impl_writeable_for_vec}; use alloc::collections::{btree_map, BTreeMap}; @@ -217,7 +218,7 @@ enum InboundHTLCState { /// Used to rebuild `ChannelManager` HTLC state on restart. Previously the manager would track /// and persist all HTLC forwards and receives itself, but newer LDK versions avoid relying on /// its persistence and instead reconstruct state based on `Channel` and `ChannelMonitor` data. - update_add_htlc_opt: Option, + update_add_htlc: InboundUpdateAdd, }, /// Removed by us and a new commitment_signed was sent (if we were AwaitingRemoteRevoke when we /// created it we would have put it in the holding cell instead). When they next revoke_and_ack @@ -308,6 +309,47 @@ impl InboundHTLCState { } } +/// A field of `InboundHTLCState::Committed` containing the HTLC's `update_add_htlc` message. If +/// the HTLC is a forward and gets irrevocably committed to the outbound edge, we convert to +/// `InboundUpdateAdd::Forwarded`, thus pruning the onion and not persisting it on every +/// `ChannelManager` persist. +/// +/// Useful for reconstructing the pending HTLC set on startup. +#[derive(Debug, Clone)] +pub(super) enum InboundUpdateAdd { + /// The inbound committed HTLC's update_add_htlc message. + WithOnion { update_add_htlc: msgs::UpdateAddHTLC }, + /// This inbound HTLC is a forward that was irrevocably committed to the outbound edge, allowing + /// its onion to be pruned and no longer persisted. + Forwarded { + /// Useful if we need to fail or claim this HTLC backwards after restart, if it's missing in the + /// outbound edge. + hop_data: HTLCPreviousHopData, + /// Useful if we need to claim this HTLC backwards after a restart and it's missing in the + /// outbound edge, to generate an accurate [`Event::PaymentForwarded`]. + /// + /// [`Event::PaymentForwarded`]: crate::events::Event::PaymentForwarded + outbound_amt_msat: u64, + }, + /// This HTLC was received pre-LDK 0.3, before we started persisting the onion for inbound + /// committed HTLCs. + Legacy, +} + +impl_writeable_tlv_based_enum_upgradable!(InboundUpdateAdd, + (0, WithOnion) => { + (0, update_add_htlc, required), + }, + (2, Forwarded) => { + (0, hop_data, required), + (2, outbound_amt_msat, required), + }, + (4, Legacy) => {}, +); + +impl_writeable_for_vec!(&InboundUpdateAdd); +impl_readable_for_vec!(InboundUpdateAdd); + #[cfg_attr(test, derive(Debug))] struct InboundHTLCOutput { htlc_id: u64, @@ -1150,17 +1192,24 @@ pub enum UpdateFulfillCommitFetch { /// The return value of `monitor_updating_restored` pub(super) struct MonitorRestoreUpdates { pub raa: Option, + // A `CommitmentUpdate` to be sent to our channel peer. pub commitment_update: Option, pub commitment_order: RAACommitmentOrder, pub accepted_htlcs: Vec<(PendingHTLCInfo, u64)>, pub failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, pub finalized_claimed_htlcs: Vec<(HTLCSource, Option)>, + // Inbound update_adds that are now irrevocably committed to this channel and are ready for the + // onion to be processed in order to forward or receive the HTLC. pub pending_update_adds: Vec, pub funding_broadcastable: Option, pub channel_ready: Option, pub channel_ready_order: ChannelReadyOrder, pub announcement_sigs: Option, pub tx_signatures: Option, + // The sources of outbound HTLCs that were forwarded and irrevocably committed on this channel + // (the outbound edge), along with their outbound amounts. Useful to store in the inbound HTLC + // to ensure it gets resolved. + pub committed_outbound_htlc_sources: Vec<(HTLCPreviousHopData, u64)>, } /// The return value of `signer_maybe_unblocked` @@ -7779,19 +7828,69 @@ where } /// Useful for reconstructing the set of pending HTLCs when deserializing the `ChannelManager`. - pub(super) fn get_inbound_committed_update_adds(&self) -> Vec { + pub(super) fn inbound_committed_unresolved_htlcs( + &self, + ) -> Vec<(PaymentHash, InboundUpdateAdd)> { + // We don't want to return an HTLC as needing processing if it already has a resolution that's + // pending in the holding cell. + let htlc_resolution_in_holding_cell = |id: u64| -> bool { + self.context.holding_cell_htlc_updates.iter().any(|holding_cell_htlc| { + match holding_cell_htlc { + HTLCUpdateAwaitingACK::ClaimHTLC { htlc_id, .. } => *htlc_id == id, + HTLCUpdateAwaitingACK::FailHTLC { htlc_id, .. } => *htlc_id == id, + HTLCUpdateAwaitingACK::FailMalformedHTLC { htlc_id, .. } => *htlc_id == id, + HTLCUpdateAwaitingACK::AddHTLC { .. } => false, + } + }) + }; + self.context .pending_inbound_htlcs .iter() - .filter_map(|htlc| match htlc.state { - InboundHTLCState::Committed { ref update_add_htlc_opt } => { - update_add_htlc_opt.clone() + .filter_map(|htlc| match &htlc.state { + InboundHTLCState::Committed { update_add_htlc } => { + if htlc_resolution_in_holding_cell(htlc.htlc_id) { + return None; + } + Some((htlc.payment_hash, update_add_htlc.clone())) + }, + _ => None, + }) + .collect() + } + + /// Useful when reconstructing the set of pending HTLC forwards when deserializing the + /// `ChannelManager`. We don't want to cache an HTLC as needing to be forwarded if it's already + /// present in the outbound edge, or else we'll double-forward. + pub(super) fn holding_cell_outbound_htlc_forwards(&self) -> Vec { + self.context + .holding_cell_htlc_updates + .iter() + .filter_map(|htlc| match htlc { + HTLCUpdateAwaitingACK::AddHTLC { source, .. } => match source { + HTLCSource::PreviousHopData(prev_hop_data) => Some(prev_hop_data.clone()), + _ => None, }, _ => None, }) .collect() } + /// This inbound HTLC was irrevocably forwarded to the outbound edge, so we no longer need to + /// persist its onion. + pub(super) fn prune_inbound_htlc_onion( + &mut self, htlc_id: u64, hop_data: HTLCPreviousHopData, outbound_amt_msat: u64, + ) { + for htlc in self.context.pending_inbound_htlcs.iter_mut() { + if htlc.htlc_id == htlc_id { + if let InboundHTLCState::Committed { ref mut update_add_htlc } = htlc.state { + *update_add_htlc = InboundUpdateAdd::Forwarded { hop_data, outbound_amt_msat }; + return; + } + } + } + } + /// Marks an outbound HTLC which we have received update_fail/fulfill/malformed #[inline] fn mark_outbound_htlc_removed( @@ -8781,7 +8880,8 @@ where false }; if swap { - let mut state = InboundHTLCState::Committed { update_add_htlc_opt: None }; + let mut state = + InboundHTLCState::Committed { update_add_htlc: InboundUpdateAdd::Legacy }; mem::swap(&mut state, &mut htlc.state); if let InboundHTLCState::AwaitingRemoteRevokeToAnnounce(resolution) = state { @@ -8822,9 +8922,8 @@ where to_forward_infos.push((forward_info, htlc.htlc_id)); htlc.state = InboundHTLCState::Committed { // HTLCs will only be in state `InboundHTLCResolution::Resolved` if they were - // received on an old pre-0.0.123 version of LDK. In this case, the HTLC is - // required to be resolved prior to upgrading to 0.1+ per CHANGELOG.md. - update_add_htlc_opt: None, + // received on LDK 0.1-. + update_add_htlc: InboundUpdateAdd::Legacy, }; }, } @@ -8833,7 +8932,9 @@ where log_trace!(logger, " ...promoting inbound AwaitingAnnouncedRemoteRevoke {} to Committed", &htlc.payment_hash); pending_update_adds.push(update_add_htlc.clone()); htlc.state = InboundHTLCState::Committed { - update_add_htlc_opt: Some(update_add_htlc), + update_add_htlc: InboundUpdateAdd::WithOnion { + update_add_htlc, + }, }; }, } @@ -9501,6 +9602,14 @@ where mem::swap(&mut finalized_claimed_htlcs, &mut self.context.monitor_pending_finalized_fulfills); let mut pending_update_adds = Vec::new(); mem::swap(&mut pending_update_adds, &mut self.context.monitor_pending_update_adds); + let committed_outbound_htlc_sources = self.context.pending_outbound_htlcs.iter().filter_map(|htlc| { + if let &OutboundHTLCState::Committed = &htlc.state { + if let HTLCSource::PreviousHopData(prev_hop_data) = &htlc.source { + return Some((prev_hop_data.clone(), htlc.amount_msat)) + } + } + None + }).collect(); if self.context.channel_state.is_peer_disconnected() { self.context.monitor_pending_revoke_and_ack = false; @@ -9509,7 +9618,7 @@ where raa: None, commitment_update: None, commitment_order: RAACommitmentOrder::RevokeAndACKFirst, accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, pending_update_adds, funding_broadcastable, channel_ready, announcement_sigs, tx_signatures: None, - channel_ready_order, + channel_ready_order, committed_outbound_htlc_sources }; } @@ -9540,7 +9649,7 @@ where MonitorRestoreUpdates { raa, commitment_update, commitment_order, accepted_htlcs, failed_htlcs, finalized_claimed_htlcs, pending_update_adds, funding_broadcastable, channel_ready, announcement_sigs, tx_signatures: None, - channel_ready_order, + channel_ready_order, committed_outbound_htlc_sources } } @@ -14724,7 +14833,7 @@ where } } let mut removed_htlc_attribution_data: Vec<&Option> = Vec::new(); - let mut inbound_committed_update_adds: Vec<&Option> = Vec::new(); + let mut inbound_committed_update_adds: Vec<&InboundUpdateAdd> = Vec::new(); (self.context.pending_inbound_htlcs.len() as u64 - dropped_inbound_htlcs).write(writer)?; for htlc in self.context.pending_inbound_htlcs.iter() { if let &InboundHTLCState::RemoteAnnounced(_) = &htlc.state { @@ -14744,9 +14853,9 @@ where 2u8.write(writer)?; htlc_resolution.write(writer)?; }, - &InboundHTLCState::Committed { ref update_add_htlc_opt } => { + &InboundHTLCState::Committed { ref update_add_htlc } => { 3u8.write(writer)?; - inbound_committed_update_adds.push(update_add_htlc_opt); + inbound_committed_update_adds.push(update_add_htlc); }, &InboundHTLCState::LocalRemoved(ref removal_reason) => { 4u8.write(writer)?; @@ -15214,7 +15323,7 @@ where }; InboundHTLCState::AwaitingAnnouncedRemoteRevoke(resolution) }, - 3 => InboundHTLCState::Committed { update_add_htlc_opt: None }, + 3 => InboundHTLCState::Committed { update_add_htlc: InboundUpdateAdd::Legacy }, 4 => { let reason = match ::read(reader)? { 0 => InboundHTLCRemovalReason::FailRelay(msgs::OnionErrorPacket { @@ -15520,7 +15629,7 @@ where let mut pending_outbound_held_htlc_flags_opt: Option>> = None; let mut holding_cell_held_htlc_flags_opt: Option>> = None; - let mut inbound_committed_update_adds_opt: Option>> = None; + let mut inbound_committed_update_adds_opt: Option> = None; let mut holding_cell_accountable: Option> = None; let mut pending_outbound_accountable: Option> = None; @@ -15701,8 +15810,8 @@ where if let Some(update_adds) = inbound_committed_update_adds_opt { let mut iter = update_adds.into_iter(); for htlc in pending_inbound_htlcs.iter_mut() { - if let InboundHTLCState::Committed { ref mut update_add_htlc_opt } = htlc.state { - *update_add_htlc_opt = iter.next().ok_or(DecodeError::InvalidValue)?; + if let InboundHTLCState::Committed { ref mut update_add_htlc } = htlc.state { + *update_add_htlc = iter.next().ok_or(DecodeError::InvalidValue)?; } } if iter.next().is_some() { @@ -16068,16 +16177,17 @@ mod tests { use crate::chain::BestBlock; use crate::ln::chan_utils::{self, commit_tx_fee_sat, ChannelTransactionParameters}; use crate::ln::channel::{ - AwaitingChannelReadyFlags, ChannelState, FundedChannel, HTLCCandidate, HTLCInitiator, - HTLCUpdateAwaitingACK, InboundHTLCOutput, InboundHTLCState, InboundV1Channel, - OutboundHTLCOutput, OutboundHTLCState, OutboundV1Channel, + AwaitingChannelReadyFlags, ChannelId, ChannelState, FundedChannel, HTLCCandidate, + HTLCInitiator, HTLCUpdateAwaitingACK, InboundHTLCOutput, InboundHTLCState, + InboundUpdateAdd, InboundV1Channel, OutboundHTLCOutput, OutboundHTLCState, + OutboundV1Channel, }; use crate::ln::channel::{ MAX_FUNDING_SATOSHIS_NO_WUMBO, MIN_THEIR_CHAN_RESERVE_SATOSHIS, TOTAL_BITCOIN_SUPPLY_SATOSHIS, }; use crate::ln::channel_keys::{RevocationBasepoint, RevocationKey}; - use crate::ln::channelmanager::{self, HTLCSource, PaymentId}; + use crate::ln::channelmanager::{self, HTLCPreviousHopData, HTLCSource, PaymentId}; use crate::ln::funding::FundingTxInput; use crate::ln::msgs; use crate::ln::msgs::{ChannelUpdate, UnsignedChannelUpdate, MAX_VALUE_MSAT}; @@ -16101,6 +16211,7 @@ mod tests { use bitcoin::amount::Amount; use bitcoin::constants::ChainHash; use bitcoin::hashes::sha256::Hash as Sha256; + use bitcoin::hashes::sha256d::Hash as Sha256d; use bitcoin::hashes::Hash; use bitcoin::hex::FromHex; use bitcoin::locktime::absolute::LockTime; @@ -16110,9 +16221,27 @@ mod tests { use bitcoin::secp256k1::{ecdsa::Signature, Secp256k1}; use bitcoin::secp256k1::{PublicKey, SecretKey}; use bitcoin::transaction::{Transaction, TxOut, Version}; + use bitcoin::Txid; use bitcoin::{ScriptBuf, WPubkeyHash, WitnessProgram, WitnessVersion}; use std::cmp; + fn dummy_prev_hop_data() -> HTLCPreviousHopData { + let txid_hash = Sha256d::from_bytes_ref(&[0; 32]); + HTLCPreviousHopData { + prev_outbound_scid_alias: 0, + user_channel_id: None, + htlc_id: 0, + incoming_packet_shared_secret: [0; 32], + phantom_shared_secret: None, + trampoline_shared_secret: None, + blinded_failure: None, + channel_id: ChannelId([0; 32]), + outpoint: OutPoint { txid: Txid::from_raw_hash(*txid_hash), index: 0 }, + counterparty_node_id: None, + cltv_expiry: None, + } + } + #[test] #[rustfmt::skip] fn test_channel_state_order() { @@ -16315,7 +16444,7 @@ mod tests { amount_msat: htlc_amount_msat, payment_hash: PaymentHash(Sha256::hash(&[42; 32]).to_byte_array()), cltv_expiry: 300000000, - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { update_add_htlc: InboundUpdateAdd::Forwarded { hop_data: dummy_prev_hop_data(), outbound_amt_msat: 0 } }, }); node_a_chan.context.pending_outbound_htlcs.push(OutboundHTLCOutput { @@ -17164,7 +17293,12 @@ mod tests { amount_msat: 1000000, cltv_expiry: 500, payment_hash: PaymentHash::from(payment_preimage_0), - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }); let payment_preimage_1 = @@ -17174,7 +17308,12 @@ mod tests { amount_msat: 2000000, cltv_expiry: 501, payment_hash: PaymentHash::from(payment_preimage_1), - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }); let payment_preimage_2 = @@ -17216,7 +17355,12 @@ mod tests { amount_msat: 4000000, cltv_expiry: 504, payment_hash: PaymentHash::from(payment_preimage_4), - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }); // commitment tx with all five HTLCs untrimmed (minimum feerate) @@ -17605,7 +17749,12 @@ mod tests { amount_msat: 2000000, cltv_expiry: 501, payment_hash: PaymentHash::from(payment_preimage_1), - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }); chan.context.pending_outbound_htlcs.clear(); @@ -17858,7 +18007,12 @@ mod tests { amount_msat: 5000000, cltv_expiry: 920150, payment_hash: PaymentHash::from(htlc_in_preimage), - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, })); chan.context.pending_outbound_htlcs.extend( @@ -17922,7 +18076,12 @@ mod tests { amount_msat, cltv_expiry: 920150, payment_hash: PaymentHash::from(htlc_in_preimage), - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }, )); @@ -17989,7 +18148,12 @@ mod tests { amount_msat: 100000, cltv_expiry: 920125, payment_hash: htlc_0_in_hash, - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }); let htlc_1_in_preimage = @@ -18007,7 +18171,12 @@ mod tests { amount_msat: 49900000, cltv_expiry: 920125, payment_hash: htlc_1_in_hash, - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }); chan.context.pending_outbound_htlcs.extend( @@ -18060,7 +18229,12 @@ mod tests { amount_msat: 30000, payment_hash, cltv_expiry: 920125, - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }, )); @@ -18102,7 +18276,12 @@ mod tests { amount_msat: 29525, payment_hash, cltv_expiry: 920125, - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }, )); @@ -18140,7 +18319,12 @@ mod tests { amount_msat: 29525, payment_hash, cltv_expiry: 920125, - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }, )); @@ -18178,7 +18362,12 @@ mod tests { amount_msat: 29753, payment_hash, cltv_expiry: 920125, - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }, )); @@ -18231,7 +18420,12 @@ mod tests { amount_msat, cltv_expiry, payment_hash, - state: InboundHTLCState::Committed { update_add_htlc_opt: None }, + state: InboundHTLCState::Committed { + update_add_htlc: InboundUpdateAdd::Forwarded { + hop_data: dummy_prev_hop_data(), + outbound_amt_msat: 0, + }, + }, }), ); diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index fd5e5d15b9f..a034b6c5526 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -58,9 +58,9 @@ use crate::ln::chan_utils::selected_commitment_sat_per_1000_weight; use crate::ln::channel::QuiescentAction; use crate::ln::channel::{ self, hold_time_since, Channel, ChannelError, ChannelUpdateStatus, DisconnectResult, - FundedChannel, FundingTxSigned, InboundV1Channel, OutboundV1Channel, PendingV2Channel, - ReconnectionMsg, ShutdownResult, SpliceFundingFailed, StfuResponse, UpdateFulfillCommitFetch, - WithChannelContext, + FundedChannel, FundingTxSigned, InboundUpdateAdd, InboundV1Channel, OutboundV1Channel, + PendingV2Channel, ReconnectionMsg, ShutdownResult, SpliceFundingFailed, StfuResponse, + UpdateFulfillCommitFetch, WithChannelContext, }; use crate::ln::channel_state::ChannelDetails; use crate::ln::funding::SpliceContribution; @@ -1379,6 +1379,7 @@ enum PostMonitorUpdateChanResume { decode_update_add_htlcs: Option<(u64, Vec)>, finalized_claimed_htlcs: Vec<(HTLCSource, Option)>, failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, + committed_outbound_htlc_sources: Vec<(HTLCPreviousHopData, u64)>, }, } @@ -9508,6 +9509,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ decode_update_add_htlcs: Option<(u64, Vec)>, finalized_claimed_htlcs: Vec<(HTLCSource, Option)>, failed_htlcs: Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, + committed_outbound_htlc_sources: Vec<(HTLCPreviousHopData, u64)>, ) { // If the channel belongs to a batch funding transaction, the progress of the batch // should be updated as we have received funding_signed and persisted the monitor. @@ -9573,6 +9575,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ }; self.fail_htlc_backwards_internal(&failure.0, &failure.1, &failure.2, receiver, None); } + self.prune_persisted_inbound_htlc_onions(committed_outbound_htlc_sources); } fn handle_monitor_update_completion_actions< @@ -10047,10 +10050,62 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ decode_update_add_htlcs, finalized_claimed_htlcs: updates.finalized_claimed_htlcs, failed_htlcs: updates.failed_htlcs, + committed_outbound_htlc_sources: updates.committed_outbound_htlc_sources, } } } + /// We store inbound committed HTLCs' onions in `Channel`s for use in reconstructing the pending + /// HTLC set on `ChannelManager` read. If an HTLC has been irrevocably forwarded to the outbound + /// edge, we no longer need to persist the inbound edge's onion and can prune it here. + fn prune_persisted_inbound_htlc_onions( + &self, committed_outbound_htlc_sources: Vec<(HTLCPreviousHopData, u64)>, + ) { + let per_peer_state = self.per_peer_state.read().unwrap(); + for (source, outbound_amt_msat) in committed_outbound_htlc_sources { + let counterparty_node_id = match source.counterparty_node_id.as_ref() { + Some(id) => id, + None => continue, + }; + let mut peer_state = + match per_peer_state.get(counterparty_node_id).map(|state| state.lock().unwrap()) { + Some(peer_state) => peer_state, + None => continue, + }; + + if let Some(chan) = + peer_state.channel_by_id.get_mut(&source.channel_id).and_then(|c| c.as_funded_mut()) + { + chan.prune_inbound_htlc_onion(source.htlc_id, source, outbound_amt_msat); + } + } + } + + #[cfg(test)] + pub(crate) fn test_holding_cell_outbound_htlc_forwards_count( + &self, cp_id: PublicKey, chan_id: ChannelId, + ) -> usize { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state = per_peer_state.get(&cp_id).map(|state| state.lock().unwrap()).unwrap(); + let chan = peer_state.channel_by_id.get(&chan_id).and_then(|c| c.as_funded()).unwrap(); + chan.holding_cell_outbound_htlc_forwards().len() + } + + #[cfg(test)] + /// Useful to check that we prune inbound HTLC onions once they are irrevocably forwarded to the + /// outbound edge, see [`Self::prune_persisted_inbound_htlc_onions`]. + pub(crate) fn test_get_inbound_committed_htlcs_with_onion( + &self, cp_id: PublicKey, chan_id: ChannelId, + ) -> usize { + let per_peer_state = self.per_peer_state.read().unwrap(); + let peer_state = per_peer_state.get(&cp_id).map(|state| state.lock().unwrap()).unwrap(); + let chan = peer_state.channel_by_id.get(&chan_id).and_then(|c| c.as_funded()).unwrap(); + chan.inbound_committed_unresolved_htlcs() + .iter() + .filter(|(_, htlc)| matches!(htlc, InboundUpdateAdd::WithOnion { .. })) + .count() + } + /// Completes channel resumption after locks have been released. /// /// Processes the [`PostMonitorUpdateChanResume`] returned by @@ -10076,6 +10131,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ decode_update_add_htlcs, finalized_claimed_htlcs, failed_htlcs, + committed_outbound_htlc_sources, } => { self.post_monitor_update_unlock( channel_id, @@ -10086,6 +10142,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ decode_update_add_htlcs, finalized_claimed_htlcs, failed_htlcs, + committed_outbound_htlc_sources, ); }, } @@ -16586,6 +16643,17 @@ pub fn provided_init_features(config: &UserConfig) -> InitFeatures { const SERIALIZATION_VERSION: u8 = 1; const MIN_SERIALIZATION_VERSION: u8 = 1; +// We plan to start writing this version in 0.5. +// +// LDK 0.5+ will reconstruct the set of pending HTLCs from `Channel{Monitor}` data that started +// being written in 0.3, ignoring legacy `ChannelManager` HTLC maps on read and not writing them. +// LDK 0.5+ will automatically fail to read if the pending HTLC set cannot be reconstructed, i.e. +// if we were last written with pending HTLCs on 0.2- or if the new 0.3+ fields are missing. +// +// If 0.3 or 0.4 reads this manager version, it knows that the legacy maps were not written and +// acts accordingly. +const RECONSTRUCT_HTLCS_FROM_CHANS_VERSION: u8 = 2; + impl_writeable_tlv_based!(PhantomRouteHints, { (2, channels, required_vec), (4, phantom_scid, required), @@ -17587,7 +17655,7 @@ where fn read( reader: &mut Reader, mut args: ChannelManagerReadArgs<'a, M, T, ES, NS, SP, F, R, MR, L>, ) -> Result { - let _ver = read_ver_prefix!(reader, SERIALIZATION_VERSION); + let ver = read_ver_prefix!(reader, SERIALIZATION_VERSION); let chain_hash: ChainHash = Readable::read(reader)?; let best_block_height: u32 = Readable::read(reader)?; @@ -17874,23 +17942,24 @@ where } const MAX_ALLOC_SIZE: usize = 1024 * 64; - let forward_htlcs_count: u64 = Readable::read(reader)?; // Marked `_legacy` because in versions > 0.2 we are taking steps to remove the requirement of // regularly persisting the `ChannelManager` and instead rebuild the set of HTLC forwards from // `Channel{Monitor}` data. See `reconstruct_manager_from_monitors` usage below. - let mut forward_htlcs_legacy: HashMap> = - hash_map_with_capacity(cmp::min(forward_htlcs_count as usize, 128)); - for _ in 0..forward_htlcs_count { - let short_channel_id = Readable::read(reader)?; - let pending_forwards_count: u64 = Readable::read(reader)?; - let mut pending_forwards = Vec::with_capacity(cmp::min( - pending_forwards_count as usize, - MAX_ALLOC_SIZE / mem::size_of::(), - )); - for _ in 0..pending_forwards_count { - pending_forwards.push(Readable::read(reader)?); + let mut forward_htlcs_legacy: HashMap> = new_hash_map(); + if ver < RECONSTRUCT_HTLCS_FROM_CHANS_VERSION { + let forward_htlcs_count: u64 = Readable::read(reader)?; + for _ in 0..forward_htlcs_count { + let short_channel_id = Readable::read(reader)?; + let pending_forwards_count: u64 = Readable::read(reader)?; + let mut pending_forwards = Vec::with_capacity(cmp::min( + pending_forwards_count as usize, + MAX_ALLOC_SIZE / mem::size_of::(), + )); + for _ in 0..pending_forwards_count { + pending_forwards.push(Readable::read(reader)?); + } + forward_htlcs_legacy.insert(short_channel_id, pending_forwards); } - forward_htlcs_legacy.insert(short_channel_id, pending_forwards); } let claimable_htlcs_count: u64 = Readable::read(reader)?; @@ -18029,11 +18098,16 @@ where (19, peer_storage_dir, optional_vec), (21, async_receive_offer_cache, (default_value, async_receive_offer_cache)), }); + if (decode_update_add_htlcs_legacy.is_some() || pending_intercepted_htlcs_legacy.is_some()) + && ver >= RECONSTRUCT_HTLCS_FROM_CHANS_VERSION + { + return Err(DecodeError::InvalidValue); + } let mut decode_update_add_htlcs_legacy = decode_update_add_htlcs_legacy.unwrap_or_else(|| new_hash_map()); let mut pending_intercepted_htlcs_legacy = pending_intercepted_htlcs_legacy.unwrap_or_else(|| new_hash_map()); - let mut decode_update_add_htlcs = new_hash_map(); + let mut decode_update_add_htlcs: HashMap> = new_hash_map(); let peer_storage_dir: Vec<(PublicKey, Vec)> = peer_storage_dir.unwrap_or_else(Vec::new); if fake_scid_rand_bytes.is_none() { fake_scid_rand_bytes = Some(args.entropy_source.get_secure_random_bytes()); @@ -18328,7 +18402,7 @@ where // `reconstruct_manager_from_monitors` is set below. Currently it is only set in tests, randomly // to ensure the legacy codepaths also have test coverage. #[cfg(not(test))] - let reconstruct_manager_from_monitors = false; + let reconstruct_manager_from_monitors = ver >= RECONSTRUCT_HTLCS_FROM_CHANS_VERSION; #[cfg(test)] let reconstruct_manager_from_monitors = { use core::hash::{BuildHasher, Hasher}; @@ -18356,6 +18430,22 @@ where // have a fully-constructed `ChannelManager` at the end. let mut pending_claims_to_replay = Vec::new(); + // If we find an inbound HTLC that claims to already be forwarded to the outbound edge, we + // store an identifier for it here and verify that it is either (a) present in the outbound + // edge or (b) removed from the outbound edge via claim. If it's in neither of these states, we + // infer that it was removed from the outbound edge via fail, and fail it backwards to ensure + // that it is handled. + let mut already_forwarded_htlcs = Vec::new(); + let prune_forwarded_htlc = + |already_forwarded_htlcs: &mut Vec<(PaymentHash, HTLCPreviousHopData, u64)>, + prev_hop: &HTLCPreviousHopData| { + if let Some(idx) = already_forwarded_htlcs.iter().position(|(_, htlc, _)| { + prev_hop.htlc_id == htlc.htlc_id + && prev_hop.prev_outbound_scid_alias == htlc.prev_outbound_scid_alias + }) { + already_forwarded_htlcs.swap_remove(idx); + } + }; { // If we're tracking pending payments, ensure we haven't lost any by looking at the // ChannelMonitor data for any channels for which we do not have authorative state @@ -18378,16 +18468,38 @@ where if reconstruct_manager_from_monitors { if let Some(chan) = peer_state.channel_by_id.get(channel_id) { if let Some(funded_chan) = chan.as_funded() { + let scid_alias = funded_chan.context.outbound_scid_alias(); let inbound_committed_update_adds = - funded_chan.get_inbound_committed_update_adds(); - if !inbound_committed_update_adds.is_empty() { - // Reconstruct `ChannelManager::decode_update_add_htlcs` from the serialized - // `Channel`, as part of removing the requirement to regularly persist the - // `ChannelManager`. - decode_update_add_htlcs.insert( - funded_chan.context.outbound_scid_alias(), - inbound_committed_update_adds, - ); + funded_chan.inbound_committed_unresolved_htlcs(); + for (payment_hash, htlc) in inbound_committed_update_adds { + match htlc { + InboundUpdateAdd::WithOnion { update_add_htlc } => { + // Reconstruct `ChannelManager::decode_update_add_htlcs` from the serialized + // `Channel` as part of removing the requirement to regularly persist the + // `ChannelManager`. + match decode_update_add_htlcs.entry(scid_alias) { + hash_map::Entry::Occupied(mut entry) => { + entry.get_mut().push(update_add_htlc); + }, + hash_map::Entry::Vacant(entry) => { + entry.insert(vec![update_add_htlc]); + }, + } + }, + InboundUpdateAdd::Forwarded { + hop_data, + outbound_amt_msat, + } => { + already_forwarded_htlcs.push(( + payment_hash, + hop_data, + outbound_amt_msat, + )); + }, + InboundUpdateAdd::Legacy => { + return Err(DecodeError::InvalidValue) + }, + } } } } @@ -18430,6 +18542,21 @@ where let mut peer_state_lock = peer_state_mtx.lock().unwrap(); let peer_state = &mut *peer_state_lock; is_channel_closed = !peer_state.channel_by_id.contains_key(channel_id); + if reconstruct_manager_from_monitors { + if let Some(chan) = peer_state.channel_by_id.get(channel_id) { + if let Some(funded_chan) = chan.as_funded() { + for prev_hop in funded_chan.holding_cell_outbound_htlc_forwards() { + dedup_decode_update_add_htlcs( + &mut decode_update_add_htlcs, + &prev_hop, + "HTLC already forwarded to the outbound edge", + &args.logger, + ); + prune_forwarded_htlc(&mut already_forwarded_htlcs, &prev_hop); + } + } + } + } } for (htlc_source, (htlc, preimage_opt)) in monitor.get_all_current_outbound_htlcs() @@ -18454,6 +18581,7 @@ where "HTLC already forwarded to the outbound edge", &args.logger, ); + prune_forwarded_htlc(&mut already_forwarded_htlcs, &prev_hop_data); } if !is_channel_closed || reconstruct_manager_from_monitors { @@ -18977,6 +19105,7 @@ where "HTLC was failed backwards during manager read", &args.logger, ); + prune_forwarded_htlc(&mut already_forwarded_htlcs, prev_hop_data); } } @@ -19096,9 +19225,47 @@ where }; let mut processed_claims: HashSet> = new_hash_set(); - for (_, monitor) in args.channel_monitors.iter() { + for (channel_id, monitor) in args.channel_monitors.iter() { for (payment_hash, (payment_preimage, payment_claims)) in monitor.get_stored_preimages() { + // If we have unresolved inbound committed HTLCs that were already forwarded to the + // outbound edge and removed via claim, we need to make sure to claim them backwards via + // adding them to `pending_claims_to_replay`. + for (hash, hop_data, outbound_amt_msat) in + mem::take(&mut already_forwarded_htlcs).drain(..) + { + if hash != payment_hash { + already_forwarded_htlcs.push((hash, hop_data, outbound_amt_msat)); + continue; + } + let new_pending_claim = !pending_claims_to_replay.iter().any(|(src, _, _, _, _, _, _)| { + matches!(src, HTLCSource::PreviousHopData(hop) if hop.htlc_id == hop_data.htlc_id && hop.prev_outbound_scid_alias == hop_data.prev_outbound_scid_alias) + }); + if new_pending_claim { + let counterparty_node_id = monitor.get_counterparty_node_id(); + let is_channel_closed = channel_manager + .per_peer_state + .read() + .unwrap() + .get(&counterparty_node_id) + .map_or(true, |peer_state_mtx| { + !peer_state_mtx + .lock() + .unwrap() + .channel_by_id + .contains_key(channel_id) + }); + pending_claims_to_replay.push(( + HTLCSource::PreviousHopData(hop_data), + payment_preimage, + outbound_amt_msat, + is_channel_closed, + counterparty_node_id, + monitor.get_funding_txo(), + monitor.channel_id(), + )); + } + } if !payment_claims.is_empty() { for payment_claim in payment_claims { if processed_claims.contains(&payment_claim.mpp_parts) { @@ -19340,6 +19507,17 @@ where channel_manager .fail_htlc_backwards_internal(&source, &hash, &reason, receiver, ev_action); } + for (hash, htlc, _) in already_forwarded_htlcs { + let channel_id = htlc.channel_id; + let node_id = htlc.counterparty_node_id; + let source = HTLCSource::PreviousHopData(htlc); + let reason = + HTLCFailReason::reason(LocalHTLCFailureReason::TemporaryNodeFailure, Vec::new()); + let receiver = HTLCHandlingFailureType::Forward { node_id, channel_id }; + // The event completion action is only relevant for HTLCs that originate from our node, not + // forwarded HTLCs. + channel_manager.fail_htlc_backwards_internal(&source, &hash, &reason, receiver, None); + } for ( source, diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 1eda3bdf9f7..4474a18f0ac 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -1268,6 +1268,13 @@ pub fn check_added_monitors>(node: & } } +pub fn get_latest_mon_update_id<'a, 'b, 'c>( + node: &Node<'a, 'b, 'c>, channel_id: ChannelId, +) -> (u64, u64) { + let monitor_id_state = node.chain_monitor.latest_monitor_update_id.lock().unwrap(); + monitor_id_state.get(&channel_id).unwrap().clone() +} + fn claimed_htlc_matches_path<'a, 'b, 'c>( origin_node: &Node<'a, 'b, 'c>, path: &[&Node<'a, 'b, 'c>], htlc: &ClaimedHTLC, ) -> bool { @@ -5172,6 +5179,9 @@ pub struct ReconnectArgs<'a, 'b, 'c, 'd> { pub pending_cell_htlc_claims: (usize, usize), pub pending_cell_htlc_fails: (usize, usize), pub pending_raa: (bool, bool), + /// If true, don't assert that pending messages are empty after the commitment dance completes. + /// Useful when holding cell HTLCs will be released and need to be handled by the caller. + pub allow_post_commitment_dance_msgs: (bool, bool), } impl<'a, 'b, 'c, 'd> ReconnectArgs<'a, 'b, 'c, 'd> { @@ -5194,6 +5204,7 @@ impl<'a, 'b, 'c, 'd> ReconnectArgs<'a, 'b, 'c, 'd> { pending_cell_htlc_claims: (0, 0), pending_cell_htlc_fails: (0, 0), pending_raa: (false, false), + allow_post_commitment_dance_msgs: (false, false), } } } @@ -5219,6 +5230,7 @@ pub fn reconnect_nodes<'a, 'b, 'c, 'd>(args: ReconnectArgs<'a, 'b, 'c, 'd>) { pending_raa, pending_responding_commitment_signed, pending_responding_commitment_signed_dup_monitor, + allow_post_commitment_dance_msgs, } = args; connect_nodes(node_a, node_b); let reestablish_1 = get_chan_reestablish_msgs!(node_a, node_b); @@ -5402,11 +5414,13 @@ pub fn reconnect_nodes<'a, 'b, 'c, 'd>(args: ReconnectArgs<'a, 'b, 'c, 'd>) { get_event_msg!(node_a, MessageSendEvent::SendRevokeAndACK, node_b_id); // No commitment_signed so get_event_msg's assert(len == 1) passes node_b.node.handle_revoke_and_ack(node_a_id, &as_revoke_and_ack); - assert!(node_b.node.get_and_clear_pending_msg_events().is_empty()); check_added_monitors( &node_b, if pending_responding_commitment_signed_dup_monitor.0 { 0 } else { 1 }, ); + if !allow_post_commitment_dance_msgs.0 { + assert!(node_b.node.get_and_clear_pending_msg_events().is_empty()); + } } } else { assert!(chan_msgs.2.is_none()); @@ -5516,11 +5530,13 @@ pub fn reconnect_nodes<'a, 'b, 'c, 'd>(args: ReconnectArgs<'a, 'b, 'c, 'd>) { get_event_msg!(node_b, MessageSendEvent::SendRevokeAndACK, node_a_id); // No commitment_signed so get_event_msg's assert(len == 1) passes node_a.node.handle_revoke_and_ack(node_b_id, &bs_revoke_and_ack); - assert!(node_a.node.get_and_clear_pending_msg_events().is_empty()); check_added_monitors( &node_a, if pending_responding_commitment_signed_dup_monitor.1 { 0 } else { 1 }, ); + if !allow_post_commitment_dance_msgs.1 { + assert!(node_a.node.get_and_clear_pending_msg_events().is_empty()); + } } } else { assert!(chan_msgs.2.is_none()); diff --git a/lightning/src/ln/reload_tests.rs b/lightning/src/ln/reload_tests.rs index 4fb2753b6be..c1a62343d26 100644 --- a/lightning/src/ln/reload_tests.rs +++ b/lightning/src/ln/reload_tests.rs @@ -1210,6 +1210,13 @@ fn do_manager_persisted_pre_outbound_edge_forward(intercept_htlc: bool) { let updates = get_htlc_update_msgs(&nodes[0], &nodes[1].node.get_our_node_id()); nodes[1].node.handle_update_add_htlc(nodes[0].node.get_our_node_id(), &updates.update_add_htlcs[0]); do_commitment_signed_dance(&nodes[1], &nodes[0], &updates.commitment_signed, false, false); + // While an inbound HTLC is committed in a channel but not yet forwarded, we store its onion in + // the `Channel` in case we need to remember it on restart. Once it's irrevocably forwarded to the + // outbound edge, we can prune it on the inbound edge. + assert_eq!( + nodes[1].node.test_get_inbound_committed_htlcs_with_onion(nodes[0].node.get_our_node_id(), chan_id_1), + 1 + ); // Decode the HTLC onion but don't forward it to the next hop, such that the HTLC ends up in // `ChannelManager::forward_htlcs` or `ChannelManager::pending_intercepted_htlcs`. @@ -1231,6 +1238,13 @@ fn do_manager_persisted_pre_outbound_edge_forward(intercept_htlc: bool) { args_b_c.send_announcement_sigs = (true, true); reconnect_nodes(args_b_c); + // Before an inbound HTLC is irrevocably forwarded, its onion should still be persisted within the + // inbound edge channel. + assert_eq!( + nodes[1].node.test_get_inbound_committed_htlcs_with_onion(nodes[0].node.get_our_node_id(), chan_id_1), + 1 + ); + // Forward the HTLC and ensure we can claim it post-reload. nodes[1].node.process_pending_htlc_forwards(); @@ -1253,6 +1267,12 @@ fn do_manager_persisted_pre_outbound_edge_forward(intercept_htlc: bool) { nodes[2].node.handle_update_add_htlc(nodes[1].node.get_our_node_id(), &updates.update_add_htlcs[0]); do_commitment_signed_dance(&nodes[2], &nodes[1], &updates.commitment_signed, false, false); expect_and_process_pending_htlcs(&nodes[2], false); + // After an inbound HTLC is irrevocably forwarded, its onion should be pruned within the inbound + // edge channel. + assert_eq!( + nodes[1].node.test_get_inbound_committed_htlcs_with_onion(nodes[0].node.get_our_node_id(), chan_id_1), + 0 + ); expect_payment_claimable!(nodes[2], payment_hash, payment_secret, amt_msat, None, nodes[2].node.get_our_node_id()); let path: &[&[_]] = &[&[&nodes[1], &nodes[2]]]; @@ -1318,6 +1338,117 @@ fn test_manager_persisted_post_outbound_edge_forward() { expect_payment_sent(&nodes[0], payment_preimage, None, true, true); } +#[test] +fn test_manager_persisted_post_outbound_edge_holding_cell() { + // Test that we will not double-forward an HTLC after restart if it is already in the outbound + // edge's holding cell, which was previously broken. + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let persister; + let new_chain_monitor; + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); + let nodes_1_deserialized; + let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs); + + let chan_id_1 = create_announced_chan_between_nodes(&nodes, 0, 1).2; + let chan_id_2 = create_announced_chan_between_nodes(&nodes, 1, 2).2; + send_payment(&nodes[0], &[&nodes[1], &nodes[2]], 5000000); + + // Lock in the HTLC from node_a <> node_b. + let amt_msat = 1000; + let (route, payment_hash, payment_preimage, payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[2], amt_msat); + nodes[0].node.send_payment_with_route(route, payment_hash, RecipientOnionFields::secret_only(payment_secret), PaymentId(payment_hash.0)).unwrap(); + check_added_monitors(&nodes[0], 1); + let updates = get_htlc_update_msgs(&nodes[0], &nodes[1].node.get_our_node_id()); + nodes[1].node.handle_update_add_htlc(nodes[0].node.get_our_node_id(), &updates.update_add_htlcs[0]); + do_commitment_signed_dance(&nodes[1], &nodes[0], &updates.commitment_signed, false, false); + + // Send a 2nd HTLC node_c -> node_b, to force the first HTLC into the holding cell. + chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress); + let (route_2, payment_hash_2, payment_preimage_2, payment_secret_2) = get_route_and_payment_hash!(nodes[2], nodes[1], amt_msat); + nodes[2].node.send_payment_with_route(route_2, payment_hash_2, RecipientOnionFields::secret_only(payment_secret_2), PaymentId(payment_hash_2.0)).unwrap(); + let send_event = + SendEvent::from_event(nodes[2].node.get_and_clear_pending_msg_events().remove(0)); + nodes[1].node.handle_update_add_htlc(nodes[2].node.get_our_node_id(), &send_event.msgs[0]); + nodes[1].node.handle_commitment_signed_batch_test(nodes[2].node.get_our_node_id(), &send_event.commitment_msg); + assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); + check_added_monitors(&nodes[1], 1); + assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty()); + + // Add the HTLC to the outbound edge, node_b <> node_c. Force the outbound HTLC into the b<>c + // holding cell. + nodes[1].node.process_pending_htlc_forwards(); + check_added_monitors(&nodes[1], 0); + assert_eq!( + nodes[1].node.test_holding_cell_outbound_htlc_forwards_count(nodes[2].node.get_our_node_id(), chan_id_2), + 1 + ); + + // Disconnect peers and reload the forwarding node_b. + nodes[0].node.peer_disconnected(nodes[1].node.get_our_node_id()); + nodes[2].node.peer_disconnected(nodes[1].node.get_our_node_id()); + + let node_b_encoded = nodes[1].node.encode(); + let chan_0_monitor_serialized = get_monitor!(nodes[1], chan_id_1).encode(); + let chan_1_monitor_serialized = get_monitor!(nodes[1], chan_id_2).encode(); + reload_node!(nodes[1], node_b_encoded, &[&chan_0_monitor_serialized, &chan_1_monitor_serialized], persister, new_chain_monitor, nodes_1_deserialized); + + chanmon_cfgs[1].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed); + let (latest_update, _) = get_latest_mon_update_id(&nodes[1], chan_id_2); + nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(chan_id_2, latest_update); + + reconnect_nodes(ReconnectArgs::new(&nodes[1], &nodes[0])); + + // Reconnect b<>c. Node_b has pending RAA + commitment_signed from the incomplete c->b + // commitment dance, plus an HTLC in the holding cell that will be released after the dance. + let mut reconnect_args = ReconnectArgs::new(&nodes[1], &nodes[2]); + reconnect_args.pending_raa = (false, true); + reconnect_args.pending_responding_commitment_signed = (false, true); + // Node_c needs a monitor update to catch up after processing node_b's reestablish. + reconnect_args.expect_renegotiated_funding_locked_monitor_update = (false, true); + // The holding cell HTLC will be released after the commitment dance - handle it below. + reconnect_args.allow_post_commitment_dance_msgs = (false, true); + reconnect_nodes(reconnect_args); + + // The holding cell HTLC was released during the reconnect. Complete its commitment dance. + let holding_cell_htlc_msgs = nodes[1].node.get_and_clear_pending_msg_events(); + assert_eq!(holding_cell_htlc_msgs.len(), 1); + match &holding_cell_htlc_msgs[0] { + MessageSendEvent::UpdateHTLCs { node_id, updates, .. } => { + assert_eq!(*node_id, nodes[2].node.get_our_node_id()); + assert_eq!(updates.update_add_htlcs.len(), 1); + nodes[2].node.handle_update_add_htlc(nodes[1].node.get_our_node_id(), &updates.update_add_htlcs[0]); + do_commitment_signed_dance(&nodes[2], &nodes[1], &updates.commitment_signed, false, false); + } + _ => panic!("Unexpected message: {:?}", holding_cell_htlc_msgs[0]), + } + + // Ensure node_b won't double-forward the outbound HTLC (this was previously broken). + nodes[1].node.process_pending_htlc_forwards(); + let msgs = nodes[1].node.get_and_clear_pending_msg_events(); + assert!(msgs.is_empty(), "Expected 0 messages, got {:?}", msgs); + + // The a->b->c HTLC is now committed on node_c. The c->b HTLC is committed on node_b. + // Both payments should now be claimable. + expect_and_process_pending_htlcs(&nodes[2], false); + expect_payment_claimable!(nodes[2], payment_hash, payment_secret, amt_msat, None, nodes[2].node.get_our_node_id()); + expect_payment_claimable!(nodes[1], payment_hash_2, payment_secret_2, amt_msat, None, nodes[1].node.get_our_node_id()); + + // Claim the a->b->c payment on node_c. + let path: &[&[_]] = &[&[&nodes[1], &nodes[2]]]; + do_claim_payment_along_route(ClaimAlongRouteArgs::new(&nodes[0], path, payment_preimage)); + expect_payment_sent(&nodes[0], payment_preimage, None, true, true); + + // Claim the c->b payment on node_b. + nodes[1].node.claim_funds(payment_preimage_2); + expect_payment_claimed!(nodes[1], payment_hash_2, amt_msat); + check_added_monitors(&nodes[1], 1); + let mut update = get_htlc_update_msgs(&nodes[1], &nodes[2].node.get_our_node_id()); + nodes[2].node.handle_update_fulfill_htlc(nodes[1].node.get_our_node_id(), update.update_fulfill_htlcs.remove(0)); + do_commitment_signed_dance(&nodes[2], &nodes[1], &update.commitment_signed, false, false); + expect_payment_sent(&nodes[2], payment_preimage_2, None, true, true); +} + #[test] fn test_reload_partial_funding_batch() { let chanmon_cfgs = create_chanmon_cfgs(3); @@ -1565,3 +1696,84 @@ fn test_peer_storage() { assert!(res.is_err()); } +#[test] +fn outbound_removed_holding_cell_resolved_no_double_forward() { + // Test that if a forwarding node has an HTLC that is fully removed on the outbound edge + // but where the inbound edge resolution is in the holding cell, and we reload the node in this + // state, that node will not double-forward the HTLC. + + let chanmon_cfgs = create_chanmon_cfgs(3); + let node_cfgs = create_node_cfgs(3, &chanmon_cfgs); + let persister; + let new_chain_monitor; + let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]); + let nodes_1_deserialized; + let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs); + + let node_0_id = nodes[0].node.get_our_node_id(); + let node_1_id = nodes[1].node.get_our_node_id(); + let node_2_id = nodes[2].node.get_our_node_id(); + + let chan_0_1 = create_announced_chan_between_nodes(&nodes, 0, 1); + let chan_1_2 = create_announced_chan_between_nodes(&nodes, 1, 2); + + let chan_id_0_1 = chan_0_1.2; + let chan_id_1_2 = chan_1_2.2; + + // Send a payment from nodes[0] to nodes[2] via nodes[1]. + let (route, payment_hash, payment_preimage, payment_secret) = + get_route_and_payment_hash!(nodes[0], nodes[2], 1_000_000); + send_along_route_with_secret( + &nodes[0], route, &[&[&nodes[1], &nodes[2]]], 1_000_000, payment_hash, payment_secret, + ); + + // Claim the payment on nodes[2]. + nodes[2].node.claim_funds(payment_preimage); + check_added_monitors(&nodes[2], 1); + expect_payment_claimed!(nodes[2], payment_hash, 1_000_000); + + // Disconnect nodes[0] from nodes[1] BEFORE processing the fulfill. + // This forces the inbound fulfill resolution go to into nodes[1]'s holding cell for the inbound + // channel. + nodes[0].node.peer_disconnected(node_1_id); + nodes[1].node.peer_disconnected(node_0_id); + + // Process the fulfill from nodes[2] to nodes[1]. + let updates_2_1 = get_htlc_update_msgs(&nodes[2], &node_1_id); + nodes[1].node.handle_update_fulfill_htlc(node_2_id, updates_2_1.update_fulfill_htlcs[0].clone()); + check_added_monitors(&nodes[1], 1); + do_commitment_signed_dance(&nodes[1], &nodes[2], &updates_2_1.commitment_signed, false, false); + expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false); + + // At this point: + // - The outbound HTLC nodes[1]->nodes[2] is resolved and removed + // - The inbound HTLC nodes[0]->nodes[1] is still in a Committed state, with the fulfill + // resolution in nodes[1]'s chan_0_1 holding cell + let node_1_serialized = nodes[1].node.encode(); + let mon_0_1_serialized = get_monitor!(nodes[1], chan_id_0_1).encode(); + let mon_1_2_serialized = get_monitor!(nodes[1], chan_id_1_2).encode(); + + // Reload nodes[1]. + // During deserialization, we previously would have not noticed that the nodes[0]<>nodes[1] HTLC + // had a resolution pending in the holding cell, and reconstructed the ChannelManager's pending + // HTLC state indicating that the HTLC still needed to be forwarded to the outbound edge. + reload_node!( + nodes[1], + node_1_serialized, + &[&mon_0_1_serialized, &mon_1_2_serialized], + persister, + new_chain_monitor, + nodes_1_deserialized + ); + + // Check that nodes[1] doesn't double-forward the HTLC. + nodes[1].node.process_pending_htlc_forwards(); + + // Reconnect nodes[1] to nodes[0]. The claim should be in nodes[1]'s holding cell. + let mut reconnect_args = ReconnectArgs::new(&nodes[1], &nodes[0]); + reconnect_args.pending_cell_htlc_claims = (0, 1); + reconnect_nodes(reconnect_args); + + // nodes[0] should now have received the fulfill and generate PaymentSent. + expect_payment_sent(&nodes[0], payment_preimage, None, true, true); +} diff --git a/lightning/src/util/ser.rs b/lightning/src/util/ser.rs index f821aa5afc0..6579c0353a3 100644 --- a/lightning/src/util/ser.rs +++ b/lightning/src/util/ser.rs @@ -979,13 +979,15 @@ where } } -// Vectors +/// Write number of items in a vec followed by each element, without writing a length-prefix for +/// each element. +#[macro_export] macro_rules! impl_writeable_for_vec { ($ty: ty $(, $name: ident)*) => { impl<$($name : Writeable),*> Writeable for Vec<$ty> { #[inline] fn write(&self, w: &mut W) -> Result<(), io::Error> { - CollectionLength(self.len() as u64).write(w)?; + $crate::util::ser::CollectionLength(self.len() as u64).write(w)?; for elem in self.iter() { elem.write(w)?; } @@ -994,15 +996,21 @@ macro_rules! impl_writeable_for_vec { } } } +/// Read the number of items in a vec followed by each element, without reading a length prefix for +/// each element. +/// +/// Each element is read with `MaybeReadable`, meaning if an element cannot be read then it is +/// skipped without returning `DecodeError::InvalidValue`. +#[macro_export] macro_rules! impl_readable_for_vec { ($ty: ty $(, $name: ident)*) => { impl<$($name : Readable),*> Readable for Vec<$ty> { #[inline] - fn read(r: &mut R) -> Result { - let len: CollectionLength = Readable::read(r)?; - let mut ret = Vec::with_capacity(cmp::min(len.0 as usize, MAX_BUF_SIZE / core::mem::size_of::<$ty>())); + fn read(r: &mut R) -> Result { + let len: $crate::util::ser::CollectionLength = Readable::read(r)?; + let mut ret = Vec::with_capacity(cmp::min(len.0 as usize, $crate::util::ser::MAX_BUF_SIZE / core::mem::size_of::<$ty>())); for _ in 0..len.0 { - if let Some(val) = MaybeReadable::read(r)? { + if let Some(val) = $crate::util::ser::MaybeReadable::read(r)? { ret.push(val); } }