Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
Expand Down Expand Up @@ -74,7 +76,9 @@
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DatabaseSchemaTask;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.write.view.AlterLogicalViewNode;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
Expand All @@ -97,6 +101,7 @@
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;

import com.google.common.util.concurrent.ListenableFuture;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -113,11 +118,15 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause;

public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {

private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataNodeReceiver.class);
Expand Down Expand Up @@ -152,6 +161,14 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();

private PipeMemoryBlock allocatedMemoryBlock;
private final Set<String> autoCreatedTreeDatabases = ConcurrentHashMap.newKeySet();
private final Set<String> conflictedTreeDatabases = ConcurrentHashMap.newKeySet();

private enum TreeDatabaseCreationResult {
SKIPPED,
CREATED_OR_EXISTED,
CONFLICTED
}

static {
try {
Expand Down Expand Up @@ -876,6 +893,11 @@ private TSStatus executeStatement(final Statement statement) {
return RpcUtils.getStatus(status.getCode(), status.getMessage());
}

if (autoCreateTreeDatabaseIfNecessary(getTreeDatabaseName(statement))
== TreeDatabaseCreationResult.CONFLICTED) {
clearTreeDatabaseName(statement);
}

return Coordinator.getInstance()
.executeForTreeModel(
shouldMarkAsPipeRequest.get() ? new PipeEnrichedStatement(statement) : statement,
Expand All @@ -889,6 +911,91 @@ private TSStatus executeStatement(final Statement statement) {
.status;
}

private TreeDatabaseCreationResult autoCreateTreeDatabaseIfNecessary(final String database) {
if (database == null
|| LoadTsFileStatement.getDatabaseLevelByTreeDatabase(database) == null
|| !IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
return TreeDatabaseCreationResult.SKIPPED;
}
if (autoCreatedTreeDatabases.contains(database)) {
return TreeDatabaseCreationResult.CREATED_OR_EXISTED;
}
if (conflictedTreeDatabases.contains(database)) {
return TreeDatabaseCreationResult.CONFLICTED;
}

try {
final DatabaseSchemaStatement statement =
new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
statement.setDatabasePath(new PartialPath(database));
statement.setEnablePrintExceptionLog(false);

final TSStatus permissionStatus = statement.checkPermissionBeforeProcess(username);
if (permissionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(permissionStatus.getMessage());
}

final DatabaseSchemaTask task = new DatabaseSchemaTask(statement);
final ListenableFuture<ConfigTaskResult> future =
task.execute(ClusterConfigTaskExecutor.getInstance());
final ConfigTaskResult result = future.get();
final int statusCode = result.getStatusCode().getStatusCode();
if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()
|| statusCode == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
autoCreatedTreeDatabases.add(database);
return TreeDatabaseCreationResult.CREATED_OR_EXISTED;
}
if (statusCode == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
conflictedTreeDatabases.add(database);
return TreeDatabaseCreationResult.CONFLICTED;
}
throw new PipeException(
String.format(
"Auto create tree database failed: %s, status: %s",
database, result.getStatus() == null ? result.getStatusCode() : result.getStatus()));
} catch (final IllegalPathException e) {
throw new PipeException(String.format("Illegal tree database %s.", database), e);
} catch (final ExecutionException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
final Throwable rootCause = getRootCause(e);
final int errorCode;
if (rootCause instanceof IoTDBException) {
errorCode = ((IoTDBException) rootCause).getErrorCode();
} else if (rootCause instanceof IoTDBRuntimeException) {
errorCode = ((IoTDBRuntimeException) rootCause).getErrorCode();
} else {
errorCode = TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode();
}
if (errorCode == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
autoCreatedTreeDatabases.add(database);
return TreeDatabaseCreationResult.CREATED_OR_EXISTED;
}
if (errorCode == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
conflictedTreeDatabases.add(database);
return TreeDatabaseCreationResult.CONFLICTED;
}
throw new PipeException("Auto create tree database failed because " + e.getMessage(), e);
}
}

private String getTreeDatabaseName(final Statement statement) {
if (statement instanceof LoadTsFileStatement) {
return ((LoadTsFileStatement) statement).getDatabase();
}
return null;
}

static void clearTreeDatabaseName(final Statement statement) {
if (statement instanceof LoadTsFileStatement) {
final LoadTsFileStatement loadTsFileStatement = (LoadTsFileStatement) statement;
loadTsFileStatement.setDatabase(null);
loadTsFileStatement.setDatabaseLevel(
IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel());
}
}

@Override
protected TSStatus login() {
final IClientSession session = SESSION_MANAGER.getCurrSession();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,19 @@ public IScheduler doSchedule(
&& ((PipeEnrichedStatement) statement).getInnerStatement()
instanceof LoadTsFileStatement;
if (statement instanceof LoadTsFileStatement || isPipeEnrichedTsFileLoad) {
final LoadTsFileStatement loadTsFileStatement =
statement instanceof LoadTsFileStatement
? (LoadTsFileStatement) statement
: (LoadTsFileStatement) ((PipeEnrichedStatement) statement).getInnerStatement();
scheduler =
new LoadTsFileScheduler(
distributedPlan,
context,
stateMachine,
syncInternalServiceClientManager,
partitionFetcher,
isPipeEnrichedTsFileLoad);
isPipeEnrichedTsFileLoad,
loadTsFileStatement.getDatabase());
} else {
scheduler =
new ClusterScheduler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -136,6 +137,7 @@ public class LoadTsFileScheduler implements IScheduler {
private final PlanFragmentId fragmentId;
private final Set<TRegionReplicaSet> allReplicaSets;
private final boolean isGeneratedByPipe;
private final String treeDatabaseForRetry;
private final Map<TTimePartitionSlot, ProgressIndex> timePartitionSlotToProgressIndex;
private final LoadTsFileDataCacheMemoryBlock block;

Expand All @@ -145,7 +147,8 @@ public LoadTsFileScheduler(
QueryStateMachine stateMachine,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager,
IPartitionFetcher partitionFetcher,
boolean isGeneratedByPipe) {
boolean isGeneratedByPipe,
String treeDatabaseForRetry) {
this.queryContext = queryContext;
this.stateMachine = stateMachine;
this.tsFileNodeList = new ArrayList<>();
Expand All @@ -155,6 +158,7 @@ public LoadTsFileScheduler(
this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
this.allReplicaSets = new HashSet<>();
this.isGeneratedByPipe = isGeneratedByPipe;
this.treeDatabaseForRetry = treeDatabaseForRetry;
this.timePartitionSlotToProgressIndex = new HashMap<>();
this.block = LoadTsFileMemoryManager.getInstance().allocateDataCacheMemoryBlock();

Expand Down Expand Up @@ -551,9 +555,7 @@ private void convertFailedTsFilesToTabletsAndRetry() {
final TSStatus status =
loadTsFileDataTypeConverter
.convertForTreeModel(
LoadTsFileStatement.createUnchecked(filePath)
.setDeleteAfterLoad(failedNode.isDeleteAfterLoad())
.setConvertOnTypeMismatch(true))
buildRetryTreeLoadStatement(filePath, failedNode.isDeleteAfterLoad()))
.orElse(null);

if (loadTsFileDataTypeConverter.isSuccessful(status)) {
Expand Down Expand Up @@ -592,6 +594,22 @@ private void convertFailedTsFilesToTabletsAndRetry() {
}
}

private LoadTsFileStatement buildRetryTreeLoadStatement(
final String filePath, final boolean deleteAfterLoad) throws FileNotFoundException {
final LoadTsFileStatement statement =
LoadTsFileStatement.createUnchecked(filePath)
.setDeleteAfterLoad(deleteAfterLoad)
.setConvertOnTypeMismatch(true);
if (treeDatabaseForRetry != null) {
statement.setDatabase(treeDatabaseForRetry);
statement.updateDatabaseLevelByTreeDatabase();
}
if (isGeneratedByPipe) {
statement.markIsGeneratedByPipe();
}
return statement;
}

@Override
public void stop(Throwable t) {
// Do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,23 @@ public void testLoadTsFileSyncStatementCanSkipVerifySchemaWhenNotConvertingType(
Files.deleteIfExists(tsFile);
}
}

@Test
public void testClearTreeDatabaseNameForLoadTsFileStatement() throws Exception {
final Path tsFile = Files.createTempFile("pipe-load-clear-tree-database", ".tsfile");
try {
final LoadTsFileStatement statement =
IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
"root.test.sg_0", tsFile.toString(), true, true);

IoTDBDataNodeReceiver.clearTreeDatabaseName(statement);

Assert.assertNull(statement.getDatabase());
Assert.assertEquals(
IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel(),
statement.getDatabaseLevel());
} finally {
Files.deleteIfExists(tsFile);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.io.File;
import java.lang.reflect.Method;
import java.util.Collections;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
Expand All @@ -50,6 +55,7 @@ public void before() {
when(distributedQueryPlan.getRootSubPlan()).thenReturn(subPlan);
when(subPlan.getPlanFragment()).thenReturn(planFragment);
when(planFragment.getId()).thenReturn(new PlanFragmentId("test", 0));
when(distributedQueryPlan.getInstances()).thenReturn(Collections.emptyList());
}

@Test
Expand All @@ -62,9 +68,37 @@ public void tt() {
mock(QueryStateMachine.class),
mock(IClientManager.class),
mock(IPartitionFetcher.class),
false));
false,
null));
t.start();
Assert.assertNull(t.getTotalCpuTime());
Assert.assertNull(t.getFragmentInfo());
}

@Test
public void testBuildRetryTreeLoadStatementUpdatesDatabaseLevel() throws Exception {
final LoadTsFileScheduler scheduler =
new LoadTsFileScheduler(
distributedQueryPlan,
mock(MPPQueryContext.class),
mock(QueryStateMachine.class),
mock(IClientManager.class),
mock(IPartitionFetcher.class),
true,
"root.test.sg_0");
final Method method =
LoadTsFileScheduler.class.getDeclaredMethod(
"buildRetryTreeLoadStatement", String.class, boolean.class);
method.setAccessible(true);

final File tsFile = File.createTempFile("test", ".tsfile");
tsFile.deleteOnExit();

final LoadTsFileStatement statement =
(LoadTsFileStatement) method.invoke(scheduler, tsFile.getAbsolutePath(), true);

Assert.assertEquals("root.test.sg_0", statement.getDatabase());
Assert.assertEquals(2, statement.getDatabaseLevel());
Assert.assertTrue(statement.isGeneratedByPipe());
}
}
Loading