Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
Expand All @@ -72,6 +72,7 @@ public class TsFileInsertionQueryDataContainer extends TsFileInsertionDataContai
private final Map<IDeviceID, Boolean> deviceIsAlignedMap;
private final Map<String, TSDataType> measurementDataTypeMap;
private final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool();
private RuntimeException deferredException;

@TestOnly
public TsFileInsertionQueryDataContainer(
Expand Down Expand Up @@ -101,6 +102,7 @@ public TsFileInsertionQueryDataContainer(
pipeTaskMeta,
sourceEvent,
null,
null,
isWithMod);
}

Expand All @@ -116,6 +118,33 @@ public TsFileInsertionQueryDataContainer(
final Map<IDeviceID, Boolean> deviceIsAlignedMap,
final boolean isWithMod)
throws IOException {
this(
pipeName,
creationTime,
tsFile,
pattern,
startTime,
endTime,
pipeTaskMeta,
sourceEvent,
deviceIsAlignedMap,
null,
isWithMod);
}

public TsFileInsertionQueryDataContainer(
final String pipeName,
final long creationTime,
final File tsFile,
final PipePattern pattern,
final long startTime,
final long endTime,
final PipeTaskMeta pipeTaskMeta,
final EnrichedEvent sourceEvent,
final Map<IDeviceID, Boolean> deviceIsAlignedMap,
final Map<IDeviceID, List<String>> deviceMeasurementsMapOverride,
final boolean isWithMod)
throws IOException {
super(
tsFile,
pipeName,
Expand Down Expand Up @@ -145,7 +174,25 @@ public TsFileInsertionQueryDataContainer(
tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true, true);
tsFileReader = new TsFileReader(tsFileSequenceReader);

if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) {
if (Objects.nonNull(deviceMeasurementsMapOverride)) {
this.deviceIsAlignedMap =
Objects.nonNull(deviceIsAlignedMap)
? new LinkedHashMap<>(deviceIsAlignedMap)
: readDeviceIsAlignedMap();
memoryRequiredInBytes +=
Objects.nonNull(deviceIsAlignedMap)
? 0
: PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(this.deviceIsAlignedMap);

measurementDataTypeMap =
readFilteredFullPathDataTypeMap(deviceMeasurementsMapOverride.keySet());
memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfStr2TSDataType(measurementDataTypeMap);

deviceMeasurementsMap = new LinkedHashMap<>(deviceMeasurementsMapOverride);
memoryRequiredInBytes +=
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
} else if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) {
// These read-only objects can be found in cache.
this.deviceIsAlignedMap =
Objects.nonNull(deviceIsAlignedMap)
Expand Down Expand Up @@ -246,9 +293,31 @@ public TsFileInsertionQueryDataContainer(
}
}

public TsFileInsertionQueryDataContainer(
final File tsFile,
final PipePattern pattern,
final long startTime,
final long endTime,
final Map<IDeviceID, List<String>> deviceMeasurementsMapOverride,
final boolean isWithMod)
throws IOException {
this(
null,
0,
tsFile,
pattern,
startTime,
endTime,
null,
null,
null,
deviceMeasurementsMapOverride,
isWithMod);
}

private Map<IDeviceID, List<String>> filterDeviceMeasurementsMapByPattern(
final Map<IDeviceID, List<String>> originalDeviceMeasurementsMap) {
final Map<IDeviceID, List<String>> filteredDeviceMeasurementsMap = new HashMap<>();
final Map<IDeviceID, List<String>> filteredDeviceMeasurementsMap = new LinkedHashMap<>();
for (final Map.Entry<IDeviceID, List<String>> entry :
originalDeviceMeasurementsMap.entrySet()) {
final String deviceId = ((PlainDeviceID) entry.getKey()).toStringID();
Expand Down Expand Up @@ -282,7 +351,7 @@ else if (pattern.mayOverlapWithDevice(deviceId)) {
}

private Map<IDeviceID, Boolean> readDeviceIsAlignedMap() throws IOException {
final Map<IDeviceID, Boolean> deviceIsAlignedResultMap = new HashMap<>();
final Map<IDeviceID, Boolean> deviceIsAlignedResultMap = new LinkedHashMap<>();
final TsFileDeviceIterator deviceIsAlignedIterator =
tsFileSequenceReader.getAllDevicesIteratorWithIsAligned();
while (deviceIsAlignedIterator.hasNext()) {
Expand Down Expand Up @@ -313,7 +382,7 @@ private Set<IDeviceID> filterDevicesByPattern(final Set<IDeviceID> devices) {
*/
private Map<String, TSDataType> readFilteredFullPathDataTypeMap(final Set<IDeviceID> devices)
throws IOException {
final Map<String, TSDataType> result = new HashMap<>();
final Map<String, TSDataType> result = new LinkedHashMap<>();

for (final IDeviceID device : devices) {
tsFileSequenceReader
Expand All @@ -337,7 +406,7 @@ private Map<String, TSDataType> readFilteredFullPathDataTypeMap(final Set<IDevic
*/
private Map<IDeviceID, List<String>> readFilteredDeviceMeasurementsMap(
final Set<IDeviceID> devices) throws IOException {
final Map<IDeviceID, List<String>> result = new HashMap<>();
final Map<IDeviceID, List<String>> result = new LinkedHashMap<>();

for (final IDeviceID device : devices) {
tsFileSequenceReader
Expand All @@ -364,6 +433,7 @@ public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {

@Override
public boolean hasNext() {
throwIfDeferredException();
boolean hasNext = false;
while (tabletIterator == null || !tabletIterator.hasNext()) {
if (!deviceMeasurementsMapIterator.hasNext()) {
Expand Down Expand Up @@ -416,9 +486,16 @@ public TabletInsertionEvent next() {
recordTabletMetrics(tablet);
final boolean isAligned =
deviceIsAlignedMap.getOrDefault(new PlainDeviceID(tablet.deviceId), false);
boolean isLast;
try {
isLast = !hasNext();
} catch (final RuntimeException e) {
deferredException = e;
isLast = false;
}

final TabletInsertionEvent next;
if (!hasNext()) {
if (isLast) {
next =
new PipeRawTabletInsertionEvent(
tablet,
Expand Down Expand Up @@ -448,8 +525,19 @@ public TabletInsertionEvent next() {
return tabletInsertionIterable;
}

private void throwIfDeferredException() {
if (Objects.isNull(deferredException)) {
return;
}

final RuntimeException exception = deferredException;
deferredException = null;
throw exception;
}

@Override
public void close() {
deferredException = null;
try {
if (tsFileReader != null) {
tsFileReader.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.PlainDeviceID;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.TsFileSequenceReader;
Expand Down Expand Up @@ -96,6 +97,7 @@ public class TsFileInsertionScanDataContainer extends TsFileInsertionDataContain
private boolean currentIsAligned;
private final List<MeasurementSchema> currentMeasurements = new ArrayList<>();
private final TabletStringInternPool tabletStringInternPool = new TabletStringInternPool();
private Exception deferredException;

private final List<ModsOperationUtil.ModsInfo> modsInfos = new ArrayList<>();
// Cached time chunk
Expand Down Expand Up @@ -187,6 +189,7 @@ public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {

@Override
public boolean hasNext() {
throwIfDeferredException();
final boolean hasNext = Objects.nonNull(chunkReader);
if (hasNext && !parseStartTimeRecorded) {
// Record start time on first hasNext() that returns true
Expand All @@ -205,7 +208,7 @@ public TabletInsertionEvent next() {
throw new NoSuchElementException();
}

// currentIsAligned is initialized when TsFileInsertionEventScanParser is
// currentIsAligned is initialized when TsFileInsertionScanDataContainer is
// constructed.
// When the getNextTablet function is called, currentIsAligned may be updated,
// causing
Expand All @@ -215,7 +218,7 @@ public TabletInsertionEvent next() {
final Tablet tablet = getNextTablet();
// Record tablet metrics
recordTabletMetrics(tablet);
final boolean hasNext = hasNext();
final boolean isLast = isLastTabletWithoutDeferredException();
try {
return new PipeRawTabletInsertionEvent(
tablet,
Expand All @@ -224,9 +227,10 @@ public TabletInsertionEvent next() {
sourceEvent != null ? sourceEvent.getCreationTime() : 0,
pipeTaskMeta,
sourceEvent,
!hasNext);
isLast);
} finally {
if (!hasNext) {
if (isLast) {
recordParseEndTimeIfNecessary();
close();
}
}
Expand All @@ -241,6 +245,7 @@ public Iterable<Pair<Tablet, Boolean>> toTabletWithIsAligneds() {
new Iterator<Pair<Tablet, Boolean>>() {
@Override
public boolean hasNext() {
throwIfDeferredException();
return Objects.nonNull(chunkReader);
}

Expand All @@ -251,24 +256,39 @@ public Pair<Tablet, Boolean> next() {
throw new NoSuchElementException();
}

// currentIsAligned is initialized when TsFileInsertionEventScanParser is constructed.
// currentIsAligned is initialized when TsFileInsertionScanDataContainer is constructed.
// When the getNextTablet function is called, currentIsAligned may be updated, causing
// the currentIsAligned information to be inconsistent with the current Tablet
// information.
final boolean isAligned = currentIsAligned;
final Tablet tablet = getNextTablet();
final boolean hasNext = hasNext();
try {
return new Pair<>(tablet, isAligned);
} finally {
if (!hasNext) {
if (isLastTabletWithoutDeferredException()) {
close();
}
}
}
};
}

public IDeviceID getCurrentDevice() {
return Objects.nonNull(currentDevice) ? new PlainDeviceID(currentDevice) : null;
}

public boolean isCurrentAligned() {
return currentIsAligned;
}

public List<String> getCurrentMeasurements() {
final List<String> measurementIds = new ArrayList<>(currentMeasurements.size());
for (final MeasurementSchema schema : currentMeasurements) {
measurementIds.add(schema.getMeasurementId());
}
return measurementIds;
}

private Tablet getNextTablet() {
try {
Tablet tablet = null;
Expand Down Expand Up @@ -321,7 +341,11 @@ private Tablet getNextTablet() {

// Switch chunk reader iff current chunk is all consumed
if (!data.hasCurrent()) {
prepareData();
try {
prepareData();
} catch (final Exception e) {
deferredException = e;
}
}
PipeTabletUtils.compactBitMaps(tablet);
return tablet;
Expand All @@ -331,6 +355,26 @@ private Tablet getNextTablet() {
}
}

private void throwIfDeferredException() {
if (Objects.isNull(deferredException)) {
return;
}

final Exception exception = deferredException;
deferredException = null;
throw new PipeException("Failed to prepare next tablet insertion event.", exception);
}

private boolean isLastTabletWithoutDeferredException() {
return Objects.isNull(deferredException) && Objects.isNull(chunkReader);
}

private void recordParseEndTimeIfNecessary() {
if (parseStartTimeRecorded && !parseEndTimeRecorded) {
recordParseEndTime();
}
}

private void prepareData() throws IOException {
do {
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
package org.apache.iotdb.db.storageengine.load.converter;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
Expand Down Expand Up @@ -90,17 +88,9 @@ public Optional<TSStatus> visitLoadFile(

try {
for (final File file : loadTsFileStatement.getTsFiles()) {
try (final TsFileInsertionScanDataContainer container =
new TsFileInsertionScanDataContainer(
file,
new IoTDBPipePattern(null),
Long.MIN_VALUE,
Long.MAX_VALUE,
null,
null,
true)) {
for (final Pair<Tablet, Boolean> tabletWithIsAligned :
container.toTabletWithIsAligneds()) {
try (final LoadTreeTsFileTabletIterator tabletIterator =
new LoadTreeTsFileTabletIterator(file, true)) {
for (final Pair<Tablet, Boolean> tabletWithIsAligned : tabletIterator) {
final PipeTransferTabletRawReq tabletRawReq =
PipeTransferTabletRawReq.toTPipeTransferRawReq(
tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight());
Expand Down
Loading
Loading