diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java index e1fa58f5a0f68..ad1f458c95d7d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java @@ -51,9 +51,9 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -72,6 +72,7 @@ public class TsFileInsertionQueryDataContainer extends TsFileInsertionDataContai private final Map deviceIsAlignedMap; private final Map measurementDataTypeMap; private final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool(); + private RuntimeException deferredException; @TestOnly public TsFileInsertionQueryDataContainer( @@ -101,6 +102,7 @@ public TsFileInsertionQueryDataContainer( pipeTaskMeta, sourceEvent, null, + null, isWithMod); } @@ -116,6 +118,33 @@ public TsFileInsertionQueryDataContainer( final Map deviceIsAlignedMap, final boolean isWithMod) throws IOException { + this( + pipeName, + creationTime, + tsFile, + pattern, + startTime, + endTime, + pipeTaskMeta, + sourceEvent, + deviceIsAlignedMap, + null, + isWithMod); + } + + public TsFileInsertionQueryDataContainer( + final String pipeName, + final long creationTime, + final File tsFile, + final PipePattern pattern, + final long startTime, + final long endTime, + final PipeTaskMeta pipeTaskMeta, + final EnrichedEvent sourceEvent, + final Map deviceIsAlignedMap, + final Map> deviceMeasurementsMapOverride, + final boolean isWithMod) + throws IOException { super( tsFile, pipeName, @@ -145,7 +174,25 @@ public TsFileInsertionQueryDataContainer( tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true, true); tsFileReader = new TsFileReader(tsFileSequenceReader); - if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) { + if (Objects.nonNull(deviceMeasurementsMapOverride)) { + this.deviceIsAlignedMap = + Objects.nonNull(deviceIsAlignedMap) + ? new LinkedHashMap<>(deviceIsAlignedMap) + : readDeviceIsAlignedMap(); + memoryRequiredInBytes += + Objects.nonNull(deviceIsAlignedMap) + ? 0 + : PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(this.deviceIsAlignedMap); + + measurementDataTypeMap = + readFilteredFullPathDataTypeMap(deviceMeasurementsMapOverride.keySet()); + memoryRequiredInBytes += + PipeMemoryWeightUtil.memoryOfStr2TSDataType(measurementDataTypeMap); + + deviceMeasurementsMap = new LinkedHashMap<>(deviceMeasurementsMapOverride); + memoryRequiredInBytes += + PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap); + } else if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) { // These read-only objects can be found in cache. this.deviceIsAlignedMap = Objects.nonNull(deviceIsAlignedMap) @@ -246,9 +293,31 @@ public TsFileInsertionQueryDataContainer( } } + public TsFileInsertionQueryDataContainer( + final File tsFile, + final PipePattern pattern, + final long startTime, + final long endTime, + final Map> deviceMeasurementsMapOverride, + final boolean isWithMod) + throws IOException { + this( + null, + 0, + tsFile, + pattern, + startTime, + endTime, + null, + null, + null, + deviceMeasurementsMapOverride, + isWithMod); + } + private Map> filterDeviceMeasurementsMapByPattern( final Map> originalDeviceMeasurementsMap) { - final Map> filteredDeviceMeasurementsMap = new HashMap<>(); + final Map> filteredDeviceMeasurementsMap = new LinkedHashMap<>(); for (final Map.Entry> entry : originalDeviceMeasurementsMap.entrySet()) { final String deviceId = ((PlainDeviceID) entry.getKey()).toStringID(); @@ -282,7 +351,7 @@ else if (pattern.mayOverlapWithDevice(deviceId)) { } private Map readDeviceIsAlignedMap() throws IOException { - final Map deviceIsAlignedResultMap = new HashMap<>(); + final Map deviceIsAlignedResultMap = new LinkedHashMap<>(); final TsFileDeviceIterator deviceIsAlignedIterator = tsFileSequenceReader.getAllDevicesIteratorWithIsAligned(); while (deviceIsAlignedIterator.hasNext()) { @@ -313,7 +382,7 @@ private Set filterDevicesByPattern(final Set devices) { */ private Map readFilteredFullPathDataTypeMap(final Set devices) throws IOException { - final Map result = new HashMap<>(); + final Map result = new LinkedHashMap<>(); for (final IDeviceID device : devices) { tsFileSequenceReader @@ -337,7 +406,7 @@ private Map readFilteredFullPathDataTypeMap(final Set> readFilteredDeviceMeasurementsMap( final Set devices) throws IOException { - final Map> result = new HashMap<>(); + final Map> result = new LinkedHashMap<>(); for (final IDeviceID device : devices) { tsFileSequenceReader @@ -364,6 +433,7 @@ public Iterable toTabletInsertionEvents() { @Override public boolean hasNext() { + throwIfDeferredException(); boolean hasNext = false; while (tabletIterator == null || !tabletIterator.hasNext()) { if (!deviceMeasurementsMapIterator.hasNext()) { @@ -416,9 +486,16 @@ public TabletInsertionEvent next() { recordTabletMetrics(tablet); final boolean isAligned = deviceIsAlignedMap.getOrDefault(new PlainDeviceID(tablet.deviceId), false); + boolean isLast; + try { + isLast = !hasNext(); + } catch (final RuntimeException e) { + deferredException = e; + isLast = false; + } final TabletInsertionEvent next; - if (!hasNext()) { + if (isLast) { next = new PipeRawTabletInsertionEvent( tablet, @@ -448,8 +525,19 @@ public TabletInsertionEvent next() { return tabletInsertionIterable; } + private void throwIfDeferredException() { + if (Objects.isNull(deferredException)) { + return; + } + + final RuntimeException exception = deferredException; + deferredException = null; + throw exception; + } + @Override public void close() { + deferredException = null; try { if (tsFileReader != null) { tsFileReader.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java index ac1fdb94db77e..6c3d341f7740a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java @@ -45,6 +45,7 @@ import org.apache.tsfile.file.header.ChunkHeader; import org.apache.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.tsfile.file.metadata.IChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.PlainDeviceID; import org.apache.tsfile.file.metadata.statistics.Statistics; import org.apache.tsfile.read.TsFileSequenceReader; @@ -96,6 +97,7 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain private boolean currentIsAligned; private final List currentMeasurements = new ArrayList<>(); private final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool(); + private Exception deferredException; private final List modsInfos = new ArrayList<>(); // Cached time chunk @@ -187,6 +189,7 @@ public Iterable toTabletInsertionEvents() { @Override public boolean hasNext() { + throwIfDeferredException(); final boolean hasNext = Objects.nonNull(chunkReader); if (hasNext && !parseStartTimeRecorded) { // Record start time on first hasNext() that returns true @@ -205,7 +208,7 @@ public TabletInsertionEvent next() { throw new NoSuchElementException(); } - // currentIsAligned is initialized when TsFileInsertionEventScanParser is + // currentIsAligned is initialized when TsFileInsertionScanDataContainer is // constructed. // When the getNextTablet function is called, currentIsAligned may be updated, // causing @@ -215,7 +218,7 @@ public TabletInsertionEvent next() { final Tablet tablet = getNextTablet(); // Record tablet metrics recordTabletMetrics(tablet); - final boolean hasNext = hasNext(); + final boolean isLast = isLastTabletWithoutDeferredException(); try { return new PipeRawTabletInsertionEvent( tablet, @@ -224,9 +227,10 @@ public TabletInsertionEvent next() { sourceEvent != null ? sourceEvent.getCreationTime() : 0, pipeTaskMeta, sourceEvent, - !hasNext); + isLast); } finally { - if (!hasNext) { + if (isLast) { + recordParseEndTimeIfNecessary(); close(); } } @@ -241,6 +245,7 @@ public Iterable> toTabletWithIsAligneds() { new Iterator>() { @Override public boolean hasNext() { + throwIfDeferredException(); return Objects.nonNull(chunkReader); } @@ -251,17 +256,16 @@ public Pair next() { throw new NoSuchElementException(); } - // currentIsAligned is initialized when TsFileInsertionEventScanParser is constructed. + // currentIsAligned is initialized when TsFileInsertionScanDataContainer is constructed. // When the getNextTablet function is called, currentIsAligned may be updated, causing // the currentIsAligned information to be inconsistent with the current Tablet // information. final boolean isAligned = currentIsAligned; final Tablet tablet = getNextTablet(); - final boolean hasNext = hasNext(); try { return new Pair<>(tablet, isAligned); } finally { - if (!hasNext) { + if (isLastTabletWithoutDeferredException()) { close(); } } @@ -269,6 +273,22 @@ public Pair next() { }; } + public IDeviceID getCurrentDevice() { + return Objects.nonNull(currentDevice) ? new PlainDeviceID(currentDevice) : null; + } + + public boolean isCurrentAligned() { + return currentIsAligned; + } + + public List getCurrentMeasurements() { + final List measurementIds = new ArrayList<>(currentMeasurements.size()); + for (final MeasurementSchema schema : currentMeasurements) { + measurementIds.add(schema.getMeasurementId()); + } + return measurementIds; + } + private Tablet getNextTablet() { try { Tablet tablet = null; @@ -321,7 +341,11 @@ private Tablet getNextTablet() { // Switch chunk reader iff current chunk is all consumed if (!data.hasCurrent()) { - prepareData(); + try { + prepareData(); + } catch (final Exception e) { + deferredException = e; + } } PipeTabletUtils.compactBitMaps(tablet); return tablet; @@ -331,6 +355,26 @@ private Tablet getNextTablet() { } } + private void throwIfDeferredException() { + if (Objects.isNull(deferredException)) { + return; + } + + final Exception exception = deferredException; + deferredException = null; + throw new PipeException("Failed to prepare next tablet insertion event.", exception); + } + + private boolean isLastTabletWithoutDeferredException() { + return Objects.isNull(deferredException) && Objects.isNull(chunkReader); + } + + private void recordParseEndTimeIfNecessary() { + if (parseStartTimeRecorded && !parseEndTimeRecorded) { + recordParseEndTime(); + } + } + private void prepareData() throws IOException { do { do { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java index e6f8ffb73f659..a1da70952461b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java @@ -20,9 +20,7 @@ package org.apache.iotdb.db.storageengine.load.converter; import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReq; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.StatementNode; @@ -90,17 +88,9 @@ public Optional visitLoadFile( try { for (final File file : loadTsFileStatement.getTsFiles()) { - try (final TsFileInsertionScanDataContainer container = - new TsFileInsertionScanDataContainer( - file, - new IoTDBPipePattern(null), - Long.MIN_VALUE, - Long.MAX_VALUE, - null, - null, - true)) { - for (final Pair tabletWithIsAligned : - container.toTabletWithIsAligneds()) { + try (final LoadTreeTsFileTabletIterator tabletIterator = + new LoadTreeTsFileTabletIterator(file, true)) { + for (final Pair tabletWithIsAligned : tabletIterator) { final PipeTransferTabletRawReq tabletRawReq = PipeTransferTabletRawReq.toTPipeTransferRawReq( tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeTsFileTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeTsFileTabletIterator.java new file mode 100644 index 0000000000000..70ea903baf509 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeTsFileTabletIterator.java @@ -0,0 +1,560 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.converter; + +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; +import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern; +import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tsfile.container.query.TsFileInsertionQueryDataContainer; +import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; + +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.PlainDeviceID; +import org.apache.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Load uses scan parsing first for throughput. If scan parsing hits corruption, fall back to query + * parsing for the remaining measurements and devices so later data can still be loaded. + */ +class LoadTreeTsFileTabletIterator + implements Iterable>, Iterator>, AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(LoadTreeTsFileTabletIterator.class); + + private static final PipePattern LOAD_TREE_PATTERN = new IoTDBPipePattern(null); + + private final File file; + private final boolean isWithMod; + private final ArrayDeque pendingQueryTasks = new ArrayDeque<>(); + + private TsFileInsertionScanDataContainer scanParser; + private QueryTask activeQueryTask; + private TsFileInsertionQueryDataContainer activeQueryParser; + private Iterator> activeIterator; + private boolean scanInitialized; + private boolean fallbackTriggered; + + private IDeviceID lastEmittedDevice; + private List lastEmittedMeasurements = Collections.emptyList(); + private long lastEmittedTimestamp = Long.MIN_VALUE; + + private IDeviceID lastScanTabletDevice; + private List lastScanTabletMeasurements = Collections.emptyList(); + private final Map> fullyEmittedMeasurementsByDevice = + new LinkedHashMap<>(); + + LoadTreeTsFileTabletIterator(final File file, final boolean isWithMod) { + this.file = file; + this.isWithMod = isWithMod; + } + + @Override + public Iterator> iterator() { + return this; + } + + @Override + public boolean hasNext() { + while (true) { + try { + ensureActiveIterator(); + if (Objects.isNull(activeIterator)) { + close(); + return false; + } + + if (activeIterator.hasNext()) { + return true; + } + + if (!switchToNextIterator()) { + close(); + return false; + } + } catch (final Exception e) { + if (recoverFromIteratorFailure(e)) { + continue; + } + close(); + throw toRuntimeException(e); + } + } + } + + @Override + public Pair next() { + while (true) { + if (!hasNext()) { + close(); + throw new NoSuchElementException(); + } + + try { + final Pair next = activeIterator.next(); + recordProgress(next); + return next; + } catch (final Exception e) { + if (recoverFromIteratorFailure(e)) { + continue; + } + close(); + throw toRuntimeException(e); + } + } + } + + private void ensureActiveIterator() throws Exception { + if (Objects.nonNull(activeIterator)) { + return; + } + + if (!scanInitialized && !fallbackTriggered) { + scanInitialized = true; + try { + scanParser = + new TsFileInsertionScanDataContainer( + file, LOAD_TREE_PATTERN, Long.MIN_VALUE, Long.MAX_VALUE, null, null, isWithMod); + activeIterator = scanParser.toTabletWithIsAligneds().iterator(); + return; + } catch (final Exception e) { + if (!switchFromScanToQuery(e)) { + throw toRuntimeException(e); + } + } + } + + activateNextQueryParser(); + } + + private boolean switchToNextIterator() { + if (Objects.nonNull(activeQueryParser)) { + closeActiveQueryParser(); + return activateNextQueryParser(); + } + + closeScanParser(); + return activateNextQueryParser(); + } + + private boolean recoverFromIteratorFailure(final Exception e) { + if (shouldRethrow(e)) { + return false; + } + + if (Objects.nonNull(activeQueryTask)) { + LOGGER.warn( + "Load: Query fallback failed for device {} measurements {} in TsFile {}. " + + "Split or skip this query task and continue.", + activeQueryTask.device, + activeQueryTask.measurements, + file.getAbsolutePath(), + e); + splitOrSkipActiveQueryTask(); + return true; + } + + return switchFromScanToQuery(e); + } + + private boolean switchFromScanToQuery(final Exception e) { + if (fallbackTriggered) { + return false; + } + + fallbackTriggered = true; + final IDeviceID currentDevice = + Objects.nonNull(scanParser) ? scanParser.getCurrentDevice() : null; + final List currentMeasurements = + Objects.nonNull(scanParser) ? scanParser.getCurrentMeasurements() : Collections.emptyList(); + + markLastScanMeasurementsAsCompletedIfNeeded(currentDevice, currentMeasurements); + + closeScanParser(); + + try { + pendingQueryTasks.addAll(buildQueryTasks(currentDevice, currentMeasurements)); + } catch (final Exception queryInitException) { + LOGGER.warn( + "Load: Failed to initialize query fallback for TsFile {} after scan parser failure.", + file.getAbsolutePath(), + queryInitException); + return false; + } + + LOGGER.warn( + "Load: Scan parser detected a corrupted section in TsFile {} at device {}. " + + "Switch to query parsing for remaining devices.", + file.getAbsolutePath(), + currentDevice, + e); + return true; + } + + private ArrayDeque buildQueryTasks( + final IDeviceID currentDevice, final List currentMeasurements) throws IOException { + final LinkedHashMap> deviceMeasurementsMap = + readDeviceMeasurementsInOrder(); + if (deviceMeasurementsMap.isEmpty()) { + return new ArrayDeque<>(); + } + + final ArrayDeque tasks = new ArrayDeque<>(); + boolean includeCurrentAndFollowingDevices = + Objects.isNull(currentDevice) || !deviceMeasurementsMap.containsKey(currentDevice); + + for (final Map.Entry> entry : deviceMeasurementsMap.entrySet()) { + final IDeviceID device = entry.getKey(); + if (!includeCurrentAndFollowingDevices && device.equals(currentDevice)) { + includeCurrentAndFollowingDevices = true; + } + if (!includeCurrentAndFollowingDevices) { + continue; + } + + if (device.equals(currentDevice)) { + addCurrentDeviceQueryTasks(tasks, device, entry.getValue(), currentMeasurements); + } else { + addQueryTaskIfNecessary(tasks, device, entry.getValue(), Long.MIN_VALUE, Long.MAX_VALUE); + } + } + + return tasks; + } + + private LinkedHashMap> readDeviceMeasurementsInOrder() + throws IOException { + final LinkedHashMap> deviceMeasurementsMap = new LinkedHashMap<>(); + try (final TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath())) { + final Iterator>> metadataIterator = + reader.iterAllTimeseriesMetadata(false, false); + while (metadataIterator.hasNext()) { + final Pair> deviceMetadata = metadataIterator.next(); + deviceMeasurementsMap.put( + deviceMetadata.getLeft(), + deviceMetadata.getRight().stream() + .map(TimeseriesMetadata::getMeasurementId) + .collect(Collectors.toList())); + } + } + return deviceMeasurementsMap; + } + + private void addCurrentDeviceQueryTasks( + final ArrayDeque tasks, + final IDeviceID device, + final List allMeasurements, + final List currentMeasurements) { + final Set completedMeasurements = + fullyEmittedMeasurementsByDevice.getOrDefault(device, Collections.emptySet()); + final Set currentMeasurementSet = new LinkedHashSet<>(currentMeasurements); + + final List currentMeasurementsToResume = new ArrayList<>(); + final List remainingMeasurements = new ArrayList<>(); + for (final String measurement : allMeasurements) { + if (completedMeasurements.contains(measurement)) { + continue; + } + if (currentMeasurementSet.contains(measurement)) { + currentMeasurementsToResume.add(measurement); + } else { + remainingMeasurements.add(measurement); + } + } + + addQueryTaskIfNecessary( + tasks, + device, + currentMeasurementsToResume, + determineTaskResumeStartTime(device, currentMeasurementsToResume, Long.MIN_VALUE), + Long.MAX_VALUE); + addQueryTaskIfNecessary(tasks, device, remainingMeasurements, Long.MIN_VALUE, Long.MAX_VALUE); + } + + private boolean activateNextQueryParser() { + closeActiveQueryParser(); + + while (!pendingQueryTasks.isEmpty()) { + activeQueryTask = pendingQueryTasks.removeFirst(); + try { + activeQueryParser = + new TsFileInsertionQueryDataContainer( + file, + LOAD_TREE_PATTERN, + activeQueryTask.startTime, + activeQueryTask.endTime, + activeQueryTask.toDeviceMeasurementsMap(), + isWithMod); + final Iterator tabletIterator = + activeQueryParser.toTabletInsertionEvents().iterator(); + activeIterator = + new Iterator>() { + @Override + public boolean hasNext() { + return tabletIterator.hasNext(); + } + + @Override + public Pair next() { + final TabletInsertionEvent event = tabletIterator.next(); + if (!(event instanceof PipeRawTabletInsertionEvent)) { + throw new IllegalStateException( + "Expected PipeRawTabletInsertionEvent but got " + event.getClass().getName()); + } + + final PipeRawTabletInsertionEvent rawTabletInsertionEvent = + (PipeRawTabletInsertionEvent) event; + return new Pair<>( + rawTabletInsertionEvent.convertToTablet(), rawTabletInsertionEvent.isAligned()); + } + }; + return true; + } catch (final Exception e) { + LOGGER.warn( + "Load: Failed to initialize query fallback for device {} measurements {} in TsFile {}. " + + "Split or skip this query task and continue.", + activeQueryTask.device, + activeQueryTask.measurements, + file.getAbsolutePath(), + e); + splitOrSkipActiveQueryTask(); + } + } + + activeIterator = null; + return false; + } + + private void recordProgress(final Pair tabletWithIsAligned) { + final Tablet tablet = tabletWithIsAligned.getLeft(); + if (Objects.isNull(tablet) || tablet.rowSize == 0) { + return; + } + + final IDeviceID device = new PlainDeviceID(tablet.deviceId); + final List measurements = extractMeasurementNames(tablet); + + if (Objects.isNull(activeQueryParser)) { + recordScanProgress(device, measurements); + } + + lastEmittedDevice = device; + lastEmittedMeasurements = measurements; + lastEmittedTimestamp = tablet.timestamps[tablet.rowSize - 1]; + } + + private boolean shouldRethrow(final Exception e) { + Throwable current = e; + while (Objects.nonNull(current)) { + if (current instanceof InterruptedException + || current instanceof PipeRuntimeOutOfMemoryCriticalException) { + return true; + } + current = current.getCause(); + } + return false; + } + + private RuntimeException toRuntimeException(final Exception e) { + return e instanceof RuntimeException + ? (RuntimeException) e + : new IllegalStateException("Failed to iterate tablets while loading TsFile.", e); + } + + private void closeScanParser() { + activeIterator = null; + if (Objects.nonNull(scanParser)) { + scanParser.close(); + scanParser = null; + } + } + + private void closeActiveQueryParser() { + activeIterator = null; + activeQueryTask = null; + if (Objects.isNull(activeQueryParser)) { + return; + } + + activeQueryParser.close(); + activeQueryParser = null; + } + + @Override + public void close() { + activeIterator = null; + closeScanParser(); + closeActiveQueryParser(); + pendingQueryTasks.clear(); + } + + private void recordScanProgress(final IDeviceID device, final List measurements) { + if (Objects.nonNull(lastScanTabletDevice) + && (!lastScanTabletDevice.equals(device) + || !measurementsEqual(lastScanTabletMeasurements, measurements))) { + markMeasurementsFullyEmitted(lastScanTabletDevice, lastScanTabletMeasurements); + } + + lastScanTabletDevice = device; + lastScanTabletMeasurements = measurements; + } + + private void markLastScanMeasurementsAsCompletedIfNeeded( + final IDeviceID currentDevice, final List currentMeasurements) { + if (Objects.isNull(lastScanTabletDevice) || lastScanTabletMeasurements.isEmpty()) { + return; + } + + if (!lastScanTabletDevice.equals(currentDevice) + || !currentMeasurements.isEmpty() + && !measurementsEqual(lastScanTabletMeasurements, currentMeasurements)) { + markMeasurementsFullyEmitted(lastScanTabletDevice, lastScanTabletMeasurements); + } + } + + private void markMeasurementsFullyEmitted( + final IDeviceID device, final List measurements) { + if (Objects.isNull(device) || measurements.isEmpty()) { + return; + } + + fullyEmittedMeasurementsByDevice + .computeIfAbsent(device, key -> new LinkedHashSet<>()) + .addAll(measurements); + } + + private long determineTaskResumeStartTime( + final IDeviceID device, final List measurements, final long defaultStartTime) { + if (measurements.isEmpty() + || !device.equals(lastEmittedDevice) + || lastEmittedTimestamp == Long.MIN_VALUE + || !measurementsEqual(measurements, lastEmittedMeasurements)) { + return defaultStartTime; + } + + return lastEmittedTimestamp == Long.MAX_VALUE ? Long.MAX_VALUE : lastEmittedTimestamp + 1; + } + + private void addQueryTaskIfNecessary( + final ArrayDeque tasks, + final IDeviceID device, + final List measurements, + final long startTime, + final long endTime) { + if (measurements.isEmpty() || startTime == Long.MAX_VALUE) { + return; + } + + tasks.addLast(new QueryTask(device, measurements, startTime, endTime)); + } + + private void splitOrSkipActiveQueryTask() { + final QueryTask failedTask = activeQueryTask; + closeActiveQueryParser(); + if (Objects.isNull(failedTask)) { + return; + } + + if (failedTask.measurements.size() <= 1) { + return; + } + + final long resumeStartTime = + determineTaskResumeStartTime( + failedTask.device, failedTask.measurements, failedTask.startTime); + final List splitTasks = failedTask.split(resumeStartTime); + for (int i = splitTasks.size() - 1; i >= 0; --i) { + pendingQueryTasks.addFirst(splitTasks.get(i)); + } + } + + private List extractMeasurementNames(final Tablet tablet) { + return tablet.getSchemas().stream() + .map(MeasurementSchema::getMeasurementId) + .collect(Collectors.toList()); + } + + private boolean measurementsEqual( + final List leftMeasurements, final List rightMeasurements) { + return leftMeasurements.size() == rightMeasurements.size() + && new LinkedHashSet<>(leftMeasurements).equals(new LinkedHashSet<>(rightMeasurements)); + } + + private static class QueryTask { + private final IDeviceID device; + private final List measurements; + private final long startTime; + private final long endTime; + + private QueryTask( + final IDeviceID device, + final List measurements, + final long startTime, + final long endTime) { + this.device = device; + this.measurements = new ArrayList<>(measurements); + this.startTime = startTime; + this.endTime = endTime; + } + + private LinkedHashMap> toDeviceMeasurementsMap() { + final LinkedHashMap> deviceMeasurementsMap = new LinkedHashMap<>(); + deviceMeasurementsMap.put(device, measurements); + return deviceMeasurementsMap; + } + + private List split(final long resumeStartTime) { + final int middle = measurements.size() / 2; + if (middle <= 0) { + return Collections.emptyList(); + } + + final List splitTasks = new ArrayList<>(2); + splitTasks.add( + new QueryTask(device, measurements.subList(0, middle), resumeStartTime, endTime)); + splitTasks.add( + new QueryTask( + device, measurements.subList(middle, measurements.size()), resumeStartTime, endTime)); + return splitTasks; + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java new file mode 100644 index 0000000000000..150c950fce3eb --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.converter; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern; +import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer; +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.AlignedChunkMetadata; +import org.apache.tsfile.file.metadata.IChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.PlainDeviceID; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.common.Path; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.RandomAccessFile; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +public class LoadTreeStatementDataTypeConvertExecutionVisitorTest { + + private static final String DEVICE_0 = "root.sg.d0"; + private static final String DEVICE_1 = "root.sg.d1"; + private static final String DEVICE_2 = "root.sg.d2"; + private static final String ALIGNED_DEVICE = "root.sg.ad0"; + private static final int ROW_COUNT_PER_DEVICE = 2048; + private File tsFile; + private boolean isPipeMemoryManagementEnabled; + private long pipeMaxReaderChunkSize; + + @Before + public void setUp() { + isPipeMemoryManagementEnabled = PipeConfigAccessor.getPipeMemoryManagementEnabled(); + PipeConfigAccessor.setPipeMemoryManagementEnabled(false); + pipeMaxReaderChunkSize = CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize(); + } + + @After + public void tearDown() { + PipeConfigAccessor.setPipeMemoryManagementEnabled(isPipeMemoryManagementEnabled); + CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(pipeMaxReaderChunkSize); + if (tsFile != null && tsFile.exists()) { + Assert.assertTrue(tsFile.delete()); + } + } + + @Test + public void testFallbackToQueryForRemainingDevicesWhenScanParserHitsCorruption() + throws Exception { + tsFile = new File("load-tree-query-fallback-corrupted.tsfile"); + writeTsFile(tsFile); + corruptMeasurementChunk(tsFile, DEVICE_1, "s0"); + + Assert.assertTrue("Expected scan parser to fail after corruption.", scanParserFails(tsFile)); + + final Map pointCountByDevice = new HashMap<>(); + final LoadTreeStatementDataTypeConvertExecutionVisitor visitor = + new LoadTreeStatementDataTypeConvertExecutionVisitor( + statement -> { + collectLoadedPoints(statement, pointCountByDevice); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + }); + + final Optional status = + visitor.visitLoadFile(LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()), null); + + Assert.assertTrue(status.isPresent()); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.get().getCode()); + final int loadedPointCountBeforeCorruption = pointCountByDevice.getOrDefault(DEVICE_0, 0); + final int loadedPointCountAfterFallback = pointCountByDevice.getOrDefault(DEVICE_2, 0); + Assert.assertTrue(loadedPointCountBeforeCorruption > 0); + Assert.assertEquals(loadedPointCountBeforeCorruption, loadedPointCountAfterFallback); + } + + @Test + public void testFallbackToQueryWhenFirstNonAlignedDeviceIsCorrupted() throws Exception { + tsFile = new File("load-tree-query-fallback-corrupted-first-non-aligned-device.tsfile"); + writeTsFile(tsFile); + corruptMeasurementChunk(tsFile, DEVICE_0, "s0"); + + Assert.assertTrue("Expected scan parser to fail after corruption.", scanParserFails(tsFile)); + + final Map pointCountByTimeseries = new HashMap<>(); + final LoadTreeStatementDataTypeConvertExecutionVisitor visitor = + new LoadTreeStatementDataTypeConvertExecutionVisitor( + statement -> { + collectLoadedPointsByTimeseries(statement, pointCountByTimeseries); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + }); + + final Optional status = + visitor.visitLoadFile(LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()), null); + + Assert.assertTrue(status.isPresent()); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.get().getCode()); + + Assert.assertTrue( + pointCountByTimeseries.getOrDefault(DEVICE_0 + ".s0", 0) < ROW_COUNT_PER_DEVICE); + assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_0, 1); + assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_1, 0); + assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_1, 1); + assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_2, 0); + assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_2, 1); + } + + @Test + public void testFallbackDoesNotReloadCompletedMeasurementsOfCurrentNonAlignedDevice() + throws Exception { + tsFile = new File("load-tree-query-fallback-corrupted-current-non-aligned-device.tsfile"); + writeTsFile(tsFile); + corruptMeasurementChunk(tsFile, DEVICE_1, "s1"); + + Assert.assertTrue("Expected scan parser to fail after corruption.", scanParserFails(tsFile)); + + final Map pointCountByTimeseries = new HashMap<>(); + final LoadTreeStatementDataTypeConvertExecutionVisitor visitor = + new LoadTreeStatementDataTypeConvertExecutionVisitor( + statement -> { + collectLoadedPointsByTimeseries(statement, pointCountByTimeseries); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + }); + + final Optional status = + visitor.visitLoadFile(LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()), null); + + Assert.assertTrue(status.isPresent()); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.get().getCode()); + + assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_1, 0); + Assert.assertTrue( + pointCountByTimeseries.getOrDefault(DEVICE_1 + ".s1", 0) < ROW_COUNT_PER_DEVICE); + assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_2, 0); + assertMeasurementLoadedCompletely(pointCountByTimeseries, DEVICE_2, 1); + } + + @Test + public void testFallbackToQueryForRemainingMeasurementsOfCurrentAlignedDevice() throws Exception { + CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(0); + + tsFile = new File("load-tree-query-fallback-corrupted-aligned-current-device.tsfile"); + writeWideAlignedTsFile(tsFile, ALIGNED_DEVICE, 16); + corruptMeasurementChunk(tsFile, ALIGNED_DEVICE, "s8"); + corruptMeasurementChunk(tsFile, ALIGNED_DEVICE, "s12"); + + Assert.assertTrue("Expected scan parser to fail after corruption.", scanParserFails(tsFile)); + + final Map pointCountByTimeseries = new HashMap<>(); + final LoadTreeStatementDataTypeConvertExecutionVisitor visitor = + new LoadTreeStatementDataTypeConvertExecutionVisitor( + statement -> { + collectLoadedPointsByTimeseries(statement, pointCountByTimeseries); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + }); + + final Optional status = + visitor.visitLoadFile(LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()), null); + + Assert.assertTrue(status.isPresent()); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.get().getCode()); + + for (int measurementIndex = 0; measurementIndex < 8; ++measurementIndex) { + assertMeasurementLoadedCompletely(pointCountByTimeseries, ALIGNED_DEVICE, measurementIndex); + } + for (int measurementIndex : Arrays.asList(9, 10, 11, 13, 14, 15)) { + assertMeasurementLoadedCompletely(pointCountByTimeseries, ALIGNED_DEVICE, measurementIndex); + } + + Assert.assertTrue( + pointCountByTimeseries.getOrDefault(ALIGNED_DEVICE + ".s8", 0) < ROW_COUNT_PER_DEVICE); + Assert.assertTrue( + pointCountByTimeseries.getOrDefault(ALIGNED_DEVICE + ".s12", 0) < ROW_COUNT_PER_DEVICE); + } + + private void writeTsFile(final File file) throws Exception { + if (file.exists()) { + Assert.assertTrue(file.delete()); + } + + final List schemaList = + Arrays.asList( + new MeasurementSchema("s0", TSDataType.INT64, TSEncoding.PLAIN), + new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN)); + + try (final TsFileWriter writer = new TsFileWriter(file)) { + writeDevice(writer, schemaList, DEVICE_0, 0); + writeDevice(writer, schemaList, DEVICE_1, 10_000); + writeDevice(writer, schemaList, DEVICE_2, 20_000); + } + } + + private void writeWideAlignedTsFile( + final File file, final String device, final int measurementCount) throws Exception { + if (file.exists()) { + Assert.assertTrue(file.delete()); + } + + final List schemaList = new java.util.ArrayList<>(); + for (int measurementIndex = 0; measurementIndex < measurementCount; ++measurementIndex) { + schemaList.add( + new MeasurementSchema("s" + measurementIndex, TSDataType.INT64, TSEncoding.PLAIN)); + } + + try (final TsFileWriter writer = new TsFileWriter(file)) { + writer.registerAlignedTimeseries(new Path(device), schemaList); + + final Tablet tablet = new Tablet(device, schemaList, ROW_COUNT_PER_DEVICE); + for (int row = 0; row < ROW_COUNT_PER_DEVICE; ++row) { + tablet.addTimestamp(row, row); + for (int measurementIndex = 0; measurementIndex < measurementCount; ++measurementIndex) { + tablet.addValue("s" + measurementIndex, row, (long) measurementIndex * 10_000 + row); + } + } + tablet.rowSize = ROW_COUNT_PER_DEVICE; + writer.writeAligned(tablet); + } + } + + private void writeDevice( + final TsFileWriter writer, + final List schemaList, + final String device, + final long valueBase) + throws Exception { + writer.registerTimeseries(new Path(device), schemaList); + + final Tablet tablet = new Tablet(device, schemaList, ROW_COUNT_PER_DEVICE); + for (int row = 0; row < ROW_COUNT_PER_DEVICE; ++row) { + tablet.addTimestamp(row, row); + tablet.addValue("s0", row, valueBase + row); + tablet.addValue("s1", row, valueBase + ROW_COUNT_PER_DEVICE + row); + } + tablet.rowSize = ROW_COUNT_PER_DEVICE; + writer.write(tablet); + } + + private void corruptMeasurementChunk( + final File file, final String device, final String measurement) throws Exception { + try (final TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath())) { + final IDeviceID deviceId = new PlainDeviceID(device); + final List chunkMetadataList = + reader.getIChunkMetadataList(new Path(deviceId, measurement, false)); + Assert.assertFalse(chunkMetadataList.isEmpty()); + + final long chunkHeaderOffset = + getTargetChunkMetadata(chunkMetadataList.get(0), measurement).getOffsetOfChunkHeader(); + try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw")) { + randomAccessFile.seek(chunkHeaderOffset + 64); + randomAccessFile.write(new byte[] {1, 2, 3, 4, 5, 6, 7, 8}); + } + } + } + + private IChunkMetadata getTargetChunkMetadata( + final IChunkMetadata chunkMetadata, final String measurement) { + if (!(chunkMetadata instanceof AlignedChunkMetadata)) { + return chunkMetadata; + } + + final IChunkMetadata valueChunkMetadata = + ((AlignedChunkMetadata) chunkMetadata) + .getValueChunkMetadataList().stream() + .filter(Objects::nonNull) + .filter(metadata -> measurement.equals(metadata.getMeasurementUid())) + .findFirst() + .orElse(null); + Assert.assertNotNull(valueChunkMetadata); + return valueChunkMetadata; + } + + private void assertMeasurementLoadedCompletely( + final Map pointCountByTimeseries, + final String device, + final int measurementIndex) { + Assert.assertEquals( + ROW_COUNT_PER_DEVICE, + pointCountByTimeseries.getOrDefault(device + ".s" + measurementIndex, 0).intValue()); + } + + private boolean scanParserFails(final File file) throws Exception { + try (final TsFileInsertionScanDataContainer parser = + new TsFileInsertionScanDataContainer( + file, new IoTDBPipePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null, true)) { + parser.toTabletWithIsAligneds().forEach(tabletWithIsAligned -> {}); + return false; + } catch (final Exception e) { + return true; + } + } + + private void collectLoadedPointsByTimeseries( + final Statement statement, final Map pointCountByTimeseries) { + Assert.assertTrue(statement instanceof InsertMultiTabletsStatement); + for (final InsertTabletStatement insertTabletStatement : + ((InsertMultiTabletsStatement) statement).getInsertTabletStatementList()) { + for (int row = 0; row < insertTabletStatement.getRowCount(); ++row) { + for (int column = 0; column < insertTabletStatement.getMeasurements().length; ++column) { + final String measurement = insertTabletStatement.getMeasurements()[column]; + if (measurement == null || isNull(insertTabletStatement, row, column)) { + continue; + } + pointCountByTimeseries.merge( + insertTabletStatement.getDevicePath().getFullPath() + "." + measurement, + 1, + Integer::sum); + } + } + } + } + + private void collectLoadedPoints( + final Statement statement, final Map pointCountByDevice) { + Assert.assertTrue(statement instanceof InsertMultiTabletsStatement); + for (final InsertTabletStatement insertTabletStatement : + ((InsertMultiTabletsStatement) statement).getInsertTabletStatementList()) { + pointCountByDevice.merge( + insertTabletStatement.getDevicePath().getFullPath(), + countNonNullPoints(insertTabletStatement), + Integer::sum); + } + } + + private int countNonNullPoints(final InsertTabletStatement insertTabletStatement) { + int pointCount = 0; + for (int row = 0; row < insertTabletStatement.getRowCount(); ++row) { + for (int column = 0; column < insertTabletStatement.getMeasurements().length; ++column) { + if (insertTabletStatement.getMeasurements()[column] != null + && !isNull(insertTabletStatement, row, column)) { + ++pointCount; + } + } + } + return pointCount; + } + + private boolean isNull( + final InsertTabletStatement insertTabletStatement, final int row, final int column) { + final Object[] columns = insertTabletStatement.getColumns(); + if (columns == null || column >= columns.length || columns[column] == null) { + return true; + } + + final BitMap[] bitMaps = insertTabletStatement.getBitMaps(); + return bitMaps != null + && column < bitMaps.length + && bitMaps[column] != null + && bitMaps[column].isMarked(row); + } + + private static class PipeConfigAccessor { + private static boolean getPipeMemoryManagementEnabled() { + return CommonDescriptor.getInstance().getConfig().getPipeMemoryManagementEnabled(); + } + + private static void setPipeMemoryManagementEnabled(final boolean enabled) { + CommonDescriptor.getInstance().getConfig().setPipeMemoryManagementEnabled(enabled); + } + } +}