PHOENIX-7863 Add replication consistency point guard to compaction#2489
PHOENIX-7863 Add replication consistency point guard to compaction#2489Himanshu-g81 wants to merge 13 commits into
Conversation
On standby clusters, compaction can prematurely drop delete markers newer than the replication consistency point, causing permanent stale data. This adds a guard that floors maxLookbackWindowStart to the minimum consistency point across all HA groups when replication replay is enabled. Enabled via phoenix.replication.compaction.guard.enabled (default true), active only when phoenix.replication.replay.enabled is also true. Falls back to retaining all delete markers if the consistency point is unavailable.
Move applyReplicationConsistencyGuard and adjustMaxLookbackWindowStart from CompactionScanner to ReplicationLogReplayService where the consistency point logic belongs. Relocate unit test to replication.reader package accordingly.
Now that the guard logic lives in the same class, public access is no longer needed. Tests are in the same package and can still access it.
Co-locate REPLICATION_COMPACTION_GUARD_ENABLED and its default with the other replication config constants in ReplicationLogReplayService, removing them from QueryServices/QueryServicesOptions.
- Remove incorrect @VisibleForTesting from applyReplicationConsistencyGuard (it is genuinely public, called from CompactionScanner) - Reduce adjustMaxLookbackWindowStart to package-private - Fix newHashMapWithExpectedSize(4) to 5 in both IT classes
Given this is a critical safety feature to prevent data desynchronization, when would it ever be disabled? |
There was a problem hiding this comment.
Pull request overview
This PR introduces a replication consistency-point “compaction guard” to prevent standby-cluster compactions from purging delete markers that replication replay has not yet caught up to, avoiding permanent stale data.
Changes:
- Add a new compaction guard configuration (
phoenix.replication.compaction.guard.enabled, defaulttrue) and guard logic inReplicationLogReplayServiceto floormaxLookbackWindowStartby the minimum replication consistency point. - Apply the guard from
CompactionScannerwhen replication replay is enabled (and the guard is enabled). - Add unit + integration coverage for the guard behavior when enabled/disabled and when consistency point retrieval fails.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java | Unit tests for the pure window-flooring adjustment logic. |
| phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java | IT coverage validating delete-marker retention/purge behavior with the guard enabled. |
| phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java | IT coverage ensuring normal compaction behavior when the guard is disabled. |
| phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java | Adds guard config/constants, guard application + testing hooks. |
| phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java | Wires the guard into compaction by adjusting maxLookbackWindowStart. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @VisibleForTesting | ||
| static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, | ||
| long consistencyPoint, String tableName, String columnFamilyName) { | ||
| long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint); |
There was a problem hiding this comment.
You are changing the store-level maxLookbackWindowStart but the value actually used for retention is computed per row as the max of this and the ttlWindowStart of the Phoenix compactor's RowContext. When ttlWindowStart in the RowContext is greater than consistencyPoint the boundary snaps to ttlWindowStart and delete markers between consistencyPoint and ttlWindowStart get purged anyway, which I think you are trying to prevent.
The robot pointed me to this issue with this text:
"The guard is effective only when replay lag stays within the TTL window. Concretely: table TTL = 1h, replay lag = 2h ⇒ consistencyPoint = now-2h, guard sets store start to now-2h, but the row uses max(now-1h, now-2h) = now-1h, so markers in the [now-2h, now-1h] band are dropped. Tests happen to pass only because the test table has no TTL."
There was a problem hiding this comment.
Fixed, thanks!
| public static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart, | ||
| Configuration conf, String tableName, String columnFamilyName) { | ||
| try { | ||
| long consistencyPoint = getInstance(conf).getConsistencyPoint(); |
There was a problem hiding this comment.
This will iterate every HA group and in SYNC state each call hits the filesystem. That is at least an exists() + listStatus() RPC to the NameNode per HA group, executed once per store per compaction, synchronously, inside the CompactionScanner constructor.
The consistency point moves slowly relative to compaction frequency, so consider caching it with a short TTL of a few seconds and generally using the cached value, rather than recomputing on every scanner construction.
| * Configuration key for enabling/disabling the replication compaction guard | ||
| */ | ||
| public static final String REPLICATION_COMPACTION_GUARD_ENABLED = | ||
| "phoenix.replication.compaction.guard.enabled"; |
There was a problem hiding this comment.
I commented on this elsewhere, but Claude called this a "foot gun", lol.
I'd argue the flag is a foot-gun: setting
phoenix.replication.compaction.guard.enabled=falseon a standby silently re-introduces exactly the data-desync bug this PR wants to fix.
There was a problem hiding this comment.
There was a problem hiding this comment.
This is unnecessary configuration and an accident waiting to happen.
- Move guard enforcement from store-level to RowContext.setTTL() to fix TTL window bypass (Math.min cap after Math.max with ttlWindowStart) - Add Guava memoized cache (30s TTL) for consistency point to reduce NameNode RPCs during compaction bursts - Gate guard on major compactions only (no effect during flush/minor) - Make constructor private, replace mock injection with test factory (setConsistencyPointForTesting) - Demote per-compaction logging from INFO to DEBUG - Keep guard config flag for operational flexibility
Verify that resolveConsistencyPoint uses cached value from Guava memoized supplier and does not re-fetch on every call within TTL.
The idea was if we have any issue in consistency point calculation, etc and we need to disable this check specificailly to avoid issues with compaction temporarily (since max-look back anyway hold the delete markers upto 3 days, or we update that config and disable this one to remediate the issue). |
| } | ||
|
|
||
| private ReplicationLogReplayService(long fixedConsistencyPoint) { | ||
| this.cachedConsistencyPoint = () -> fixedConsistencyPoint; |
There was a problem hiding this comment.
Shouldn't the final field conf been initialized ?
| } | ||
|
|
||
| private ReplicationLogReplayService(Supplier<Long> supplier) { | ||
| this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(supplier, |
There was a problem hiding this comment.
Shouldn't the final field conf been initialized ?
|
When the cache is cold or expired, the delegate calls getConsistencyPoint() → private ReplicationRound lastRoundInSync; // line 106 — plain field, NOT volatile That field is written by the background replay thread (setLastRoundInSync at :328, and init at :242/:249) and has no volatile modifier and no synchronization (:438-448). This PR makes HBase compaction threads (the large/small compaction pools — commonly >1, so multiple CompactionScanner constructors run in parallel on one RS) call
|
There was a problem hiding this comment.
I highlighted a build break and consistency issue. Both must be fixed. In addition the tests still don't cover the TTL case that motivated the rewrite.
I asked the robot to suggest necessary test improvements:
Add back a unit test on computeRowMaxLookbackWithGuard(...) covering at least:
ttlWindowStart>maxLookbackWindowStart>consistencyPoint(TTL-dominated, guard caps)maxLookbackWindowStart>ttlWindowStart>consistencyPoint(lookback-dominated, guard caps)consistencyPoint> both (guard inactive)consistencyPoint= 0 (fallback)consistencyPoint=Long.MAX_VALUE(guard disabled at call site)
Add an IT case to CompactionReplicationGuardIT that creates the table with TTL = N (using HBase CF TTL via ALTER or Phoenix TTL property), sets consistencyPoint = beforeDeleteTime - 1, advances past TTL by M >> N, and asserts the delete marker is retained. Without this, the row-level fix is only theoretically tested.
|
|
||
| private static volatile ReplicationLogReplayService instance; | ||
|
|
||
| private final Configuration conf; |
There was a problem hiding this comment.
The two test-only constructors in ReplicationLogReplayService do not initialize the blank-final field conf. This will break the build.
ReplicationLogReplayService.java:116: error: variable conf might not have been initialized
ReplicationLogReplayService.java:121: error: variable conf might not have been initialized
|
|
||
| private ReplicationLogReplayService(final Configuration conf) { | ||
| this.conf = conf; | ||
| this.cachedConsistencyPoint = Suppliers.memoizeWithExpiration(() -> { |
There was a problem hiding this comment.
Suppliers.memoizeWithExpiration uses synchronized(this) to serialize loads, but that lock is on the supplier object. This establishes no happens-before relationship with the replay thread's writes to lastRoundInSync.
So you may end up advancing the consistency point too far back, or even getting a a null back from the supplier even after the replay thread has already set it, spuriously throwing an IOException. The net effect is sometimes a major compaction won't drop delete markers when it should.
Something needs to enforce the happens-before relationship. Perhaps volatile ReplicationRound lastRoundInSync , or consider AtomicReference<ReplicationRound>. Or redo this.
| * Configuration key for enabling/disabling the replication compaction guard | ||
| */ | ||
| public static final String REPLICATION_COMPACTION_GUARD_ENABLED = | ||
| "phoenix.replication.compaction.guard.enabled"; |
There was a problem hiding this comment.
This is unnecessary configuration and an accident waiting to happen.
apurtell
left a comment
There was a problem hiding this comment.
Looks like the substantiative issues have been addressed. There is one nit I'd like to see improved but this is looking good to me, so I have approved it.
| } | ||
|
|
||
| private ReplicationLogReplayService(long fixedConsistencyPoint) { | ||
| this.conf = null; |
There was a problem hiding this comment.
If we need a conf to vary our behavior, it should be passed in by a higher layer. If we need a conf because some API we call needs it, we should create it with new Configuration().
There was a problem hiding this comment.
The robot says
Any code path on the test singleton that touches conf will NPE:
start() reads conf.getBoolean(...) (line 155)
stop() reads conf.getBoolean(...) (line 190)
getReplicationGroups() calls HAGroupStoreManager.getInstance(conf) (line 295)
getReplicationLogReplay(...) calls ReplicationLogReplay.get(conf, ...) (line 299)
Today this is safe only because the test ITs and UTs go straight through cachedConsistencyPoint.get() and never call the lifecycle methods.
| } | ||
|
|
||
| private ReplicationLogReplayService(Supplier<Long> supplier) { | ||
| this.conf = null; |
There was a problem hiding this comment.
If we need a conf to vary our behavior, it should be passed in by a higher layer. If we need a conf because some API we call needs it, we should create it with new Configuration().
On standby clusters, compaction can prematurely purge delete markers that have timestamps newer than the replication consistency point. This causes permanent stale data because when replay eventually catches up, the delete markers that should have been applied are already gone.
This PR adds a compaction guard that floors
maxLookbackWindowStartto the minimum consistency point across all HA groups, ensuring delete markers newer than the consistency point are retained until replay has processed them.The guard is controlled by
phoenix.replication.compaction.guard.enabled(defaulttrue). It activates automatically whenphoenix.replication.replay.enabled=true— no additional configuration needed on standby clusters. The separate flag allows operators to disable the guard independently if needed without turning off replay entirely.