Defer ChainMonitor updates and persistence to flush() #4351
joostjager wants to merge 4 commits into lightningdevkit:main
Conversation
👋 Thanks for assigning @valentinewallace as a reviewer!

Closing this PR as #4345 seems to be the easiest way to go
Force-pushed 1f5cef4 to 30d05ca
The single commit was split into three: extracting internal methods, adding a deferred toggle, and implementing the deferral and flushing logic. flush() now delegates to the extracted internal methods rather than reimplementing persist/insert logic inline. Deferred mode is opt-in via a deferred bool rather than always-on. Test infrastructure was expanded with deferred-mode helpers and dedicated unit tests.
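The structure described above can be sketched as follows. This is a hypothetical, simplified model, not the real LDK types (`ChainMonitorModel`, `UpdateStatus`, and the `u64` stand-in for a queued operation are all invented for illustration): the public entry point either executes immediately via an extracted internal method or, in deferred mode, queues the operation, and flush() delegates back to that same internal method.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

#[derive(Debug, PartialEq)]
enum UpdateStatus {
    Completed,
    InProgress,
}

// Simplified stand-in for ChainMonitor; a queued op is just an update id here.
struct ChainMonitorModel {
    deferred: bool,
    pending_ops: Mutex<VecDeque<u64>>,
    persisted: Mutex<Vec<u64>>,
}

impl ChainMonitorModel {
    fn new(deferred: bool) -> Self {
        Self {
            deferred,
            pending_ops: Mutex::new(VecDeque::new()),
            persisted: Mutex::new(Vec::new()),
        }
    }

    // Extracted internal method: does the actual persist work.
    fn update_channel_internal(&self, update_id: u64) -> UpdateStatus {
        self.persisted.lock().unwrap().push(update_id);
        UpdateStatus::Completed
    }

    // Public entry point: queues in deferred mode, executes otherwise.
    fn update_channel(&self, update_id: u64) -> UpdateStatus {
        if self.deferred {
            self.pending_ops.lock().unwrap().push_back(update_id);
            UpdateStatus::InProgress
        } else {
            self.update_channel_internal(update_id)
        }
    }

    // flush() delegates to the same internal method instead of
    // reimplementing the persist logic inline.
    fn flush(&self, count: usize) {
        for _ in 0..count {
            let op = match self.pending_ops.lock().unwrap().pop_front() {
                Some(op) => op,
                None => return,
            };
            let _ = self.update_channel_internal(op);
        }
    }
}

fn main() {
    // deferred = false preserves the direct behavior.
    let direct = ChainMonitorModel::new(false);
    assert_eq!(direct.update_channel(1), UpdateStatus::Completed);

    // deferred = true queues and returns InProgress until flush().
    let deferred = ChainMonitorModel::new(true);
    assert_eq!(deferred.update_channel(1), UpdateStatus::InProgress);
    assert_eq!(deferred.persisted.lock().unwrap().len(), 0);
    deferred.flush(1);
    assert_eq!(deferred.persisted.lock().unwrap().len(), 1);
}
```

The opt-in flag means all existing callers can keep passing `false` and see no behavior change.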
Force-pushed 2815bf9 to 3eb5644
Codecov Report ❌

@@ Coverage Diff @@
##             main    #4351      +/-   ##
==========================================
+ Coverage   85.87%   85.91%   +0.03%
==========================================
  Files         157      157
  Lines      103769   104076    +307
==========================================
+ Hits        89115    89418    +303
- Misses      12158    12160      +2
- Partials     2496     2498      +2

Flags with carried forward coverage won't be shown.
Force-pushed f964466 to b140bf9
This PR is now ready for review. LDK-node counterpart: lightningdevkit/ldk-node#782
| Ok(ChannelMonitorUpdateStatus::Completed)
| }
|
| fn watch_channel_internal(

lol was it really faster to ask claude to do this than to just move the code? :p

I don't like cursor keys anymore 😂
| pub fn new(
|     chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P,
| -   _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey,
| +   _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey, deferred: bool,

This needs comprehensive documentation (copied on the new_async_beta constructor as well).
lightning/src/chain/chainmonitor.rs (Outdated)

| deferred: bool,
| /// Queued monitor operations awaiting flush. Unused when `deferred` is `false`.
| ///
| /// # Locking order with `monitors`

IMO lock order constraints should be written in code and checked, not in english :)

You mean remove as we did with the giant lock tree doc? Or is there some more explicit way to write it in code other than just locking? Have to say that the lock debugger indeed helped with getting it right.
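One way to express a lock-order constraint in code rather than in a doc comment, sketched below with hypothetical names (`ChainMonitorLocks`, `with_both`, and the field contents are invented, not the real LDK fields, and LDK's own lockorder debug checker is a separate mechanism): the only way to hold both locks is a helper that always acquires `pending_ops` before `monitors`, so callers cannot invert the order.

```rust
use std::sync::{Mutex, MutexGuard};

struct PendingOps(Vec<u32>);
struct Monitors(Vec<u32>);

struct ChainMonitorLocks {
    pending_ops: Mutex<PendingOps>,
    monitors: Mutex<Monitors>,
}

impl ChainMonitorLocks {
    // The fixed acquisition order lives in exactly one place; all code
    // needing both locks must go through this helper.
    fn with_both<R>(&self, f: impl FnOnce(&mut PendingOps, &mut Monitors) -> R) -> R {
        let mut pending: MutexGuard<PendingOps> = self.pending_ops.lock().unwrap();
        let mut monitors: MutexGuard<Monitors> = self.monitors.lock().unwrap();
        f(&mut pending, &mut monitors)
    }
}

fn main() {
    let locks = ChainMonitorLocks {
        pending_ops: Mutex::new(PendingOps(vec![])),
        monitors: Mutex::new(Monitors(vec![7])),
    };
    // E.g. the duplicate check: consult both the queue and the monitor set
    // under a single, correctly ordered acquisition.
    let already_known = locks.with_both(|pending, monitors| {
        pending.0.push(1);
        monitors.0.contains(&7)
    });
    assert!(already_known);
}
```

This doesn't replace a runtime lock-order checker, but it makes the constraint structural instead of documentary.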
| }
| }
|
| /// Returns the number of pending monitor operations queued for later execution.

Needs description of how to use it.
lightning/src/chain/chainmonitor.rs (Outdated)

| self.pending_ops.lock().unwrap().len()
| }
|
| /// Flushes up to `count` pending monitor operations.

Needs description of how to use it, including reference to pending_operation_count.
lightning/src/chain/chainmonitor.rs (Outdated)

| /// remaining operations are left in the queue for the next call.
| pub fn flush(&self, count: usize, logger: &L) -> Result<(), ()> {
|     if count > 0 {
|         log_trace!(logger, "Flushing up to {} monitor operations", count);

More than trace imo, maybe info even?

Made it info, probably good to see that things are eventually persisted.
lightning/src/chain/chainmonitor.rs (Outdated)

| },
| Err(()) => {
|     drop(queue);
|     // The monitor is consumed and cannot be retried.

should be unreachable, right?

yes. want to panic?
in the original code, this did return an error to the caller of watch_channel

This is indeed already checked before queueing. Made it unreachable. Could also remove the flush return value.
lightning/src/chain/chainmonitor.rs (Outdated)

| },
| ChannelMonitorUpdateStatus::InProgress => {},
| ChannelMonitorUpdateStatus::UnrecoverableError => {
|     panic!("UnrecoverableError during monitor operation");

This should be unreachable, right?

This one is indeed truly unreachable. Made it unreachable.
Force-pushed b140bf9 to 04051fb
🔔 1st Reminder Hey @TheBlueMatt! This PR has been waiting for your review.
Never mind, we can hold off completion signalling, but that doesn't prevent monitors from being on disk before chan manager is.
I think there was still a small race remaining. Pushed fix.
TheBlueMatt left a comment:

feel free to squash
| let mut futures = Joiner::new();
|
| // We capture pending_operation_count inside the persistence branch to

Not entirely sure this needs to be explained lol.

I think so, because I hadn't thought of that tiny gap. I thought it's all wrapped in the persistence guard, but the needs_persist atomic is separate.

I did rephrase the comment a bit.
lightning/src/chain/chainmonitor.rs (Outdated)

| self.pending_ops.lock().unwrap().len()
| }
|
| /// Flushes up to `count` pending monitor operations that were queued while the

nit: it's ordered

Suggested change:
- /// Flushes up to `count` pending monitor operations that were queued while the
+ /// Flushes the first `count` pending monitor operations that were queued while the
Force-pushed 7ce7cf9 to 824e976
✅ Added second reviewer: @valentinewallace
Force-pushed 824e976 to 47070bd
Rebased without changes
| // Flush monitor operations that were pending before we persisted. New updates
| // that arrived after are left for the next iteration.
| chain_monitor.get_cm().flush(pending_monitor_writes, &logger);

I believe we currently won't flush if any of the futures error, e.g. persisting the liquidity manager, which may not be the desired behavior.

The flush not happening on error here isn't a channel safety concern, since no messages have gone out yet based on these deferred writes, and the channel manager being ahead of monitors doesn't lead to force closes.

I'd prefer to avoid adding complexity for inspecting individual futures after a failure just to flush when the channel manager persistence future succeeded.
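The ordering the quoted background-processor code relies on can be sketched with a hypothetical, simplified model (the `DeferredQueue` type, `persist_channel_manager` stand-in, and `u64` op ids are invented for illustration, not the real LDK API): snapshot the pending-operation count before the channel manager round trip, then flush only that many operations afterwards, so anything queued later than the persisted manager state is left for the next iteration.

```rust
use std::collections::VecDeque;

struct DeferredQueue {
    pending: VecDeque<u64>,
    flushed: Vec<u64>,
}

impl DeferredQueue {
    fn pending_operation_count(&self) -> usize {
        self.pending.len()
    }

    // Flush at most `count` ops, FIFO; leave the rest queued.
    fn flush(&mut self, count: usize) {
        for _ in 0..count {
            match self.pending.pop_front() {
                Some(op) => self.flushed.push(op),
                None => return,
            }
        }
    }
}

// Stand-in for the channel manager persistence round trip. It simulates a
// racing monitor update (op 99) arriving while the manager write is in
// flight: that op must NOT be flushed this iteration, because the
// persisted manager state does not yet reflect it.
fn persist_channel_manager(queue: &mut DeferredQueue) {
    queue.pending.push_back(99);
}

fn main() {
    let mut queue = DeferredQueue { pending: VecDeque::from([1, 2]), flushed: vec![] };
    // 1. Capture the count before persisting the manager.
    let count = queue.pending_operation_count();
    // 2. Persist the channel manager (new ops may arrive meanwhile).
    persist_channel_manager(&mut queue);
    // 3. Flush only the ops that predate the persisted manager state.
    queue.flush(count);
    assert_eq!(queue.flushed, vec![1, 2]);
    assert_eq!(queue.pending, VecDeque::from([99]));
}
```

This is why the count is captured before the persistence future runs rather than at flush time.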
lightning/src/chain/chainmonitor.rs (Outdated)

| let mut queue = self.pending_ops.lock().unwrap();
| let op = match queue.pop_front() {
|     Some(op) => op,
|     None => return,

debug_assert!(false) here?
lightning/src/chain/chainmonitor.rs (Outdated)

| let update_id = monitor.get_latest_update_id();
| // Atomically check for duplicates in both the pending queue and the
| // flushed monitor set. Lock order: `pending_ops` before `monitors`
| // (see `pending_ops` field doc).

I think this is outdated

Indeed, left over from previous documentation of the lock order. Removed.
lightning/src/chain/chainmonitor.rs (Outdated)

| /// A pending operation queued for later execution when `ChainMonitor` is in deferred mode.
| enum PendingMonitorOp<ChannelSigner: EcdsaChannelSigner> {
|     /// A new monitor to insert and persist.
|     NewMonitor { channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>, update_id: u64 },

Can't we omit the update_id field and do this?
diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs
index 6da88fe01..fc0e340be 100644
--- a/lightning/src/chain/chainmonitor.rs
+++ b/lightning/src/chain/chainmonitor.rs
@@ -70,7 +70,7 @@ use core::sync::atomic::{AtomicUsize, Ordering};
/// A pending operation queued for later execution when `ChainMonitor` is in deferred mode.
enum PendingMonitorOp<ChannelSigner: EcdsaChannelSigner> {
/// A new monitor to insert and persist.
- NewMonitor { channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner>, update_id: u64 },
+ NewMonitor { channel_id: ChannelId, monitor: ChannelMonitor<ChannelSigner> },
/// An update to apply and persist.
Update { channel_id: ChannelId, update: ChannelMonitorUpdate },
}
@@ -1282,8 +1282,9 @@ where
};
let (channel_id, update_id, status) = match op {
- PendingMonitorOp::NewMonitor { channel_id, monitor, update_id } => {
+ PendingMonitorOp::NewMonitor { channel_id, monitor } => {
let logger = WithChannelMonitor::from(logger, &monitor, None);
+ let update_id = monitor.get_latest_update_id();
log_trace!(logger, "Flushing new monitor");
// Hold `pending_ops` across the internal call so that
// `watch_channel` (which checks `monitors` + `pending_ops`
@@ -1546,7 +1547,6 @@ where
return self.watch_channel_internal(channel_id, monitor);
}
- let update_id = monitor.get_latest_update_id();
// Atomically check for duplicates in both the pending queue and the
// flushed monitor set. Lock order: `pending_ops` before `monitors`
// (see `pending_ops` field doc).
@@ -1562,7 +1562,7 @@ where
if already_pending {
return Err(());
}
- pending_ops.push_back(PendingMonitorOp::NewMonitor { channel_id, monitor, update_id });
+ pending_ops.push_back(PendingMonitorOp::NewMonitor { channel_id, monitor });
Ok(ChannelMonitorUpdateStatus::InProgress)
}
Good one indeed, done
lightning/src/chain/chainmonitor.rs (Outdated)

| if already_pending {
|     return Err(());
| }
| pending_ops.push_back(PendingMonitorOp::NewMonitor { channel_id, monitor, update_id });

Sometimes when we push to queues we have a helper method that checks if the queue's length is getting too long and takes some action, like warn or something. Does that seem useful in this case?

I don't think it is very useful. The queue is self-limiting: when deferred mode returns InProgress, MONITOR_UPDATE_IN_PROGRESS is set on the channel, which blocks further updates.
| },
| }
| },
| PendingMonitorOp::Update { channel_id, update } => {

Just wanted to note that within the monitor we also have a pending_monitor_updates queue, so we have two layers of pending update queues now... Probably fine but seems a bit duplicative at first glance.

These two queues serve different purposes: pending_ops holds operations that haven't been executed at all yet (deferred until after the ChannelManager is persisted), while pending_monitor_updates in the MonitorHolder tracks updates that have been applied to the in-memory monitor but whose async persistence hasn't completed yet. They operate at different stages of the pipeline: "not yet started" vs "started but not yet confirmed persisted."
| match status {
| ChannelMonitorUpdateStatus::Completed => {
|     let logger = WithContext::from(logger, None, Some(channel_id), None);
|     if let Err(e) = self.channel_monitor_updated(channel_id, update_id) {

Relatedly, the Persist docs state that channel_monitor_updated should be called once a full channelmonitor has been persisted, which I think is inaccurate (and later docs seem to contradict that).

Updated the doc to clarify this. Each pending update ID must be individually marked complete via channel_monitor_updated, since the implementation uses retain to remove only the exact matching ID.
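The "retain on the exact matching ID" point can be shown with a minimal sketch (hypothetical, simplified; a free function over a `Vec<u64>` rather than LDK's real `MonitorHolder` state): completing one update ID removes only that ID, so it never implicitly completes earlier or later pending IDs.

```rust
// Completion removes only the exact reported id; every pending update id
// must therefore be individually signalled complete.
fn channel_monitor_updated(pending_updates: &mut Vec<u64>, completed_id: u64) {
    pending_updates.retain(|id| *id != completed_id);
}

fn main() {
    let mut pending = vec![1, 2, 3];
    channel_monitor_updated(&mut pending, 2);
    // IDs 1 and 3 are still pending; completing 2 did not imply them.
    assert_eq!(pending, vec![1, 3]);
}
```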
| match status {
| ChannelMonitorUpdateStatus::Completed => {
|     let logger = WithContext::from(logger, None, Some(channel_id), None);
|     if let Err(e) = self.channel_monitor_updated(channel_id, update_id) {

I'm confused, the Persist docs say it's only necessary to call channel_monitor_updated when ::InProgress was previously returned for an update. Have I told you about my theory we should remove sync persistence support? Lol

The deferred watch_channel/update_channel always returns InProgress to the ChannelManager, so from its perspective there's a pending in-progress update. When flush() runs and the underlying persister returns Completed, we need to call channel_monitor_updated to signal that the previously-returned InProgress is now done, clearing MONITOR_UPDATE_IN_PROGRESS and releasing held messages.

Removing sync persistence support: I'd be all in favor of that, but I have a suspicion it won't land this PR any faster ;)
lightning/src/chain/chainmonitor.rs (Outdated)

| }
|
| /// Creates a minimal `ChannelMonitor<TestChannelSigner>` for the given `channel_id`.
| fn dummy_monitor(

This boilerplate already exists in channelmonitor.rs L6970

Added commit to extract helper
| };
|
| match status {
| ChannelMonitorUpdateStatus::Completed => {

I seem to recall we started disallowing flipping between returning Completed and InProgress at some point. I'm not sure if that's still correct, but it may be worth looking into.

The flipping concern doesn't apply here. The InProgress is returned by the deferred layer in ChainMonitor, not by the Persist implementation itself. The Persist impl can consistently return Completed; the deferred layer just wraps it with an initial InProgress to delay processing until after the ChannelManager is persisted. When flush() runs and the persister returns Completed, we call channel_monitor_updated to resolve the InProgress that the deferred layer returned earlier.
Force-pushed 47070bd to 1ecd046
Extract the ChannelMonitor construction boilerplate from channelmonitor test functions into a reusable dummy_monitor helper in test_utils.rs, generic over the signer type. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pure refactor: move the bodies of Watch::watch_channel and Watch::update_channel into methods on ChainMonitor, and have the Watch trait methods delegate to them. This prepares for adding deferred mode where the Watch methods will conditionally queue operations instead of executing them immediately. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a `deferred` parameter to `ChainMonitor::new` and `ChainMonitor::new_async_beta`. When set to true, the Watch trait methods (watch_channel and update_channel) will unimplemented!() for now. All existing callers pass false to preserve current behavior. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the unimplemented!() stubs with a full deferred write implementation. When ChainMonitor has deferred=true, Watch trait operations queue PendingMonitorOp entries instead of executing immediately. A new flush() method drains the queue and forwards operations to the internal watch/update methods, calling channel_monitor_updated on Completed status. The BackgroundProcessor is updated to capture pending_operation_count before persisting the ChannelManager, then flush that many writes afterward, ensuring monitor writes happen in the correct order relative to manager persistence.

Key changes:
- Add PendingMonitorOp enum and pending_ops queue to ChainMonitor
- Implement flush() and pending_operation_count() public methods
- Integrate flush calls in BackgroundProcessor (both sync and async)
- Add TestChainMonitor::new_deferred, flush helpers, and auto-flush in release_pending_monitor_events for test compatibility
- Add create_node_cfgs_deferred for deferred-mode test networks
- Add unit tests for queue/flush mechanics and full payment flow

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed 1ecd046 to c6b30c9
Review comments addressed: https://github.com/lightningdevkit/rust-lightning/compare/47070bd..c6b30c93e31d462e628070267ec02f20b72452f2
Summary

Modify ChainMonitor internally to queue watch_channel and update_channel operations, returning InProgress until flush() is called. This enables persistence of monitor updates after ChannelManager persistence, ensuring correct ordering where the ChannelManager state is never ahead of the monitor state on restart. The new behavior is opt-in via a deferred switch.

Key changes:
- ChainMonitor gains a deferred switch to enable the new queuing behavior
- Queued watch_channel and update_channel operations return InProgress
- flush() applies pending operations and persists monitors
- The background processor captures the pending-operation count before ChannelManager persistence, then flushes after persistence completes

Performance Impact
Multi-channel, multi-node load testing (using ldk-server chaos branch) shows no measurable throughput difference between deferred and direct persistence modes.
This is likely because forwarding and payment processing are already effectively single-threaded: the background processor batches all forwards for the entire node in a single pass, so the deferral overhead doesn't add any meaningful bottleneck to an already serialized path.
For high-latency storage (e.g., remote databases), there is also currently no significant impact because channel manager persistence already blocks event handling in the background processor loop (test). If the loop were parallelized to process events concurrently with persistence, deferred writing would become comparatively slower since it moves the channel manager round trip into the critical path. However, deferred writing would also benefit from loop parallelization, and could be further optimized by batching the monitor and manager writes into a single round trip.
Alternative Designs Considered

Several approaches were explored to solve the monitor/manager persistence ordering problem:

1. Queue at KVStore level (#4310)

Introduces a QueuedKVStoreSync wrapper that queues all writes in memory, committing them in a single batch at chokepoints where data leaves the system (get_and_clear_pending_msg_events, get_and_clear_pending_events). This approach aims for true atomic multi-key writes but requires KVStore backends that support transactions (e.g., SQLite); filesystem backends cannot achieve full atomicity.

Trade-offs: Most general solution, but requires changes to persistence boundaries and cannot fully close the desync gap with filesystem storage.

2. Queue at Persister level (#4317)

Updates MonitorUpdatingPersister to queue persist operations in memory, with actual writes happening on flush(). Adds flush() to the Persist trait and ChainMonitor.

Trade-offs: Only fixes the issue for MonitorUpdatingPersister; custom Persist implementations remain vulnerable to the race condition.

3. Queue at ChainMonitor wrapper level (#4345)

Introduces DeferredChainMonitor, a wrapper around ChainMonitor that implements the queue in a separate wrapper layer. All ChainMonitor traits (Listen, Confirm, EventsProvider, etc.) are passed through, allowing drop-in replacement.

Trade-offs: Requires re-implementing all trait pass-throughs on the wrapper. Keeps the core ChainMonitor unchanged but adds an external layer of indirection.