From 66b19a52a6832865df767ba8dcc31ca77bdf86ba Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 23 Jun 2026 17:19:43 +0800 Subject: [PATCH 1/4] Pipe: merge batched aligned chunks in scan parser --- .../scan/TsFileInsertionEventScanParser.java | 359 +++++++++++++----- .../resource/memory/PipeMemoryWeightUtil.java | 18 +- .../event/TsFileInsertionEventParserTest.java | 195 ++++++++++ 3 files changed, 464 insertions(+), 108 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index 4c5cd75d4c13f..1251127379e4a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -106,8 +106,9 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { private final List timeChunkPageMemorySizeList = new ArrayList<>(); private final Map measurementIndexMap = new HashMap<>(); - private int lastIndex = -1; - private Chunk firstChunk4NextSequentialValueChunks; + private final List pendingAlignedChunkGroups = new ArrayList<>(); + private long pendingAlignedChunkSize; + private CachedAlignedValueChunk cachedAlignedValueChunk; private byte lastMarker = Byte.MIN_VALUE; @@ -588,14 +589,14 @@ private boolean putValueToColumns(final BatchData data, final Tablet tablet, fin private void moveToNextChunkReader() throws IOException, IllegalStateException, IllegalPathException { ChunkHeader chunkHeader; - long valueChunkSize = 0; - long valueChunkPageMemorySize = 0; - final List valueChunkList = new ArrayList<>(); currentMeasurements.clear(); modsInfos.clear(); if (lastMarker == MetaMarker.SEPARATOR) { - chunkReader = null; + if (!recordPendingAlignedChunk(lastMarker)) { + clearCachedAlignedChunkData(); + chunkReader = null; + } return; } @@ -603,8 +604,8 @@ private void moveToNextChunkReader() while ((marker = lastMarker != Byte.MIN_VALUE ? lastMarker - : Objects.nonNull(firstChunk4NextSequentialValueChunks) - ? toValueChunkMarker(firstChunk4NextSequentialValueChunks.getHeader()) + : Objects.nonNull(cachedAlignedValueChunk) + ? toValueChunkMarker(cachedAlignedValueChunk.chunk.getHeader()) : tsFileSequenceReader.readMarker()) != MetaMarker.SEPARATOR) { lastMarker = Byte.MIN_VALUE; @@ -658,9 +659,8 @@ private void moveToNextChunkReader() case MetaMarker.VALUE_CHUNK_HEADER: case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: { - Chunk chunk; - long currentValueChunkPageMemorySize = 0; - if (Objects.isNull(firstChunk4NextSequentialValueChunks)) { + CachedAlignedValueChunk valueChunk = cachedAlignedValueChunk; + if (Objects.isNull(valueChunk)) { final long currentChunkHeaderOffset = tsFileSequenceReader.position() - 1; chunkHeader = tsFileSequenceReader.readChunkHeader(marker); @@ -668,7 +668,7 @@ private void moveToNextChunkReader() break; } - // Increase value index + // Increase value index. final String measurementID = tabletStringInternPool.intern(chunkHeader.getMeasurementID()); final int valueIndex = @@ -676,86 +676,36 @@ private void moveToNextChunkReader() measurementID, (measurement, index) -> Objects.nonNull(index) ? index + 1 : 0); - // Emit when encountered non-sequential value chunk, or the chunk size exceeds - // certain value to avoid OOM - // Do not record or end current value chunks when there are empty chunks + // Do not record or end current value chunks when there are empty chunks. if (chunkHeader.getDataSize() == 0) { break; } - chunk = + final Chunk chunk = new Chunk( chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())); - currentValueChunkPageMemorySize = calculateMaxPageMemorySize(chunk); - boolean needReturn = false; - final long timeChunkSize = - lastIndex >= 0 - ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed( - timeChunkList.get(lastIndex)) - : 0; - final long timeChunkPageMemorySize = - lastIndex >= 0 ? timeChunkPageMemorySizeList.get(lastIndex) : 0; - if (lastIndex >= 0) { - if (valueIndex != lastIndex) { - needReturn = recordAlignedChunk(valueChunkList, marker); - } else { - final long chunkSize = timeChunkSize + valueChunkSize; - final long pageMemorySize = timeChunkPageMemorySize + valueChunkPageMemorySize; - if (chunkSize + chunkHeader.getDataSize() - > allocatedMemoryBlockForChunk.getMemoryUsageInBytes() - || timeChunkPageMemorySize > 0 - && currentValueChunkPageMemorySize > 0 - && pageMemorySize + currentValueChunkPageMemorySize - > getPageDataMemoryLimitInBytes()) { - needReturn = recordAlignedChunk(valueChunkList, marker); - } - } - } - lastIndex = valueIndex; - if (needReturn) { - firstChunk4NextSequentialValueChunks = chunk; - return; - } - resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader); - resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit( - valueChunkList, currentValueChunkPageMemorySize); + valueChunk = + new CachedAlignedValueChunk( + valueIndex, + chunk, + chunkHeader.getDataSize(), + calculateMaxPageMemorySize(chunk)); } else { - chunk = firstChunk4NextSequentialValueChunks; - chunkHeader = chunk.getHeader(); - firstChunk4NextSequentialValueChunks = null; - currentValueChunkPageMemorySize = calculateMaxPageMemorySize(chunk); - resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(valueChunkList, chunkHeader); - resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit( - valueChunkList, currentValueChunkPageMemorySize); + cachedAlignedValueChunk = null; } - valueChunkSize += chunkHeader.getDataSize(); - valueChunkPageMemorySize += currentValueChunkPageMemorySize; - valueChunkList.add(chunk); - final String measurementID = - tabletStringInternPool.intern(chunkHeader.getMeasurementID()); - currentMeasurements.add( - new MeasurementSchema( - measurementID, - chunkHeader.getDataType(), - chunkHeader.getEncodingType(), - chunkHeader.getCompressionType())); - modsInfos.addAll( - ModsOperationUtil.initializeMeasurementMods( - currentDevice, Collections.singletonList(measurementID), currentModifications)); + if (returnPendingAlignedChunkBeforeCaching(valueChunk)) { + return; + } + cacheAlignedValueChunk(valueChunk); break; } case MetaMarker.CHUNK_GROUP_HEADER: { - // Return before "currentDevice" changes - if (recordAlignedChunk(valueChunkList, marker)) { + // Return before "currentDevice" changes. + if (recordPendingAlignedChunk(marker)) { return; } - // Clear because the cached data will never be used in the next chunk group - lastIndex = -1; - timeChunkList.clear(); - isMultiPageList.clear(); - timeChunkPageMemorySizeList.clear(); - measurementIndexMap.clear(); + clearCachedAlignedChunkData(); final IDeviceID deviceID = tsFileSequenceReader.readChunkGroupHeader().getDeviceID(); currentDevice = treePattern.mayOverlapWithDevice(deviceID) ? deviceID : null; currentDeviceString = @@ -775,7 +725,8 @@ > getPageDataMemoryLimitInBytes()) { } lastMarker = marker; - if (!recordAlignedChunk(valueChunkList, marker)) { + if (!recordPendingAlignedChunk(marker)) { + clearCachedAlignedChunkData(); chunkReader = null; } } @@ -859,22 +810,38 @@ private boolean filterChunk( return false; } - private boolean recordAlignedChunk(final List valueChunkList, final byte marker) - throws IOException { - if (!valueChunkList.isEmpty()) { - final Chunk timeChunk = timeChunkList.get(lastIndex); + private boolean recordPendingAlignedChunk(final byte marker) throws IOException { + while (!pendingAlignedChunkGroups.isEmpty()) { + final PendingAlignedChunkGroup pendingAlignedChunkGroup = pendingAlignedChunkGroups.remove(0); + pendingAlignedChunkSize = + Math.max(0, pendingAlignedChunkSize - pendingAlignedChunkGroup.chunkSize); + + if (pendingAlignedChunkGroup.valueChunkList.isEmpty()) { + continue; + } + + final Chunk timeChunk = timeChunkList.get(pendingAlignedChunkGroup.timeChunkIndex); timeChunk.getData().rewind(); - currentIsMultiPage = isMultiPageList.get(lastIndex); + for (final Chunk valueChunk : pendingAlignedChunkGroup.valueChunkList) { + valueChunk.getData().rewind(); + } + + currentMeasurements.clear(); + currentMeasurements.addAll(pendingAlignedChunkGroup.measurements); + modsInfos.clear(); + modsInfos.addAll(pendingAlignedChunkGroup.modsInfos); + + currentIsMultiPage = isMultiPageList.get(pendingAlignedChunkGroup.timeChunkIndex); if (!currentIsMultiPage) { resizePageDataMemoryIfNeeded( AlignedSinglePageWholeChunkReader.calculatePageEstimatedMemoryUsageInBytes( - timeChunk, valueChunkList)); + timeChunk, pendingAlignedChunkGroup.valueChunkList)); } final List pageEstimatedMemoryUsageInBytesList = currentIsMultiPage ? AlignedSinglePageWholeChunkReader .calculatePageEstimatedMemoryUsageInBytesWithBatchDataList( - timeChunk, valueChunkList) + timeChunk, pendingAlignedChunkGroup.valueChunkList) : Collections.emptyList(); final long maxPageEstimatedMemoryUsageInBytes = pageEstimatedMemoryUsageInBytesList.isEmpty() @@ -884,42 +851,193 @@ private boolean recordAlignedChunk(final List valueChunkList, final byte chunkReader = currentIsMultiPage ? new MemoryControlledChunkReader( - new AlignedChunkReader(timeChunk, valueChunkList, filter), + new AlignedChunkReader( + timeChunk, pendingAlignedChunkGroup.valueChunkList, filter), pageEstimatedMemoryUsageInBytesList) - : new AlignedSinglePageWholeChunkReader(timeChunk, valueChunkList, null); + : new AlignedSinglePageWholeChunkReader( + timeChunk, pendingAlignedChunkGroup.valueChunkList, null); currentIsAligned = true; - lastMarker = marker; + if (marker != Byte.MIN_VALUE) { + lastMarker = marker; + } return true; } return false; } + private boolean shouldReturnPendingAlignedChunkBeforeCaching( + final CachedAlignedValueChunk valueChunk) throws IOException { + validateAlignedValueChunkTimeIndex(valueChunk.timeChunkIndex); + + final PendingAlignedChunkGroup pendingAlignedChunkGroup = + findPendingAlignedChunkGroup(valueChunk.timeChunkIndex); + final boolean isFirstValueChunkInGroup = + Objects.isNull(pendingAlignedChunkGroup) + || pendingAlignedChunkGroup.valueChunkList.isEmpty(); + final long timeChunkSize = + Objects.isNull(pendingAlignedChunkGroup) + ? PipeMemoryWeightUtil.calculateChunkRamBytesUsed( + timeChunkList.get(valueChunk.timeChunkIndex)) + : 0; + final long chunkSizeAfterCaching = + pendingAlignedChunkSize + timeChunkSize + valueChunk.valueChunkSize; + + if (isFirstValueChunkInGroup) { + final long firstValueChunkGroupSize = + timeChunkSize + + (Objects.isNull(pendingAlignedChunkGroup) ? 0 : pendingAlignedChunkGroup.chunkSize) + + valueChunk.valueChunkSize; + if (firstValueChunkGroupSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + return !pendingAlignedChunkGroups.isEmpty(); + } + } + + if (!pendingAlignedChunkGroups.isEmpty() + && chunkSizeAfterCaching > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + return true; + } + + final long timeChunkPageMemorySize = timeChunkPageMemorySizeList.get(valueChunk.timeChunkIndex); + if (timeChunkPageMemorySize <= 0 || valueChunk.valueChunkPageMemorySize <= 0) { + return false; + } + + final long pageMemorySizeAfterCaching = + timeChunkPageMemorySize + + (Objects.isNull(pendingAlignedChunkGroup) + ? 0 + : pendingAlignedChunkGroup.valueChunkPageMemorySize) + + valueChunk.valueChunkPageMemorySize; + if (pageMemorySizeAfterCaching <= getPageDataMemoryLimitInBytes()) { + return false; + } + + if (isFirstValueChunkInGroup) { + final long firstValueChunkPageMemorySize = + timeChunkPageMemorySize + valueChunk.valueChunkPageMemorySize; + return firstValueChunkPageMemorySize <= getPageDataMemoryLimitInBytes() + || !pendingAlignedChunkGroups.isEmpty(); + } + + return true; + } + + private boolean returnPendingAlignedChunkBeforeCaching(final CachedAlignedValueChunk valueChunk) + throws IOException { + if (!shouldReturnPendingAlignedChunkBeforeCaching(valueChunk)) { + return false; + } + + cachedAlignedValueChunk = valueChunk; + if (recordPendingAlignedChunk(Byte.MIN_VALUE)) { + return true; + } + cachedAlignedValueChunk = null; + return false; + } + + private void cacheAlignedValueChunk(final CachedAlignedValueChunk valueChunk) throws IOException { + validateAlignedValueChunkTimeIndex(valueChunk.timeChunkIndex); + + final PendingAlignedChunkGroup pendingAlignedChunkGroup = + getOrCreatePendingAlignedChunkGroup(valueChunk.timeChunkIndex); + resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit(pendingAlignedChunkGroup, valueChunk); + resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit(pendingAlignedChunkGroup, valueChunk); + + pendingAlignedChunkGroup.valueChunkList.add(valueChunk.chunk); + pendingAlignedChunkGroup.chunkSize += valueChunk.valueChunkSize; + pendingAlignedChunkGroup.valueChunkPageMemorySize += valueChunk.valueChunkPageMemorySize; + pendingAlignedChunkSize += valueChunk.valueChunkSize; + + final ChunkHeader chunkHeader = valueChunk.chunk.getHeader(); + final String measurementID = tabletStringInternPool.intern(chunkHeader.getMeasurementID()); + pendingAlignedChunkGroup.measurements.add( + new MeasurementSchema( + measurementID, + chunkHeader.getDataType(), + chunkHeader.getEncodingType(), + chunkHeader.getCompressionType())); + pendingAlignedChunkGroup.modsInfos.addAll( + ModsOperationUtil.initializeMeasurementMods( + currentDevice, Collections.singletonList(measurementID), currentModifications)); + } + + private PendingAlignedChunkGroup getOrCreatePendingAlignedChunkGroup(final int timeChunkIndex) { + final PendingAlignedChunkGroup pendingAlignedChunkGroup = + findPendingAlignedChunkGroup(timeChunkIndex); + if (Objects.nonNull(pendingAlignedChunkGroup)) { + return pendingAlignedChunkGroup; + } + + final PendingAlignedChunkGroup newPendingAlignedChunkGroup = + new PendingAlignedChunkGroup( + timeChunkIndex, + PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(timeChunkIndex)), + timeChunkPageMemorySizeList.get(timeChunkIndex)); + pendingAlignedChunkSize += newPendingAlignedChunkGroup.chunkSize; + + for (int i = 0; i < pendingAlignedChunkGroups.size(); ++i) { + if (pendingAlignedChunkGroups.get(i).timeChunkIndex > timeChunkIndex) { + pendingAlignedChunkGroups.add(i, newPendingAlignedChunkGroup); + return newPendingAlignedChunkGroup; + } + } + pendingAlignedChunkGroups.add(newPendingAlignedChunkGroup); + return newPendingAlignedChunkGroup; + } + + private PendingAlignedChunkGroup findPendingAlignedChunkGroup(final int timeChunkIndex) { + for (final PendingAlignedChunkGroup pendingAlignedChunkGroup : pendingAlignedChunkGroups) { + if (pendingAlignedChunkGroup.timeChunkIndex == timeChunkIndex) { + return pendingAlignedChunkGroup; + } + } + return null; + } + + private void validateAlignedValueChunkTimeIndex(final int timeChunkIndex) throws IOException { + if (timeChunkIndex < 0 || timeChunkIndex >= timeChunkList.size()) { + throw new IOException( + String.format( + "Invalid aligned value chunk index %d, while there are %d time chunks.", + timeChunkIndex, timeChunkList.size())); + } + } + + private void clearCachedAlignedChunkData() { + pendingAlignedChunkGroups.clear(); + pendingAlignedChunkSize = 0; + cachedAlignedValueChunk = null; + timeChunkList.clear(); + isMultiPageList.clear(); + timeChunkPageMemorySizeList.clear(); + measurementIndexMap.clear(); + } + private void resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit( - final List valueChunkList, final ChunkHeader valueChunkHeader) { - if (!valueChunkList.isEmpty() || lastIndex < 0) { + final PendingAlignedChunkGroup pendingAlignedChunkGroup, + final CachedAlignedValueChunk valueChunk) { + if (!pendingAlignedChunkGroup.valueChunkList.isEmpty()) { return; } - final long chunkSize = - PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(lastIndex)) - + valueChunkHeader.getDataSize(); + final long chunkSize = pendingAlignedChunkGroup.chunkSize + valueChunk.valueChunkSize; if (chunkSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, chunkSize); } } private void resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit( - final List valueChunkList, final long valueChunkPageMemorySize) { - if (!valueChunkList.isEmpty() || lastIndex < 0 || valueChunkPageMemorySize <= 0) { + final PendingAlignedChunkGroup pendingAlignedChunkGroup, + final CachedAlignedValueChunk valueChunk) { + if (!pendingAlignedChunkGroup.valueChunkList.isEmpty() + || pendingAlignedChunkGroup.timeChunkPageMemorySize <= 0 + || valueChunk.valueChunkPageMemorySize <= 0) { return; } - final long timeChunkPageMemorySize = timeChunkPageMemorySizeList.get(lastIndex); - if (timeChunkPageMemorySize <= 0) { - return; - } - - final long pageMemorySize = timeChunkPageMemorySize + valueChunkPageMemorySize; + final long pageMemorySize = + pendingAlignedChunkGroup.timeChunkPageMemorySize + valueChunk.valueChunkPageMemorySize; if (pageMemorySize > getPageDataMemoryLimitInBytes()) { PipeDataNodeResourceManager.memory() .forceResize(allocatedMemoryBlockForBatchData, pageMemorySize); @@ -980,4 +1098,41 @@ private Statistics findNonAlignedChunkStatistics( } return null; } + + private static class PendingAlignedChunkGroup { + + private final int timeChunkIndex; + private final List valueChunkList = new ArrayList<>(); + private final List measurements = new ArrayList<>(); + private final List modsInfos = new ArrayList<>(); + private final long timeChunkPageMemorySize; + private long chunkSize; + private long valueChunkPageMemorySize; + + private PendingAlignedChunkGroup( + final int timeChunkIndex, final long timeChunkSize, final long timeChunkPageMemorySize) { + this.timeChunkIndex = timeChunkIndex; + this.chunkSize = timeChunkSize; + this.timeChunkPageMemorySize = timeChunkPageMemorySize; + } + } + + private static class CachedAlignedValueChunk { + + private final int timeChunkIndex; + private final Chunk chunk; + private final long valueChunkSize; + private final long valueChunkPageMemorySize; + + private CachedAlignedValueChunk( + final int timeChunkIndex, + final Chunk chunk, + final long valueChunkSize, + final long valueChunkPageMemorySize) { + this.timeChunkIndex = timeChunkIndex; + this.chunk = chunk; + this.valueChunkSize = valueChunkSize; + this.valueChunkPageMemorySize = valueChunkPageMemorySize; + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java index 04eff0067e555..33ceacf278cd0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java @@ -191,6 +191,14 @@ private static Pair calculateTabletRowCountAndMemoryBySize( return new Pair<>(1, 0); } + final int configuredTabletRowSize = + PipeConfig.getInstance().getPipeDataStructureTabletRowSize(); + final boolean hasTabletRowSizeLimit = configuredTabletRowSize > 0; + final double inputSizeLimit = + hasTabletRowSizeLimit && inputNum > 0 + ? 100 + inputNum * (double) rowBytesUsed * 1.2 + : Integer.MAX_VALUE; + // Calculate row number according to the max size of a pipe tablet. "100" is the estimated size // of other data structures in a pipe tablet. // "*8" converts bytes to bits, because the bitmap size is 1 bit per schema. @@ -198,17 +206,15 @@ private static Pair calculateTabletRowCountAndMemoryBySize( (int) Math.min( IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(), - Math.min(Integer.MAX_VALUE, 100 + inputNum * (double) rowBytesUsed * 1.2)); + Math.min(Integer.MAX_VALUE, inputSizeLimit)); int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount); rowNumber = Math.max(1, rowNumber); - if ( // This means the row number is larger than the max row count of a pipe tablet - rowNumber > PipeConfig.getInstance().getPipeDataStructureTabletRowSize()) { + // This means the row number is larger than the max row count of a pipe tablet. + if (hasTabletRowSizeLimit && rowNumber > configuredTabletRowSize) { // Bound the row number, the memory cost is rowSize * rowNumber - return new Pair<>( - PipeConfig.getInstance().getPipeDataStructureTabletRowSize(), - rowBytesUsed * PipeConfig.getInstance().getPipeDataStructureTabletRowSize()); + return new Pair<>(configuredTabletRowSize, rowBytesUsed * configuredTabletRowSize); } else { return new Pair<>(rowNumber, sizeLimit); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index 50109b935c365..626b01c84e094 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -36,6 +36,8 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; +import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; @@ -55,6 +57,7 @@ import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.BatchData; import org.apache.tsfile.read.common.Chunk; import org.apache.tsfile.read.common.Path; import org.apache.tsfile.read.common.TimeRange; @@ -63,9 +66,11 @@ import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.TsFileGeneratorUtils; import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.writer.TsFileIOWriter; import org.junit.After; import org.junit.Assert; import org.junit.Assume; @@ -436,6 +441,100 @@ public void testScanParserResizesChunkMemoryForFirstAlignedValueChunk() throws E } } + @Test + public void testScanParserMergesBatchedAlignedValueChunkGroups() throws Exception { + final long originalPipeMaxReaderChunkSize = + CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize(); + final int originalPipeDataStructureTabletRowSize = + CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize(); + + final int measurementCount = 20; + final int batchSize = 10; + final int rowCount = 4; + final File sourceTsFile = new File("aligned-source-for-batched-layout.tsfile"); + alignedTsFile = new File("aligned-batched-layout.tsfile"); + + try { + CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(1024 * 1024L); + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0); + + final List schemaList = new ArrayList<>(); + for (int i = 0; i < measurementCount; ++i) { + schemaList.add(new MeasurementSchema("s" + i, TSDataType.INT64)); + } + + writeAlignedSourceTsFile(sourceTsFile, schemaList, rowCount); + rewriteAlignedTsFileWithBatchedValueChunks( + sourceTsFile, alignedTsFile, measurementCount, batchSize); + + int tabletCount = 0; + int pointCount = 0; + try (final TsFileInsertionEventScanParser parser = + new TsFileInsertionEventScanParser( + alignedTsFile, + new PrefixTreePattern("root"), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + false)) { + for (final Pair tabletWithIsAligned : parser.toTabletWithIsAligneds()) { + Assert.assertTrue(tabletWithIsAligned.getRight()); + final Tablet tablet = tabletWithIsAligned.getLeft(); + ++tabletCount; + Assert.assertEquals(measurementCount, tablet.getSchemas().size()); + pointCount += getNonNullSize(tablet); + } + } + + Assert.assertEquals(measurementCount * rowCount, pointCount); + Assert.assertTrue(tabletCount > 0); + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize); + CommonDescriptor.getInstance() + .getConfig() + .setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize); + sourceTsFile.delete(); + } + } + + @Test + public void testPipeTabletRowSizeCanBeDisabledByNonPositiveValue() { + final int originalPipeDataStructureTabletRowSize = + CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize(); + final int originalPipeDataStructureTabletSizeInBytes = + CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(); + + try { + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(1024 * 1024); + + final BatchData batchData = new BatchData(TSDataType.INT64); + for (int i = 0; i < 1000; ++i) { + batchData.putAnObject(i, (long) i); + } + + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(2); + final int rowCountWithLimit = + PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData).getLeft(); + + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0); + final int rowCountWithoutLimit = + PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData).getLeft(); + + Assert.assertEquals(2, rowCountWithLimit); + Assert.assertTrue(rowCountWithoutLimit > rowCountWithLimit); + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize); + CommonDescriptor.getInstance() + .getConfig() + .setPipeDataStructureTabletSizeInBytes(originalPipeDataStructureTabletSizeInBytes); + } + } + @Test public void testQueryParserSkipsUnnecessaryBitMaps() throws Exception { testTreeParserSkipsUnnecessaryBitMaps(true); @@ -1887,6 +1986,102 @@ private long calculatePipeMaxReaderChunkSizeForMultiPageAlignedChunk(final File } } + private void writeAlignedSourceTsFile( + final File tsFile, final List schemaList, final int rowCount) + throws IOException { + if (tsFile.exists()) { + Assert.assertTrue(tsFile.delete()); + } + Assert.assertEquals(0, rowCount % 2); + + final IDeviceID deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create("root.sg.d"); + try (final TsFileIOWriter writer = new TsFileIOWriter(tsFile)) { + writer.startChunkGroup(deviceID); + final int rowCountPerChunk = rowCount / 2; + for (int chunkIndex = 0; chunkIndex < 2; ++chunkIndex) { + final AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(schemaList); + for (int row = 0; row < rowCountPerChunk; ++row) { + final long time = (long) chunkIndex * rowCountPerChunk + row; + alignedChunkWriter.getTimeChunkWriter().write(time); + for (int measurementIndex = 0; measurementIndex < schemaList.size(); ++measurementIndex) { + alignedChunkWriter + .getValueChunkWriterByIndex(measurementIndex) + .write(time, time * 100 + measurementIndex, false); + } + } + alignedChunkWriter.writeToFileWriter(writer); + } + writer.endChunkGroup(); + writer.endFile(); + } + } + + private void rewriteAlignedTsFileWithBatchedValueChunks( + final File sourceTsFile, + final File targetTsFile, + final int measurementCount, + final int batchSize) + throws Exception { + if (targetTsFile.exists()) { + Assert.assertTrue(targetTsFile.delete()); + } + + try (final TsFileSequenceReader reader = + new TsFileSequenceReader(sourceTsFile.getAbsolutePath())) { + final IDeviceID deviceID = reader.getDeviceMeasurementsMap().keySet().iterator().next(); + final List sourceAlignedChunkMetadataList = + reader.getAlignedChunkMetadata(deviceID, true); + Assert.assertEquals(2, sourceAlignedChunkMetadataList.size()); + for (final AbstractAlignedChunkMetadata sourceAlignedChunkMetadata : + sourceAlignedChunkMetadataList) { + Assert.assertEquals( + measurementCount, sourceAlignedChunkMetadata.getValueChunkMetadataList().size()); + } + + try (final CompactionTsFileWriter writer = + new CompactionTsFileWriter( + targetTsFile, Long.MAX_VALUE, CompactionType.INNER_SEQ_COMPACTION)) { + writer.startChunkGroup(deviceID); + writer.markStartingWritingAligned(); + for (final AbstractAlignedChunkMetadata sourceAlignedChunkMetadata : + sourceAlignedChunkMetadataList) { + final ChunkMetadata timeChunkMetadata = + (ChunkMetadata) sourceAlignedChunkMetadata.getTimeChunkMetadata(); + writer.writeChunk(reader.readMemChunk(timeChunkMetadata), timeChunkMetadata); + } + + for (int start = 0; start < measurementCount; start += batchSize) { + writeValueChunkBatch( + reader, + writer, + sourceAlignedChunkMetadataList, + start, + Math.min(start + batchSize, measurementCount)); + } + writer.markEndingWritingAligned(); + writer.endChunkGroup(); + writer.endFile(); + } + } + } + + private void writeValueChunkBatch( + final TsFileSequenceReader reader, + final CompactionTsFileWriter writer, + final List alignedChunkMetadataList, + final int start, + final int end) + throws IOException { + for (final AbstractAlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) { + final List valueChunkMetadataList = + alignedChunkMetadata.getValueChunkMetadataList(); + for (int index = start; index < end; ++index) { + final ChunkMetadata valueChunkMetadata = (ChunkMetadata) valueChunkMetadataList.get(index); + writer.writeChunk(reader.readMemChunk(valueChunkMetadata), valueChunkMetadata); + } + } + } + private static class ParserPerformanceStats { private long pointCount; From f2bd2eb7239fee17f4ad6fe6e51d00680a841f14 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 23 Jun 2026 17:56:03 +0800 Subject: [PATCH 2/4] Test pipe batched aligned chunk memory boundaries --- .../event/TsFileInsertionEventParserTest.java | 118 +++++++++++++++++- 1 file changed, 117 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index 626b01c84e094..36d0fdd31ddbd 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -483,12 +483,79 @@ public void testScanParserMergesBatchedAlignedValueChunkGroups() throws Exceptio final Tablet tablet = tabletWithIsAligned.getLeft(); ++tabletCount; Assert.assertEquals(measurementCount, tablet.getSchemas().size()); + Assert.assertEquals(rowCount / 2, tablet.getRowSize()); pointCount += getNonNullSize(tablet); } } Assert.assertEquals(measurementCount * rowCount, pointCount); - Assert.assertTrue(tabletCount > 0); + Assert.assertEquals(2, tabletCount); + } finally { + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize); + CommonDescriptor.getInstance() + .getConfig() + .setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize); + sourceTsFile.delete(); + } + } + + @Test + public void testScanParserFlushesBatchedAlignedValueChunkGroupsByMemoryLimit() throws Exception { + final long originalPipeMaxReaderChunkSize = + CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize(); + final int originalPipeDataStructureTabletRowSize = + CommonDescriptor.getInstance().getConfig().getPipeDataStructureTabletRowSize(); + + final int measurementCount = 20; + final int batchSize = 10; + final int rowCount = 4; + final File sourceTsFile = new File("aligned-source-for-batched-layout-memory-limit.tsfile"); + alignedTsFile = new File("aligned-batched-layout-memory-limit.tsfile"); + + try { + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(0); + + final List schemaList = new ArrayList<>(); + for (int i = 0; i < measurementCount; ++i) { + schemaList.add(new MeasurementSchema("s" + i, TSDataType.INT64)); + } + + writeAlignedSourceTsFile(sourceTsFile, schemaList, rowCount); + rewriteAlignedTsFileWithBatchedValueChunks( + sourceTsFile, alignedTsFile, measurementCount, batchSize); + CommonDescriptor.getInstance() + .getConfig() + .setPipeMaxReaderChunkSize( + calculateFirstBatchedAlignedValueChunkGroupMemoryLimit(alignedTsFile, batchSize)); + + int tabletCount = 0; + int maxMeasurementCount = 0; + int pointCount = 0; + try (final TsFileInsertionEventScanParser parser = + new TsFileInsertionEventScanParser( + alignedTsFile, + new PrefixTreePattern("root"), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + false)) { + for (final Pair tabletWithIsAligned : parser.toTabletWithIsAligneds()) { + Assert.assertTrue(tabletWithIsAligned.getRight()); + final Tablet tablet = tabletWithIsAligned.getLeft(); + ++tabletCount; + maxMeasurementCount = Math.max(maxMeasurementCount, tablet.getSchemas().size()); + Assert.assertTrue(tablet.getSchemas().size() <= batchSize); + Assert.assertEquals(rowCount / 2, tablet.getRowSize()); + pointCount += getNonNullSize(tablet); + } + } + + Assert.assertEquals(batchSize, maxMeasurementCount); + Assert.assertEquals(measurementCount * rowCount, pointCount); + Assert.assertEquals(measurementCount / batchSize * 2, tabletCount); } finally { CommonDescriptor.getInstance() .getConfig() @@ -523,6 +590,11 @@ public void testPipeTabletRowSizeCanBeDisabledByNonPositiveValue() { final int rowCountWithoutLimit = PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData).getLeft(); + CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(-1); + final int rowCountWithNegativeLimit = + PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(batchData).getLeft(); + + Assert.assertEquals(rowCountWithoutLimit, rowCountWithNegativeLimit); Assert.assertEquals(2, rowCountWithLimit); Assert.assertTrue(rowCountWithoutLimit > rowCountWithLimit); } finally { @@ -1986,6 +2058,50 @@ private long calculatePipeMaxReaderChunkSizeForMultiPageAlignedChunk(final File } } + private long calculateFirstBatchedAlignedValueChunkGroupMemoryLimit( + final File tsFile, final int batchSize) throws Exception { + try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { + final IDeviceID deviceID = reader.getDeviceMeasurementsMap().keySet().iterator().next(); + final List alignedChunkMetadataList = + reader.getAlignedChunkMetadata(deviceID, true); + Assert.assertEquals(2, alignedChunkMetadataList.size()); + + final AbstractAlignedChunkMetadata alignedChunkMetadata = alignedChunkMetadataList.get(0); + final Chunk timeChunk = + reader.readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata()); + final List valueChunkMetadataList = + alignedChunkMetadata.getValueChunkMetadataList(); + Assert.assertTrue(valueChunkMetadataList.size() >= batchSize * 2); + + final List firstValueChunkBatch = new ArrayList<>(); + final List firstTwoValueChunkBatches = new ArrayList<>(); + long firstBatchChunkSize = PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunk); + long firstTwoBatchChunkSize = firstBatchChunkSize; + for (int index = 0; index < batchSize * 2; ++index) { + final Chunk valueChunk = + reader.readMemChunk((ChunkMetadata) valueChunkMetadataList.get(index)); + if (index < batchSize) { + firstValueChunkBatch.add(valueChunk); + firstBatchChunkSize += valueChunk.getHeader().getDataSize(); + } + firstTwoValueChunkBatches.add(valueChunk); + firstTwoBatchChunkSize += valueChunk.getHeader().getDataSize(); + } + + final long firstBatchPageMemorySize = + AlignedSinglePageWholeChunkReader + .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData( + timeChunk, firstValueChunkBatch); + final long firstTwoBatchPageMemorySize = + AlignedSinglePageWholeChunkReader + .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData( + timeChunk, firstTwoValueChunkBatches); + Assert.assertTrue(firstTwoBatchChunkSize > firstBatchChunkSize); + Assert.assertTrue(firstTwoBatchPageMemorySize > firstBatchPageMemorySize); + return Math.max(firstBatchChunkSize, firstBatchPageMemorySize); + } + } + private void writeAlignedSourceTsFile( final File tsFile, final List schemaList, final int rowCount) throws IOException { From e3d496bfcfadd8e3d78aaab97d9bca8b236ad220 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 24 Jun 2026 10:56:50 +0800 Subject: [PATCH 3/4] Pipe: fix batched aligned scan parser memory split --- .../scan/SinglePageWholeChunkReader.java | 38 +++++- .../scan/TsFileInsertionEventScanParser.java | 122 ++++++++++-------- 2 files changed, 101 insertions(+), 59 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java index 1be5565b394bc..075e47296ce73 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/SinglePageWholeChunkReader.java @@ -21,6 +21,7 @@ import org.apache.iotdb.db.i18n.DataNodePipeMessages; +import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.compress.IUnCompressor; import org.apache.tsfile.encoding.decoder.Decoder; import org.apache.tsfile.encrypt.EncryptParameter; @@ -169,22 +170,49 @@ static List toSuffixMaxList(final List pageEstimatedMemoryUsageInByt static long estimatePageMemoryUsageInBytesWithBatchData( final PageHeader timePageHeader, final Chunk timeChunk, - final List valueDataTypeList) { + final List valueDataTypeList) + throws IOException { return estimatePageMemoryUsageInBytesWithBatchData( timePageHeader.getUncompressedSize(), getPageRowCount(timePageHeader, timeChunk), valueDataTypeList); } - static int getPageRowCount(final PageHeader pageHeader, final Chunk chunk) { + static int getPageRowCount(final PageHeader pageHeader, final Chunk chunk) throws IOException { if (isSinglePageChunk(chunk.getHeader())) { - return Objects.isNull(chunk.getChunkStatistic()) - ? 0 - : saturateToInt(chunk.getChunkStatistic().getCount()); + if (Objects.nonNull(chunk.getChunkStatistic())) { + return saturateToInt(chunk.getChunkStatistic().getCount()); + } + return isTimeChunk(chunk.getHeader()) ? countSinglePageTimeValues(chunk) : 0; } return saturateToInt(pageHeader.getNumOfValues()); } + private static int countSinglePageTimeValues(final Chunk chunk) throws IOException { + final ByteBuffer chunkDataBuffer = chunk.getData().duplicate(); + final PageHeader pageHeader = deserializePageHeader(chunkDataBuffer, chunk.getHeader()); + final ByteBuffer pageData = + deserializePageData( + pageHeader, + chunkDataBuffer, + chunk.getHeader(), + IDecryptor.getDecryptor(chunk.getEncryptParam())); + final Decoder decoder = + Decoder.getDecoderByType(chunk.getHeader().getEncodingType(), TSDataType.INT64); + + int rowCount = 0; + while (decoder.hasNext(pageData)) { + decoder.readLong(pageData); + ++rowCount; + } + return rowCount; + } + + private static boolean isTimeChunk(final ChunkHeader chunkHeader) { + return (chunkHeader.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) + == TsFileConstant.TIME_COLUMN_MASK; + } + private static int saturateToInt(final long value) { return (int) Math.min(Integer.MAX_VALUE, value); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index 1251127379e4a..9b0ef4bdf0dc0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -72,6 +72,7 @@ import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -103,7 +104,6 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser { // Cached time chunk private final List timeChunkList = new ArrayList<>(); private final List isMultiPageList = new ArrayList<>(); - private final List timeChunkPageMemorySizeList = new ArrayList<>(); private final Map measurementIndexMap = new HashMap<>(); private final List pendingAlignedChunkGroups = new ArrayList<>(); @@ -684,11 +684,7 @@ private void moveToNextChunkReader() new Chunk( chunkHeader, tsFileSequenceReader.readChunk(-1, chunkHeader.getDataSize())); valueChunk = - new CachedAlignedValueChunk( - valueIndex, - chunk, - chunkHeader.getDataSize(), - calculateMaxPageMemorySize(chunk)); + new CachedAlignedValueChunk(valueIndex, chunk, chunkHeader.getDataSize()); } else { cachedAlignedValueChunk = null; } @@ -735,6 +731,10 @@ private long getPageDataMemoryLimitInBytes() { return PipeConfig.getInstance().getPipeMaxReaderChunkSize(); } + private long getChunkMemoryLimitInBytes() { + return PipeConfig.getInstance().getPipeMaxReaderChunkSize(); + } + private boolean filterChunk( final long currentChunkHeaderOffset, final ChunkHeader chunkHeader, @@ -756,8 +756,6 @@ private boolean filterChunk( timeChunkList.add(timeChunk); final boolean isMultiPage = marker == MetaMarker.TIME_CHUNK_HEADER; isMultiPageList.add(isMultiPage); - timeChunkPageMemorySizeList.add( - SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(timeChunk)); return true; } } @@ -887,39 +885,21 @@ private boolean shouldReturnPendingAlignedChunkBeforeCaching( timeChunkSize + (Objects.isNull(pendingAlignedChunkGroup) ? 0 : pendingAlignedChunkGroup.chunkSize) + valueChunk.valueChunkSize; - if (firstValueChunkGroupSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + if (firstValueChunkGroupSize > getChunkMemoryLimitInBytes()) { return !pendingAlignedChunkGroups.isEmpty(); } } if (!pendingAlignedChunkGroups.isEmpty() - && chunkSizeAfterCaching > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + && chunkSizeAfterCaching > getChunkMemoryLimitInBytes()) { return true; } - final long timeChunkPageMemorySize = timeChunkPageMemorySizeList.get(valueChunk.timeChunkIndex); - if (timeChunkPageMemorySize <= 0 || valueChunk.valueChunkPageMemorySize <= 0) { - return false; - } - final long pageMemorySizeAfterCaching = - timeChunkPageMemorySize - + (Objects.isNull(pendingAlignedChunkGroup) - ? 0 - : pendingAlignedChunkGroup.valueChunkPageMemorySize) - + valueChunk.valueChunkPageMemorySize; - if (pageMemorySizeAfterCaching <= getPageDataMemoryLimitInBytes()) { - return false; - } - - if (isFirstValueChunkInGroup) { - final long firstValueChunkPageMemorySize = - timeChunkPageMemorySize + valueChunk.valueChunkPageMemorySize; - return firstValueChunkPageMemorySize <= getPageDataMemoryLimitInBytes() - || !pendingAlignedChunkGroups.isEmpty(); - } - - return true; + calculateMaxAlignedPageMemorySizeWithBatchData( + valueChunk.timeChunkIndex, pendingAlignedChunkGroup, valueChunk); + return pageMemorySizeAfterCaching > getPageDataMemoryLimitInBytes() + && (!isFirstValueChunkInGroup || !pendingAlignedChunkGroups.isEmpty()); } private boolean returnPendingAlignedChunkBeforeCaching(final CachedAlignedValueChunk valueChunk) @@ -946,7 +926,6 @@ private void cacheAlignedValueChunk(final CachedAlignedValueChunk valueChunk) th pendingAlignedChunkGroup.valueChunkList.add(valueChunk.chunk); pendingAlignedChunkGroup.chunkSize += valueChunk.valueChunkSize; - pendingAlignedChunkGroup.valueChunkPageMemorySize += valueChunk.valueChunkPageMemorySize; pendingAlignedChunkSize += valueChunk.valueChunkSize; final ChunkHeader chunkHeader = valueChunk.chunk.getHeader(); @@ -972,8 +951,7 @@ private PendingAlignedChunkGroup getOrCreatePendingAlignedChunkGroup(final int t final PendingAlignedChunkGroup newPendingAlignedChunkGroup = new PendingAlignedChunkGroup( timeChunkIndex, - PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(timeChunkIndex)), - timeChunkPageMemorySizeList.get(timeChunkIndex)); + PipeMemoryWeightUtil.calculateChunkRamBytesUsed(timeChunkList.get(timeChunkIndex))); pendingAlignedChunkSize += newPendingAlignedChunkGroup.chunkSize; for (int i = 0; i < pendingAlignedChunkGroups.size(); ++i) { @@ -1010,7 +988,6 @@ private void clearCachedAlignedChunkData() { cachedAlignedValueChunk = null; timeChunkList.clear(); isMultiPageList.clear(); - timeChunkPageMemorySizeList.clear(); measurementIndexMap.clear(); } @@ -1029,23 +1006,69 @@ private void resizeChunkMemoryBlockIfFirstValueChunkExceedsLimit( private void resizePageDataMemoryBlockIfFirstValueChunkExceedsLimit( final PendingAlignedChunkGroup pendingAlignedChunkGroup, - final CachedAlignedValueChunk valueChunk) { - if (!pendingAlignedChunkGroup.valueChunkList.isEmpty() - || pendingAlignedChunkGroup.timeChunkPageMemorySize <= 0 - || valueChunk.valueChunkPageMemorySize <= 0) { + final CachedAlignedValueChunk valueChunk) + throws IOException { + if (!pendingAlignedChunkGroup.valueChunkList.isEmpty()) { return; } final long pageMemorySize = - pendingAlignedChunkGroup.timeChunkPageMemorySize + valueChunk.valueChunkPageMemorySize; + calculateMaxAlignedPageMemorySizeWithBatchData( + pendingAlignedChunkGroup.timeChunkIndex, pendingAlignedChunkGroup, valueChunk); if (pageMemorySize > getPageDataMemoryLimitInBytes()) { PipeDataNodeResourceManager.memory() .forceResize(allocatedMemoryBlockForBatchData, pageMemorySize); } } - private long calculateMaxPageMemorySize(final Chunk chunk) throws IOException { - return SinglePageWholeChunkReader.calculateMaxPageEstimatedMemoryUsageInBytes(chunk); + private long calculateMaxAlignedPageMemorySizeWithBatchData( + final int timeChunkIndex, + final PendingAlignedChunkGroup pendingAlignedChunkGroup, + final CachedAlignedValueChunk valueChunk) + throws IOException { + final List valueChunkList = + new ArrayList<>( + (Objects.isNull(pendingAlignedChunkGroup) + ? 0 + : pendingAlignedChunkGroup.valueChunkList.size()) + + 1); + if (Objects.nonNull(pendingAlignedChunkGroup)) { + valueChunkList.addAll(pendingAlignedChunkGroup.valueChunkList); + } + valueChunkList.add(valueChunk.chunk); + + final Chunk timeChunk = timeChunkList.get(timeChunkIndex); + final int timeChunkDataPosition = timeChunk.getData().position(); + final List valueChunkDataPositions = new ArrayList<>(valueChunkList.size()); + for (final Chunk chunk : valueChunkList) { + valueChunkDataPositions.add(Objects.isNull(chunk) ? 0 : chunk.getData().position()); + } + + rewindChunkData(timeChunk); + valueChunkList.forEach(this::rewindChunkData); + try { + return AlignedSinglePageWholeChunkReader + .calculateMaxPageEstimatedMemoryUsageInBytesWithBatchData(timeChunk, valueChunkList); + } finally { + timeChunk.getData().position(timeChunkDataPosition); + for (int i = 0; i < valueChunkList.size(); ++i) { + final Chunk chunk = valueChunkList.get(i); + if (Objects.nonNull(chunk)) { + chunk.getData().position(valueChunkDataPositions.get(i)); + } + } + } + } + + private void rewindChunkData(final Chunk chunk) { + if (Objects.isNull(chunk)) { + return; + } + + final ByteBuffer chunkData = chunk.getData(); + if (Objects.nonNull(chunkData)) { + chunkData.rewind(); + } } private boolean isSinglePageValueChunk(final ChunkHeader chunkHeader) { @@ -1105,15 +1128,11 @@ private static class PendingAlignedChunkGroup { private final List valueChunkList = new ArrayList<>(); private final List measurements = new ArrayList<>(); private final List modsInfos = new ArrayList<>(); - private final long timeChunkPageMemorySize; private long chunkSize; - private long valueChunkPageMemorySize; - private PendingAlignedChunkGroup( - final int timeChunkIndex, final long timeChunkSize, final long timeChunkPageMemorySize) { + private PendingAlignedChunkGroup(final int timeChunkIndex, final long timeChunkSize) { this.timeChunkIndex = timeChunkIndex; this.chunkSize = timeChunkSize; - this.timeChunkPageMemorySize = timeChunkPageMemorySize; } } @@ -1122,17 +1141,12 @@ private static class CachedAlignedValueChunk { private final int timeChunkIndex; private final Chunk chunk; private final long valueChunkSize; - private final long valueChunkPageMemorySize; private CachedAlignedValueChunk( - final int timeChunkIndex, - final Chunk chunk, - final long valueChunkSize, - final long valueChunkPageMemorySize) { + final int timeChunkIndex, final Chunk chunk, final long valueChunkSize) { this.timeChunkIndex = timeChunkIndex; this.chunk = chunk; this.valueChunkSize = valueChunkSize; - this.valueChunkPageMemorySize = valueChunkPageMemorySize; } } } From 5a826e34a655356d29db92eb2029890ea0e56877 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 24 Jun 2026 12:11:31 +0800 Subject: [PATCH 4/4] Update TsFileInsertionEventParserTest.java --- .../event/TsFileInsertionEventParserTest.java | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index 36d0fdd31ddbd..91b6d7d56c532 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -89,6 +89,7 @@ import java.util.Iterator; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -1511,8 +1512,8 @@ private void testTreeParserSkipsUnnecessaryBitMaps(final boolean isQuery) throws Assert.assertTrue(iterator.hasNext()); Tablet parsedTablet = ((PipeRawTabletInsertionEvent) iterator.next()).convertToTablet(); if (parsedTablet.getSchemas().size() > 1) { - assertBitMapExistence(parsedTablet, false, true); - Assert.assertTrue(parsedTablet.isNull(1, 1)); + assertBitMapExistenceByMeasurement(parsedTablet, Map.of("dense", false, "sparse", true)); + Assert.assertTrue(parsedTablet.isNull(1, getColumnIndex(parsedTablet, "sparse"))); Assert.assertFalse(iterator.hasNext()); } else { Assert.assertNull(parsedTablet.getBitMaps()); @@ -1544,6 +1545,33 @@ private void assertBitMapExistence( } } + private void assertBitMapExistenceByMeasurement( + final Tablet tablet, final Map expectedMeasurementHasBitMap) { + final BitMap[] bitMaps = tablet.getBitMaps(); + Assert.assertNotNull(bitMaps); + Assert.assertEquals(tablet.getSchemas().size(), bitMaps.length); + Assert.assertEquals(expectedMeasurementHasBitMap.size(), tablet.getSchemas().size()); + for (int i = 0; i < tablet.getSchemas().size(); ++i) { + final String measurement = tablet.getSchemas().get(i).getMeasurementName(); + Assert.assertTrue(expectedMeasurementHasBitMap.containsKey(measurement)); + if (expectedMeasurementHasBitMap.get(measurement)) { + Assert.assertNotNull(bitMaps[i]); + } else { + Assert.assertNull(bitMaps[i]); + } + } + } + + private int getColumnIndex(final Tablet tablet, final String measurement) { + for (int i = 0; i < tablet.getSchemas().size(); ++i) { + if (tablet.getSchemas().get(i).getMeasurementName().equals(measurement)) { + return i; + } + } + fail(String.format("Measurement %s does not exist in tablet.", measurement)); + return -1; + } + private void generateLargeAlignedTsFile( final File tsFile, final List schemaList,